2 // ========================LICENSE_START=================================
5 // Copyright (C) 2021: Nordix Foundation
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 // ========================LICENSE_END===================================
33 "github.com/confluentinc/confluent-kafka-go/kafka"
34 "github.com/stretchr/testify/mock"
35 "github.com/stretchr/testify/require"
36 "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
37 "oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient"
38 "oransc.org/nonrtric/dmaapmediatorproducer/mocks"
41 func TestJobsManagerLoadTypesFromConfiguration_shouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
42 assertions := require.New(t)
44 managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
46 wantedDMaaPType := config.TypeDefinition{
48 DMaaPTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
50 wantedKafkaType := config.TypeDefinition{
52 KafkaInputTopic: "topic",
54 wantedTypes := []config.TypeDefinition{wantedDMaaPType, wantedKafkaType}
56 types := managerUnderTest.LoadTypesFromConfiguration(wantedTypes)
58 assertions.EqualValues(wantedTypes, types)
60 supportedTypes := managerUnderTest.GetSupportedTypes()
61 assertions.ElementsMatch([]string{"type1", "type2"}, supportedTypes)
62 assertions.Equal(dMaaPSource, managerUnderTest.allTypes["type1"].jobsHandler.sourceType)
63 assertions.Equal(kafkaSource, managerUnderTest.allTypes["type2"].jobsHandler.sourceType)
66 func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) {
67 assertions := require.New(t)
68 managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
72 InfoJobIdentity: "job1",
74 InfoJobData: Parameters{},
75 InfoTypeIdentity: "type1",
77 jobsHandler := jobsHandler{
78 addJobCh: make(chan JobInfo)}
79 managerUnderTest.allTypes["type1"] = TypeData{
81 jobsHandler: &jobsHandler,
86 err = managerUnderTest.AddJobFromRESTCall(wantedJob)
90 addedJob := <-jobsHandler.addJobCh
91 assertions.Equal(wantedJob, addedJob)
94 func TestJobsManagerAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) {
95 assertions := require.New(t)
96 managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
98 InfoTypeIdentity: "type1",
101 err := managerUnderTest.AddJobFromRESTCall(jobInfo)
102 assertions.NotNil(err)
103 assertions.Equal("type not supported: type1", err.Error())
106 func TestJobsManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
107 assertions := require.New(t)
108 managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
109 managerUnderTest.allTypes["type1"] = TypeData{
114 InfoTypeIdentity: "type1",
116 err := managerUnderTest.AddJobFromRESTCall(jobInfo)
117 assertions.NotNil(err)
118 assertions.Equal("missing required job identity: { {{0 0}} type1 }", err.Error())
121 func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
122 assertions := require.New(t)
123 managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
124 managerUnderTest.allTypes["type1"] = TypeData{
129 InfoTypeIdentity: "type1",
130 InfoJobIdentity: "job1",
132 err := managerUnderTest.AddJobFromRESTCall(jobInfo)
133 assertions.NotNil(err)
134 assertions.Equal("missing required target URI: { job1 {{0 0}} type1 }", err.Error())
137 func TestJobsManagerDeleteJob_shouldSendDeleteToChannel(t *testing.T) {
138 assertions := require.New(t)
139 managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
140 jobsHandler := jobsHandler{
141 deleteJobCh: make(chan string)}
142 managerUnderTest.allTypes["type1"] = TypeData{
144 jobsHandler: &jobsHandler,
147 go managerUnderTest.DeleteJobFromRESTCall("job2")
149 assertions.Equal("job2", <-jobsHandler.deleteJobCh)
152 func TestStartJobsManagerAddDMaaPJob_shouldStartPollAndDistributeMessages(t *testing.T) {
153 assertions := require.New(t)
156 dMaaPMessages := `[{"message": {"data": "dmaap"}}]`
157 pollClientMock := NewTestClient(func(req *http.Request) *http.Response {
158 if req.URL.String() == "http://mrAddr/topicUrl" {
159 assertions.Equal(req.Method, "GET")
165 return &http.Response{
166 StatusCode: http.StatusOK,
167 Body: ioutil.NopCloser(bytes.NewReader([]byte(body))),
168 Header: make(http.Header), // Must be set to non-nil value or it panics
171 t.Error("Wrong call to client: ", req)
176 wg := sync.WaitGroup{}
177 distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
178 if req.URL.String() == "http://consumerHost/dmaaptarget" {
179 assertions.Equal(req.Method, "POST")
180 assertions.Equal(dMaaPMessages, getBodyAsString(req, t))
181 assertions.Equal("application/json", req.Header.Get("Content-Type"))
183 return &http.Response{
185 Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)),
186 Header: make(http.Header), // Must be set to non-nil value or it panics
189 t.Error("Wrong call to client: ", req)
193 dMaaPTypeDef := config.TypeDefinition{
195 DMaaPTopicURL: "/topicUrl",
197 dMaaPJobsHandler := newJobsHandler(dMaaPTypeDef, "http://mrAddr", nil, pollClientMock, distributeClientMock)
199 jobsManager := NewJobsManagerImpl(pollClientMock, "http://mrAddr", kafkaclient.KafkaFactoryImpl{}, distributeClientMock)
200 jobsManager.allTypes["type1"] = TypeData{
202 jobsHandler: dMaaPJobsHandler,
204 jobsManager.StartJobsForAllTypes()
206 dMaaPJobInfo := JobInfo{
207 InfoTypeIdentity: "type1",
208 InfoJobIdentity: "job1",
209 TargetUri: "http://consumerHost/dmaaptarget",
212 wg.Add(1) // Wait till the distribution has happened
213 err := jobsManager.AddJobFromRESTCall(dMaaPJobInfo)
216 if waitTimeout(&wg, 2*time.Second) {
217 t.Error("Not all calls to server were made")
222 func TestStartJobsManagerAddKafkaJob_shouldStartPollAndDistributeMessages(t *testing.T) {
223 assertions := require.New(t)
226 wg := sync.WaitGroup{}
227 distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
228 if req.URL.String() == "http://consumerHost/kafkatarget" {
229 assertions.Equal(req.Method, "POST")
230 assertions.Equal(kafkaMessages, getBodyAsString(req, t))
231 assertions.Equal("text/plain", req.Header.Get("Content-Type"))
233 return &http.Response{
235 Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)),
236 Header: make(http.Header), // Must be set to non-nil value or it panics
239 t.Error("Wrong call to client: ", req)
244 kafkaTypeDef := config.TypeDefinition{
246 KafkaInputTopic: "topic",
248 kafkaFactoryMock := mocks.KafkaFactory{}
249 kafkaConsumerMock := mocks.KafkaConsumer{}
250 kafkaConsumerMock.On("Commit").Return([]kafka.TopicPartition{}, error(nil))
251 kafkaConsumerMock.On("Subscribe", mock.Anything).Return(error(nil))
252 kafkaConsumerMock.On("ReadMessage", mock.Anything).Return(&kafka.Message{
253 Value: []byte(kafkaMessages),
254 }, error(nil)).Once()
255 kafkaConsumerMock.On("ReadMessage", mock.Anything).Return(nil, fmt.Errorf("Just to stop"))
256 kafkaFactoryMock.On("NewKafkaConsumer", mock.Anything).Return(kafkaConsumerMock, nil)
257 kafkaJobsHandler := newJobsHandler(kafkaTypeDef, "", kafkaFactoryMock, nil, distributeClientMock)
259 jobsManager := NewJobsManagerImpl(nil, "", kafkaFactoryMock, distributeClientMock)
260 jobsManager.allTypes["type2"] = TypeData{
262 jobsHandler: kafkaJobsHandler,
265 jobsManager.StartJobsForAllTypes()
267 kafkaJobInfo := JobInfo{
268 InfoTypeIdentity: "type2",
269 InfoJobIdentity: "job2",
270 TargetUri: "http://consumerHost/kafkatarget",
273 wg.Add(1) // Wait till the distribution has happened
274 err := jobsManager.AddJobFromRESTCall(kafkaJobInfo)
277 if waitTimeout(&wg, 2*time.Second) {
278 t.Error("Not all calls to server were made")
283 func TestJobsHandlerDeleteJob_shouldDeleteJobFromJobsMap(t *testing.T) {
284 jobToDelete := newJob(JobInfo{}, nil)
285 go jobToDelete.start()
286 typeDef := config.TypeDefinition{
288 DMaaPTopicURL: "/topicUrl",
290 jobsHandler := newJobsHandler(typeDef, "http://mrAddr", kafkaclient.KafkaFactoryImpl{}, nil, nil)
291 jobsHandler.jobs["job1"] = jobToDelete
293 go jobsHandler.monitorManagementChannels()
295 jobsHandler.deleteJobCh <- "job1"
298 for i := 0; i < 100; i++ {
299 if len(jobsHandler.jobs) == 0 {
303 time.Sleep(time.Microsecond) // Need to drop control to let the job's goroutine do the job
305 require.New(t).True(deleted, "Job not deleted")
308 func TestJobsHandlerEmptyJobMessageBufferWhenItIsFull(t *testing.T) {
309 job := newJob(JobInfo{
310 InfoJobIdentity: "job",
313 typeDef := config.TypeDefinition{
315 DMaaPTopicURL: "/topicUrl",
317 jobsHandler := newJobsHandler(typeDef, "http://mrAddr", kafkaclient.KafkaFactoryImpl{}, nil, nil)
318 jobsHandler.jobs["job1"] = job
320 fillMessagesBuffer(job.messagesChannel)
322 jobsHandler.distributeMessages([]byte("sent msg"))
324 require.New(t).Len(job.messagesChannel, 0)
327 func TestKafkaPollingAgentTimedOut_shouldResultInEMptyMessages(t *testing.T) {
328 assertions := require.New(t)
330 kafkaFactoryMock := mocks.KafkaFactory{}
331 kafkaConsumerMock := mocks.KafkaConsumer{}
332 kafkaConsumerMock.On("Commit").Return([]kafka.TopicPartition{}, error(nil))
333 kafkaConsumerMock.On("Subscribe", mock.Anything).Return(error(nil))
334 kafkaConsumerMock.On("ReadMessage", mock.Anything).Return(nil, kafka.NewError(kafka.ErrTimedOut, "", false))
335 kafkaFactoryMock.On("NewKafkaConsumer", mock.Anything).Return(kafkaConsumerMock, nil)
337 pollingAgentUnderTest := newKafkaPollingAgent(kafkaFactoryMock, "")
338 messages, err := pollingAgentUnderTest.pollMessages()
340 assertions.Equal([]byte(""), messages)
344 func TestJobWithoutParameters_shouldSendOneMessageAtATime(t *testing.T) {
345 assertions := require.New(t)
347 wg := sync.WaitGroup{}
349 distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
350 if req.URL.String() == "http://consumerHost/target" {
351 assertions.Equal(req.Method, "POST")
352 assertions.Equal(fmt.Sprint("message", messageNo), getBodyAsString(req, t))
354 assertions.Equal("text/plain", req.Header.Get("Content-Type"))
356 return &http.Response{
358 Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)),
359 Header: make(http.Header), // Must be set to non-nil value or it panics
362 t.Error("Wrong call to client: ", req)
367 jobUnderTest := newJob(JobInfo{
368 sourceType: kafkaSource,
369 TargetUri: "http://consumerHost/target",
370 }, distributeClientMock)
373 go jobUnderTest.start()
375 jobUnderTest.messagesChannel <- []byte("message1")
376 jobUnderTest.messagesChannel <- []byte("message2")
378 if waitTimeout(&wg, 2*time.Second) {
379 t.Error("Not all calls to server were made")
384 func TestJobWithBufferedParameters_shouldSendMessagesTogether(t *testing.T) {
385 assertions := require.New(t)
387 wg := sync.WaitGroup{}
388 distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
389 if req.URL.String() == "http://consumerHost/target" {
390 assertions.Equal(req.Method, "POST")
391 assertions.Equal(`["{\"data\": 1}","{\"data\": 2}","ABCDEFGH"]`, getBodyAsString(req, t))
392 assertions.Equal("application/json", req.Header.Get("Content-Type"))
394 return &http.Response{
396 Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)),
397 Header: make(http.Header), // Must be set to non-nil value or it panics
400 t.Error("Wrong call to client: ", req)
405 jobUnderTest := newJob(JobInfo{
406 TargetUri: "http://consumerHost/target",
407 InfoJobData: Parameters{
408 BufferTimeout: BufferTimeout{
410 MaxTimeMiliseconds: 200,
413 }, distributeClientMock)
416 go jobUnderTest.start()
419 jobUnderTest.messagesChannel <- []byte(`{"data": 1}`)
420 jobUnderTest.messagesChannel <- []byte(`{"data": 2}`)
421 jobUnderTest.messagesChannel <- []byte("ABCDEFGH")
424 if waitTimeout(&wg, 2*time.Second) {
425 t.Error("Not all calls to server were made")
430 func TestJobReadMoreThanBufferSizeMessages_shouldOnlyReturnMaxSizeNoOfMessages(t *testing.T) {
431 assertions := require.New(t)
433 jobUnderTest := newJob(JobInfo{}, nil)
436 for i := 0; i < 4; i++ {
437 jobUnderTest.messagesChannel <- []byte(strconv.Itoa(i))
441 msgs := jobUnderTest.read(BufferTimeout{
443 MaxTimeMiliseconds: 200,
446 assertions.Equal([]byte("[\"0\",\"1\"]"), msgs)
448 func TestJobReadBufferedWhenTimeout_shouldOnlyReturnMessagesSentBeforeTimeout(t *testing.T) {
449 assertions := require.New(t)
451 jobUnderTest := newJob(JobInfo{}, nil)
454 for i := 0; i < 4; i++ {
455 time.Sleep(10 * time.Millisecond)
456 jobUnderTest.messagesChannel <- []byte(strconv.Itoa(i))
460 msgs := jobUnderTest.read(BufferTimeout{
462 MaxTimeMiliseconds: 30,
465 assertions.Equal([]byte("[\"0\",\"1\"]"), msgs)
468 func fillMessagesBuffer(mc chan []byte) {
469 for i := 0; i < cap(mc); i++ {
474 type RoundTripFunc func(req *http.Request) *http.Response
476 func (f RoundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
480 //NewTestClient returns *http.Client with Transport replaced to avoid making real calls
481 func NewTestClient(fn RoundTripFunc) *http.Client {
483 Transport: RoundTripFunc(fn),
487 // waitTimeout waits for the waitgroup for the specified max timeout.
488 // Returns true if waiting timed out.
489 func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
490 c := make(chan struct{})
497 return false // completed normally
498 case <-time.After(timeout):
499 return true // timed out
503 func getBodyAsString(req *http.Request, t *testing.T) string {
504 buf := new(bytes.Buffer)
505 if _, err := buf.ReadFrom(req.Body); err != nil {