ab1165c9ddc9b9867a5f6247fa49081b750adff6
[nonrtric.git] / dmaap-mediator-producer / internal / jobs / jobs_test.go
1 // -
2 //   ========================LICENSE_START=================================
3 //   O-RAN-SC
4 //   %%
5 //   Copyright (C) 2021: Nordix Foundation
6 //   %%
7 //   Licensed under the Apache License, Version 2.0 (the "License");
8 //   you may not use this file except in compliance with the License.
9 //   You may obtain a copy of the License at
10 //
11 //        http://www.apache.org/licenses/LICENSE-2.0
12 //
13 //   Unless required by applicable law or agreed to in writing, software
14 //   distributed under the License is distributed on an "AS IS" BASIS,
15 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 //   See the License for the specific language governing permissions and
17 //   limitations under the License.
18 //   ========================LICENSE_END===================================
19 //
20
21 package jobs
22
23 import (
24         "bytes"
25         "fmt"
26         "io/ioutil"
27         "net/http"
28         "strconv"
29         "sync"
30         "testing"
31         "time"
32
33         "github.com/confluentinc/confluent-kafka-go/kafka"
34         "github.com/stretchr/testify/mock"
35         "github.com/stretchr/testify/require"
36         "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
37         "oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient"
38         "oransc.org/nonrtric/dmaapmediatorproducer/mocks"
39 )
40
41 func TestJobsManagerGetTypes_shouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
42         assertions := require.New(t)
43
44         managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
45
46         wantedDMaaPType := config.TypeDefinition{
47                 Identity:      "type1",
48                 DMaaPTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
49         }
50         wantedKafkaType := config.TypeDefinition{
51                 Identity:        "type2",
52                 KafkaInputTopic: "topic",
53         }
54         wantedTypes := []config.TypeDefinition{wantedDMaaPType, wantedKafkaType}
55
56         types := managerUnderTest.LoadTypesFromConfiguration(wantedTypes)
57
58         assertions.EqualValues(wantedTypes, types)
59
60         supportedTypes := managerUnderTest.GetSupportedTypes()
61         assertions.ElementsMatch([]string{"type1", "type2"}, supportedTypes)
62 }
63
64 func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) {
65         assertions := require.New(t)
66         managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
67         wantedJob := JobInfo{
68                 Owner:            "owner",
69                 LastUpdated:      "now",
70                 InfoJobIdentity:  "job1",
71                 TargetUri:        "target",
72                 InfoJobData:      Parameters{},
73                 InfoTypeIdentity: "type1",
74         }
75         jobsHandler := jobsHandler{
76                 addJobCh: make(chan JobInfo)}
77         managerUnderTest.allTypes["type1"] = TypeData{
78                 Identity:    "type1",
79                 jobsHandler: &jobsHandler,
80         }
81
82         var err error
83         go func() {
84                 err = managerUnderTest.AddJobFromRESTCall(wantedJob)
85         }()
86
87         assertions.Nil(err)
88         addedJob := <-jobsHandler.addJobCh
89         assertions.Equal(wantedJob, addedJob)
90 }
91
92 func TestJobsManagerAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) {
93         assertions := require.New(t)
94         managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
95         jobInfo := JobInfo{
96                 InfoTypeIdentity: "type1",
97         }
98
99         err := managerUnderTest.AddJobFromRESTCall(jobInfo)
100         assertions.NotNil(err)
101         assertions.Equal("type not supported: type1", err.Error())
102 }
103
104 func TestJobsManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
105         assertions := require.New(t)
106         managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
107         managerUnderTest.allTypes["type1"] = TypeData{
108                 Identity: "type1",
109         }
110
111         jobInfo := JobInfo{
112                 InfoTypeIdentity: "type1",
113         }
114         err := managerUnderTest.AddJobFromRESTCall(jobInfo)
115         assertions.NotNil(err)
116         assertions.Equal("missing required job identity: {    {{0 0}} type1}", err.Error())
117 }
118
119 func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
120         assertions := require.New(t)
121         managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
122         managerUnderTest.allTypes["type1"] = TypeData{
123                 Identity: "type1",
124         }
125
126         jobInfo := JobInfo{
127                 InfoTypeIdentity: "type1",
128                 InfoJobIdentity:  "job1",
129         }
130         err := managerUnderTest.AddJobFromRESTCall(jobInfo)
131         assertions.NotNil(err)
132         assertions.Equal("missing required target URI: {  job1  {{0 0}} type1}", err.Error())
133 }
134
135 func TestJobsManagerDeleteJob_shouldSendDeleteToChannel(t *testing.T) {
136         assertions := require.New(t)
137         managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
138         jobsHandler := jobsHandler{
139                 deleteJobCh: make(chan string)}
140         managerUnderTest.allTypes["type1"] = TypeData{
141                 Identity:    "type1",
142                 jobsHandler: &jobsHandler,
143         }
144
145         go managerUnderTest.DeleteJobFromRESTCall("job2")
146
147         assertions.Equal("job2", <-jobsHandler.deleteJobCh)
148 }
149
150 func TestStartJobsManagerAddDMaaPJob_shouldStartPollAndDistributeMessages(t *testing.T) {
151         assertions := require.New(t)
152
153         called := false
154         dMaaPMessages := `[{"message": {"data": "dmaap"}}]`
155         pollClientMock := NewTestClient(func(req *http.Request) *http.Response {
156                 if req.URL.String() == "http://mrAddr/topicUrl" {
157                         assertions.Equal(req.Method, "GET")
158                         body := "[]"
159                         if !called {
160                                 called = true
161                                 body = dMaaPMessages
162                         }
163                         return &http.Response{
164                                 StatusCode: http.StatusOK,
165                                 Body:       ioutil.NopCloser(bytes.NewReader([]byte(body))),
166                                 Header:     make(http.Header), // Must be set to non-nil value or it panics
167                         }
168                 }
169                 t.Error("Wrong call to client: ", req)
170                 t.Fail()
171                 return nil
172         })
173
174         wg := sync.WaitGroup{}
175         distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
176                 if req.URL.String() == "http://consumerHost/dmaaptarget" {
177                         assertions.Equal(req.Method, "POST")
178                         assertions.Equal(dMaaPMessages, getBodyAsString(req, t))
179                         assertions.Equal("application/json", req.Header.Get("Content-Type"))
180                         wg.Done()
181                         return &http.Response{
182                                 StatusCode: 200,
183                                 Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
184                                 Header:     make(http.Header), // Must be set to non-nil value or it panics
185                         }
186                 }
187                 t.Error("Wrong call to client: ", req)
188                 t.Fail()
189                 return nil
190         })
191         dMaaPTypeDef := config.TypeDefinition{
192                 Identity:      "type1",
193                 DMaaPTopicURL: "/topicUrl",
194         }
195         dMaaPJobsHandler := newJobsHandler(dMaaPTypeDef, "http://mrAddr", nil, pollClientMock, distributeClientMock)
196
197         jobsManager := NewJobsManagerImpl(pollClientMock, "http://mrAddr", kafkaclient.KafkaFactoryImpl{}, distributeClientMock)
198         jobsManager.allTypes["type1"] = TypeData{
199                 Identity:    "type1",
200                 jobsHandler: dMaaPJobsHandler,
201         }
202         jobsManager.StartJobsForAllTypes()
203
204         dMaaPJobInfo := JobInfo{
205                 InfoTypeIdentity: "type1",
206                 InfoJobIdentity:  "job1",
207                 TargetUri:        "http://consumerHost/dmaaptarget",
208         }
209
210         wg.Add(1) // Wait till the distribution has happened
211         err := jobsManager.AddJobFromRESTCall(dMaaPJobInfo)
212         assertions.Nil(err)
213
214         if waitTimeout(&wg, 2*time.Second) {
215                 t.Error("Not all calls to server were made")
216                 t.Fail()
217         }
218 }
219
220 func TestStartJobsManagerAddKafkaJob_shouldStartPollAndDistributeMessages(t *testing.T) {
221         assertions := require.New(t)
222
223         kafkaMessages := `1`
224         wg := sync.WaitGroup{}
225         distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
226                 if req.URL.String() == "http://consumerHost/kafkatarget" {
227                         assertions.Equal(req.Method, "POST")
228                         assertions.Equal(kafkaMessages, getBodyAsString(req, t))
229                         assertions.Equal("application/json", req.Header.Get("Content-Type"))
230                         wg.Done()
231                         return &http.Response{
232                                 StatusCode: 200,
233                                 Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
234                                 Header:     make(http.Header), // Must be set to non-nil value or it panics
235                         }
236                 }
237                 t.Error("Wrong call to client: ", req)
238                 t.Fail()
239                 return nil
240         })
241
242         kafkaTypeDef := config.TypeDefinition{
243                 Identity:        "type2",
244                 KafkaInputTopic: "topic",
245         }
246         kafkaFactoryMock := mocks.KafkaFactory{}
247         kafkaConsumerMock := mocks.KafkaConsumer{}
248         kafkaConsumerMock.On("Commit").Return([]kafka.TopicPartition{}, error(nil))
249         kafkaConsumerMock.On("Subscribe", mock.Anything).Return(error(nil))
250         kafkaConsumerMock.On("ReadMessage", mock.Anything).Return(&kafka.Message{
251                 Value: []byte(kafkaMessages),
252         }, error(nil)).Once()
253         kafkaConsumerMock.On("ReadMessage", mock.Anything).Return(nil, fmt.Errorf("Just to stop"))
254         kafkaFactoryMock.On("NewKafkaConsumer", mock.Anything).Return(kafkaConsumerMock, nil)
255         kafkaJobsHandler := newJobsHandler(kafkaTypeDef, "", kafkaFactoryMock, nil, distributeClientMock)
256
257         jobsManager := NewJobsManagerImpl(nil, "", kafkaFactoryMock, distributeClientMock)
258         jobsManager.allTypes["type2"] = TypeData{
259                 Identity:    "type2",
260                 jobsHandler: kafkaJobsHandler,
261         }
262
263         jobsManager.StartJobsForAllTypes()
264
265         kafkaJobInfo := JobInfo{
266                 InfoTypeIdentity: "type2",
267                 InfoJobIdentity:  "job2",
268                 TargetUri:        "http://consumerHost/kafkatarget",
269         }
270
271         wg.Add(1) // Wait till the distribution has happened
272         err := jobsManager.AddJobFromRESTCall(kafkaJobInfo)
273         assertions.Nil(err)
274
275         if waitTimeout(&wg, 2*time.Second) {
276                 t.Error("Not all calls to server were made")
277                 t.Fail()
278         }
279 }
280
281 func TestJobsHandlerDeleteJob_shouldDeleteJobFromJobsMap(t *testing.T) {
282         jobToDelete := newJob(JobInfo{}, nil)
283         go jobToDelete.start()
284         typeDef := config.TypeDefinition{
285                 Identity:      "type1",
286                 DMaaPTopicURL: "/topicUrl",
287         }
288         jobsHandler := newJobsHandler(typeDef, "http://mrAddr", kafkaclient.KafkaFactoryImpl{}, nil, nil)
289         jobsHandler.jobs["job1"] = jobToDelete
290
291         go jobsHandler.monitorManagementChannels()
292
293         jobsHandler.deleteJobCh <- "job1"
294
295         deleted := false
296         for i := 0; i < 100; i++ {
297                 if len(jobsHandler.jobs) == 0 {
298                         deleted = true
299                         break
300                 }
301                 time.Sleep(time.Microsecond) // Need to drop control to let the job's goroutine do the job
302         }
303         require.New(t).True(deleted, "Job not deleted")
304 }
305
306 func TestJobsHandlerEmptyJobMessageBufferWhenItIsFull(t *testing.T) {
307         job := newJob(JobInfo{
308                 InfoJobIdentity: "job",
309         }, nil)
310
311         typeDef := config.TypeDefinition{
312                 Identity:      "type1",
313                 DMaaPTopicURL: "/topicUrl",
314         }
315         jobsHandler := newJobsHandler(typeDef, "http://mrAddr", kafkaclient.KafkaFactoryImpl{}, nil, nil)
316         jobsHandler.jobs["job1"] = job
317
318         fillMessagesBuffer(job.messagesChannel)
319
320         jobsHandler.distributeMessages([]byte("sent msg"))
321
322         require.New(t).Len(job.messagesChannel, 0)
323 }
324
325 func TestKafkaPollingAgentTimedOut_shouldResultInEMptyMessages(t *testing.T) {
326         assertions := require.New(t)
327
328         kafkaFactoryMock := mocks.KafkaFactory{}
329         kafkaConsumerMock := mocks.KafkaConsumer{}
330         kafkaConsumerMock.On("Commit").Return([]kafka.TopicPartition{}, error(nil))
331         kafkaConsumerMock.On("Subscribe", mock.Anything).Return(error(nil))
332         kafkaConsumerMock.On("ReadMessage", mock.Anything).Return(nil, kafka.NewError(kafka.ErrTimedOut, "", false))
333         kafkaFactoryMock.On("NewKafkaConsumer", mock.Anything).Return(kafkaConsumerMock, nil)
334
335         pollingAgentUnderTest := newKafkaPollingAgent(kafkaFactoryMock, "")
336         messages, err := pollingAgentUnderTest.pollMessages()
337
338         assertions.Equal([]byte(""), messages)
339         assertions.Nil(err)
340 }
341
342 func TestJobWithoutParameters_shouldSendOneMessageAtATime(t *testing.T) {
343         assertions := require.New(t)
344
345         wg := sync.WaitGroup{}
346         messageNo := 1
347         distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
348                 if req.URL.String() == "http://consumerHost/target" {
349                         assertions.Equal(req.Method, "POST")
350                         assertions.Equal(fmt.Sprint("message", messageNo), getBodyAsString(req, t))
351                         messageNo++
352                         assertions.Equal("application/json", req.Header.Get("Content-Type"))
353                         wg.Done()
354                         return &http.Response{
355                                 StatusCode: 200,
356                                 Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
357                                 Header:     make(http.Header), // Must be set to non-nil value or it panics
358                         }
359                 }
360                 t.Error("Wrong call to client: ", req)
361                 t.Fail()
362                 return nil
363         })
364
365         jobUnderTest := newJob(JobInfo{TargetUri: "http://consumerHost/target"}, distributeClientMock)
366
367         wg.Add(2)
368         go jobUnderTest.start()
369
370         jobUnderTest.messagesChannel <- []byte("message1")
371         jobUnderTest.messagesChannel <- []byte("message2")
372
373         if waitTimeout(&wg, 2*time.Second) {
374                 t.Error("Not all calls to server were made")
375                 t.Fail()
376         }
377 }
378
379 func TestJobWithBufferedParameters_shouldSendMessagesTogether(t *testing.T) {
380         assertions := require.New(t)
381
382         wg := sync.WaitGroup{}
383         distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
384                 if req.URL.String() == "http://consumerHost/target" {
385                         assertions.Equal(req.Method, "POST")
386                         assertions.Equal("12", getBodyAsString(req, t))
387                         assertions.Equal("application/json", req.Header.Get("Content-Type"))
388                         wg.Done()
389                         return &http.Response{
390                                 StatusCode: 200,
391                                 Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
392                                 Header:     make(http.Header), // Must be set to non-nil value or it panics
393                         }
394                 }
395                 t.Error("Wrong call to client: ", req)
396                 t.Fail()
397                 return nil
398         })
399
400         jobUnderTest := newJob(JobInfo{
401                 TargetUri: "http://consumerHost/target",
402                 InfoJobData: Parameters{
403                         BufferTimeout: BufferTimeout{
404                                 MaxSize:            5,
405                                 MaxTimeMiliseconds: 200,
406                         },
407                 },
408         }, distributeClientMock)
409
410         wg.Add(1)
411         go jobUnderTest.start()
412
413         go func() {
414                 jobUnderTest.messagesChannel <- []byte("1")
415                 jobUnderTest.messagesChannel <- []byte("2")
416         }()
417
418         if waitTimeout(&wg, 2*time.Second) {
419                 t.Error("Not all calls to server were made")
420                 t.Fail()
421         }
422 }
423
424 func TestJobReadMoreThanBufferSizeMessages_shouldOnlyReturnMaxSizeNoOfMessages(t *testing.T) {
425         assertions := require.New(t)
426
427         jobUnderTest := newJob(JobInfo{}, nil)
428
429         go func() {
430                 for i := 0; i < 4; i++ {
431                         jobUnderTest.messagesChannel <- []byte(strconv.Itoa(i))
432                 }
433         }()
434
435         msgs := jobUnderTest.read(BufferTimeout{
436                 MaxSize:            2,
437                 MaxTimeMiliseconds: 200,
438         })
439
440         assertions.Equal([]byte("01"), msgs)
441 }
442 func TestJobReadBufferedWhenTimeout_shouldOnlyReturnMessagesSentBeforeTimeout(t *testing.T) {
443         assertions := require.New(t)
444
445         jobUnderTest := newJob(JobInfo{}, nil)
446
447         go func() {
448                 for i := 0; i < 4; i++ {
449                         time.Sleep(10 * time.Millisecond)
450                         jobUnderTest.messagesChannel <- []byte(strconv.Itoa(i))
451                 }
452         }()
453
454         msgs := jobUnderTest.read(BufferTimeout{
455                 MaxSize:            2,
456                 MaxTimeMiliseconds: 30,
457         })
458
459         assertions.Equal([]byte("01"), msgs)
460 }
461
462 func fillMessagesBuffer(mc chan []byte) {
463         for i := 0; i < cap(mc); i++ {
464                 mc <- []byte("msg")
465         }
466 }
467
468 type RoundTripFunc func(req *http.Request) *http.Response
469
470 func (f RoundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
471         return f(req), nil
472 }
473
474 //NewTestClient returns *http.Client with Transport replaced to avoid making real calls
475 func NewTestClient(fn RoundTripFunc) *http.Client {
476         return &http.Client{
477                 Transport: RoundTripFunc(fn),
478         }
479 }
480
481 // waitTimeout waits for the waitgroup for the specified max timeout.
482 // Returns true if waiting timed out.
483 func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
484         c := make(chan struct{})
485         go func() {
486                 defer close(c)
487                 wg.Wait()
488         }()
489         select {
490         case <-c:
491                 return false // completed normally
492         case <-time.After(timeout):
493                 return true // timed out
494         }
495 }
496
497 func getBodyAsString(req *http.Request, t *testing.T) string {
498         buf := new(bytes.Buffer)
499         if _, err := buf.ReadFrom(req.Body); err != nil {
500                 t.Fail()
501         }
502         return buf.String()
503 }