2 // ========================LICENSE_START=================================
5 // Copyright (C) 2021: Nordix Foundation
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 // ========================LICENSE_END===================================
33 "github.com/confluentinc/confluent-kafka-go/kafka"
34 "github.com/stretchr/testify/mock"
35 "github.com/stretchr/testify/require"
36 "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
37 "oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient"
38 "oransc.org/nonrtric/dmaapmediatorproducer/mocks"
// TestJobsManagerGetTypes_shouldReturnSliceOfTypesAndProvideSupportedTypes loads two
// type definitions (one with a DMaaP topic URL, one with a Kafka input topic) into a
// fresh jobs manager, then checks that LoadTypesFromConfiguration returns the same
// definitions and that GetSupportedTypes reports "type1" and "type2".
41 func TestJobsManagerGetTypes_shouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
42 assertions := require.New(t)
44 managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
// Type definition that carries a DMaaP topic URL.
// NOTE(review): presumably the literal also sets the identity "type1" — confirm.
46 wantedDMaaPType := config.TypeDefinition{
48 DMaaPTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
// Type definition that carries a Kafka input topic.
// NOTE(review): presumably the literal also sets the identity "type2" — confirm.
50 wantedKafkaType := config.TypeDefinition{
52 KafkaInputTopic: "topic",
54 wantedTypes := []config.TypeDefinition{wantedDMaaPType, wantedKafkaType}
56 types := managerUnderTest.LoadTypesFromConfiguration(wantedTypes)
// The manager is expected to return the definitions it was given.
58 assertions.EqualValues(wantedTypes, types)
// ElementsMatch ignores ordering, so the order of the reported type names does not matter.
60 supportedTypes := managerUnderTest.GetSupportedTypes()
61 assertions.ElementsMatch([]string{"type1", "type2"}, supportedTypes)
// TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel registers a jobsHandler
// for "type1" on the manager, passes a job of that type to AddJobFromRESTCall and checks
// that the same job is received on the handler's addJobCh channel.
64 func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) {
65 assertions := require.New(t)
66 managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
// Fields of the job that is expected to be forwarded (presumably the wantedJob literal — confirm).
70 InfoJobIdentity: "job1",
72 InfoJobData: Parameters{},
73 InfoTypeIdentity: "type1",
// addJobCh is created unbuffered; the test itself receives from it below.
75 jobsHandler := jobsHandler{
76 addJobCh: make(chan JobInfo)}
77 managerUnderTest.allTypes["type1"] = TypeData{
79 jobsHandler: &jobsHandler,
// NOTE(review): err is assigned (not declared) here; confirm where it is declared and
// whether this call runs in its own goroutine, since the receive below is on an
// unbuffered channel.
84 err = managerUnderTest.AddJobFromRESTCall(wantedJob)
// The job taken from the channel must equal the one handed to the manager.
88 addedJob := <-jobsHandler.addJobCh
89 assertions.Equal(wantedJob, addedJob)
// TestJobsManagerAddJobWhenTypeIsNotSupported_shouldReturnError calls AddJobFromRESTCall
// with a job of type "type1" on a manager created with NewJobsManagerImpl and expects the
// error "type not supported: type1".
// NOTE(review): no type registration is visible before the call — confirm none happens.
92 func TestJobsManagerAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) {
93 assertions := require.New(t)
94 managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
96 InfoTypeIdentity: "type1",
99 err := managerUnderTest.AddJobFromRESTCall(jobInfo)
// require-style assertions stop the test on failure, so err.Error() below is only
// reached when err is non-nil.
100 assertions.NotNil(err)
101 assertions.Equal("type not supported: type1", err.Error())
// TestJobsManagerAddJobWhenJobIdMissing_shouldReturnError registers "type1" on the
// manager and submits a job of that type; the expected error message starts with
// "missing required job identity".
// NOTE(review): the visible job fields set only InfoTypeIdentity — confirm that no job
// identity is set elsewhere in the literal.
104 func TestJobsManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
105 assertions := require.New(t)
106 managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
107 managerUnderTest.allTypes["type1"] = TypeData{
112 InfoTypeIdentity: "type1",
114 err := managerUnderTest.AddJobFromRESTCall(jobInfo)
115 assertions.NotNil(err)
// The expected text embeds a printed form of the job; it looks like a default struct
// formatting of JobInfo, so it may need updating if JobInfo's fields change — confirm.
116 assertions.Equal("missing required job identity: { {{0 0}} type1}", err.Error())
// TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError registers "type1" on the
// manager and submits a job with type and job identity set; the expected error message
// starts with "missing required target URI".
// NOTE(review): no TargetUri field is visible in the job literal — confirm it is unset.
119 func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
120 assertions := require.New(t)
121 managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
122 managerUnderTest.allTypes["type1"] = TypeData{
127 InfoTypeIdentity: "type1",
128 InfoJobIdentity: "job1",
130 err := managerUnderTest.AddJobFromRESTCall(jobInfo)
131 assertions.NotNil(err)
// The expected text embeds a printed form of the job, including "job1" and "type1".
132 assertions.Equal("missing required target URI: { job1 {{0 0}} type1}", err.Error())
// TestJobsManagerDeleteJob_shouldSendDeleteToChannel registers a jobsHandler for "type1",
// calls DeleteJobFromRESTCall("job2") and checks that "job2" arrives on the handler's
// deleteJobCh channel.
135 func TestJobsManagerDeleteJob_shouldSendDeleteToChannel(t *testing.T) {
136 assertions := require.New(t)
137 managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
// deleteJobCh is created unbuffered.
138 jobsHandler := jobsHandler{
139 deleteJobCh: make(chan string)}
140 managerUnderTest.allTypes["type1"] = TypeData{
142 jobsHandler: &jobsHandler,
// The delete call runs in its own goroutine while the test goroutine does the receive
// on the unbuffered channel below.
145 go managerUnderTest.DeleteJobFromRESTCall("job2")
147 assertions.Equal("job2", <-jobsHandler.deleteJobCh)
// TestStartJobsManagerAddDMaaPJob_shouldStartPollAndDistributeMessages wires a jobs
// manager with two stubbed HTTP clients: one that answers the poll request to
// "http://mrAddr/topicUrl" and one that receives the distribution to
// "http://consumerHost/dmaaptarget". It starts the jobs for all types, adds a job for
// "type1" and waits up to two seconds for the WaitGroup to be released.
150 func TestStartJobsManagerAddDMaaPJob_shouldStartPollAndDistributeMessages(t *testing.T) {
151 assertions := require.New(t)
// Payload that the distribution stub expects to receive as the request body.
154 dMaaPMessages := `[{"message": {"data": "dmaap"}}]`
// Stub for the polling side: only the message-router topic URL is an accepted call.
155 pollClientMock := NewTestClient(func(req *http.Request) *http.Response {
156 if req.URL.String() == "http://mrAddr/topicUrl" {
// NOTE(review): testify's Equal takes (expected, actual); the arguments here are in the
// opposite order, which only affects how a failure is reported.
157 assertions.Equal(req.Method, "GET")
// NOTE(review): body is not declared in the visible lines — confirm how it is set.
163 return &http.Response{
164 StatusCode: http.StatusOK,
165 Body: ioutil.NopCloser(bytes.NewReader([]byte(body))),
166 Header: make(http.Header), // Must be set to non-nil value or it panics
// Any other URL counts as a test failure.
169 t.Error("Wrong call to client: ", req)
174 wg := sync.WaitGroup{}
// Stub for the distribution side: checks method, body and content type of the request
// sent to the consumer's target URI.
175 distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
176 if req.URL.String() == "http://consumerHost/dmaaptarget" {
// NOTE(review): same (expected, actual) argument order remark as for the poll stub.
177 assertions.Equal(req.Method, "POST")
178 assertions.Equal(dMaaPMessages, getBodyAsString(req, t))
179 assertions.Equal("application/json", req.Header.Get("Content-Type"))
// NOTE(review): presumably wg.Done() is called in this branch — confirm.
181 return &http.Response{
183 Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)),
184 Header: make(http.Header), // Must be set to non-nil value or it panics
187 t.Error("Wrong call to client: ", req)
// The topic URL is relative; together with the "http://mrAddr" argument below it
// corresponds to the URL the poll stub accepts.
191 dMaaPTypeDef := config.TypeDefinition{
193 DMaaPTopicURL: "/topicUrl",
195 dMaaPJobsHandler := newJobsHandler(dMaaPTypeDef, "http://mrAddr", nil, pollClientMock, distributeClientMock)
197 jobsManager := NewJobsManagerImpl(pollClientMock, "http://mrAddr", kafkaclient.KafkaFactoryImpl{}, distributeClientMock)
// The handler is placed directly into the manager's type map under "type1".
198 jobsManager.allTypes["type1"] = TypeData{
200 jobsHandler: dMaaPJobsHandler,
202 jobsManager.StartJobsForAllTypes()
// Job whose target URI matches the URL accepted by the distribution stub.
204 dMaaPJobInfo := JobInfo{
205 InfoTypeIdentity: "type1",
206 InfoJobIdentity: "job1",
207 TargetUri: "http://consumerHost/dmaaptarget",
210 wg.Add(1) // Wait till the distribution has happened
211 err := jobsManager.AddJobFromRESTCall(dMaaPJobInfo)
// waitTimeout returns true when the wait timed out, i.e. the WaitGroup was not released.
214 if waitTimeout(&wg, 2*time.Second) {
215 t.Error("Not all calls to server were made")
// TestStartJobsManagerAddKafkaJob_shouldStartPollAndDistributeMessages wires a jobs
// manager with a mocked Kafka factory/consumer and a stubbed distribution HTTP client.
// It starts the jobs for all types, adds a job for "type2" and waits up to two seconds
// for the WaitGroup to be released.
220 func TestStartJobsManagerAddKafkaJob_shouldStartPollAndDistributeMessages(t *testing.T) {
221 assertions := require.New(t)
// NOTE(review): kafkaMessages is used below but not declared in the visible lines — confirm.
224 wg := sync.WaitGroup{}
// Stub for the distribution side: checks method, body and content type of the request
// sent to the consumer's target URI.
225 distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
226 if req.URL.String() == "http://consumerHost/kafkatarget" {
// NOTE(review): testify's Equal takes (expected, actual); the arguments here are in the
// opposite order, which only affects how a failure is reported.
227 assertions.Equal(req.Method, "POST")
228 assertions.Equal(kafkaMessages, getBodyAsString(req, t))
229 assertions.Equal("application/json", req.Header.Get("Content-Type"))
// NOTE(review): presumably wg.Done() is called in this branch — confirm.
231 return &http.Response{
233 Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)),
234 Header: make(http.Header), // Must be set to non-nil value or it panics
// Any other URL counts as a test failure.
237 t.Error("Wrong call to client: ", req)
242 kafkaTypeDef := config.TypeDefinition{
244 KafkaInputTopic: "topic",
// Kafka mocks: Commit and Subscribe succeed; the first ReadMessage call yields one
// message carrying kafkaMessages, every later call returns the "Just to stop" error.
246 kafkaFactoryMock := mocks.KafkaFactory{}
247 kafkaConsumerMock := mocks.KafkaConsumer{}
248 kafkaConsumerMock.On("Commit").Return([]kafka.TopicPartition{}, error(nil))
249 kafkaConsumerMock.On("Subscribe", mock.Anything).Return(error(nil))
250 kafkaConsumerMock.On("ReadMessage", mock.Anything).Return(&kafka.Message{
251 Value: []byte(kafkaMessages),
252 }, error(nil)).Once()
253 kafkaConsumerMock.On("ReadMessage", mock.Anything).Return(nil, fmt.Errorf("Just to stop"))
// The factory hands out the consumer mock configured above.
254 kafkaFactoryMock.On("NewKafkaConsumer", mock.Anything).Return(kafkaConsumerMock, nil)
255 kafkaJobsHandler := newJobsHandler(kafkaTypeDef, "", kafkaFactoryMock, nil, distributeClientMock)
257 jobsManager := NewJobsManagerImpl(nil, "", kafkaFactoryMock, distributeClientMock)
// The handler is placed directly into the manager's type map under "type2".
258 jobsManager.allTypes["type2"] = TypeData{
260 jobsHandler: kafkaJobsHandler,
263 jobsManager.StartJobsForAllTypes()
// Job whose target URI matches the URL accepted by the distribution stub.
265 kafkaJobInfo := JobInfo{
266 InfoTypeIdentity: "type2",
267 InfoJobIdentity: "job2",
268 TargetUri: "http://consumerHost/kafkatarget",
271 wg.Add(1) // Wait till the distribution has happened
272 err := jobsManager.AddJobFromRESTCall(kafkaJobInfo)
// waitTimeout returns true when the wait timed out, i.e. the WaitGroup was not released.
275 if waitTimeout(&wg, 2*time.Second) {
276 t.Error("Not all calls to server were made")
// TestJobsHandlerDeleteJob_shouldDeleteJobFromJobsMap puts a started job into a jobs
// handler's jobs map under "job1", sends "job1" on deleteJobCh while
// monitorManagementChannels runs in a goroutine, and then polls until the map is empty.
281 func TestJobsHandlerDeleteJob_shouldDeleteJobFromJobsMap(t *testing.T) {
282 jobToDelete := newJob(JobInfo{}, nil)
283 go jobToDelete.start()
284 typeDef := config.TypeDefinition{
286 DMaaPTopicURL: "/topicUrl",
288 jobsHandler := newJobsHandler(typeDef, "http://mrAddr", kafkaclient.KafkaFactoryImpl{}, nil, nil)
289 jobsHandler.jobs["job1"] = jobToDelete
291 go jobsHandler.monitorManagementChannels()
293 jobsHandler.deleteJobCh <- "job1"
// Poll at most 100 times for the map to become empty.
// NOTE(review): jobsHandler.jobs is read here while monitorManagementChannels runs in
// another goroutine; if that goroutine modifies the map this is an unsynchronised
// access — verify with the race detector.
296 for i := 0; i < 100; i++ {
297 if len(jobsHandler.jobs) == 0 {
301 time.Sleep(time.Microsecond) // Need to drop control to let the job's goroutine do the job
// NOTE(review): deleted is presumably set inside the loop when the map is empty — confirm.
303 require.New(t).True(deleted, "Job not deleted")
// TestJobsHandlerEmptyJobMessageBufferWhenItIsFull fills a job's messagesChannel via
// fillMessagesBuffer, lets the handler distribute one more message and asserts that the
// channel then holds no messages.
306 func TestJobsHandlerEmptyJobMessageBufferWhenItIsFull(t *testing.T) {
307 job := newJob(JobInfo{
308 InfoJobIdentity: "job",
311 typeDef := config.TypeDefinition{
313 DMaaPTopicURL: "/topicUrl",
315 jobsHandler := newJobsHandler(typeDef, "http://mrAddr", kafkaclient.KafkaFactoryImpl{}, nil, nil)
// The job is stored under the key "job1" although its InfoJobIdentity is "job".
316 jobsHandler.jobs["job1"] = job
318 fillMessagesBuffer(job.messagesChannel)
320 jobsHandler.distributeMessages([]byte("sent msg"))
// Len on a channel checks the number of queued elements.
322 require.New(t).Len(job.messagesChannel, 0)
// TestKafkaPollingAgentTimedOut_shouldResultInEMptyMessages builds a Kafka polling agent
// on top of a mocked consumer whose ReadMessage always returns a kafka.ErrTimedOut error,
// and checks that pollMessages yields an empty byte slice.
325 func TestKafkaPollingAgentTimedOut_shouldResultInEMptyMessages(t *testing.T) {
326 assertions := require.New(t)
328 kafkaFactoryMock := mocks.KafkaFactory{}
329 kafkaConsumerMock := mocks.KafkaConsumer{}
// Commit and Subscribe succeed; ReadMessage reports a timeout on every call.
330 kafkaConsumerMock.On("Commit").Return([]kafka.TopicPartition{}, error(nil))
331 kafkaConsumerMock.On("Subscribe", mock.Anything).Return(error(nil))
332 kafkaConsumerMock.On("ReadMessage", mock.Anything).Return(nil, kafka.NewError(kafka.ErrTimedOut, "", false))
333 kafkaFactoryMock.On("NewKafkaConsumer", mock.Anything).Return(kafkaConsumerMock, nil)
335 pollingAgentUnderTest := newKafkaPollingAgent(kafkaFactoryMock, "")
336 messages, err := pollingAgentUnderTest.pollMessages()
// NOTE(review): no check of err is visible here — confirm that one exists.
338 assertions.Equal([]byte(""), messages)
// TestJobWithoutParameters_shouldSendOneMessageAtATime starts a job that has only a
// target URI, pushes "message1" and "message2" onto its messagesChannel and expects the
// stubbed client to receive each as the body of a separate POST request.
342 func TestJobWithoutParameters_shouldSendOneMessageAtATime(t *testing.T) {
343 assertions := require.New(t)
345 wg := sync.WaitGroup{}
// NOTE(review): messageNo is used below but not declared in the visible lines; it is
// presumably a counter advanced per request — confirm.
347 distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
348 if req.URL.String() == "http://consumerHost/target" {
// NOTE(review): testify's Equal takes (expected, actual); the arguments here are in the
// opposite order, which only affects how a failure is reported.
349 assertions.Equal(req.Method, "POST")
// fmt.Sprint adds no space next to a string operand, so this yields "message<messageNo>".
350 assertions.Equal(fmt.Sprint("message", messageNo), getBodyAsString(req, t))
352 assertions.Equal("application/json", req.Header.Get("Content-Type"))
354 return &http.Response{
356 Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)),
357 Header: make(http.Header), // Must be set to non-nil value or it panics
// Any other URL counts as a test failure.
360 t.Error("Wrong call to client: ", req)
365 jobUnderTest := newJob(JobInfo{TargetUri: "http://consumerHost/target"}, distributeClientMock)
368 go jobUnderTest.start()
370 jobUnderTest.messagesChannel <- []byte("message1")
371 jobUnderTest.messagesChannel <- []byte("message2")
// waitTimeout returns true when the wait timed out, i.e. the WaitGroup was not released.
373 if waitTimeout(&wg, 2*time.Second) {
374 t.Error("Not all calls to server were made")
// TestJobWithBufferedParameters_shouldSendMessagesTogether starts a job configured with
// a BufferTimeout, pushes "1" and "2" onto its messagesChannel and expects the stubbed
// client to receive a single POST whose body is "12".
379 func TestJobWithBufferedParameters_shouldSendMessagesTogether(t *testing.T) {
380 assertions := require.New(t)
382 wg := sync.WaitGroup{}
383 distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
384 if req.URL.String() == "http://consumerHost/target" {
// NOTE(review): testify's Equal takes (expected, actual); the arguments here are in the
// opposite order, which only affects how a failure is reported.
385 assertions.Equal(req.Method, "POST")
// Both messages are expected in one request body.
386 assertions.Equal("12", getBodyAsString(req, t))
387 assertions.Equal("application/json", req.Header.Get("Content-Type"))
389 return &http.Response{
391 Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)),
392 Header: make(http.Header), // Must be set to non-nil value or it panics
// Any other URL counts as a test failure.
395 t.Error("Wrong call to client: ", req)
// NOTE(review): only MaxTimeMiliseconds is visible in the BufferTimeout literal;
// a size limit is presumably set as well — confirm.
400 jobUnderTest := newJob(JobInfo{
401 TargetUri: "http://consumerHost/target",
402 InfoJobData: Parameters{
403 BufferTimeout: BufferTimeout{
405 MaxTimeMiliseconds: 200,
408 }, distributeClientMock)
411 go jobUnderTest.start()
414 jobUnderTest.messagesChannel <- []byte("1")
415 jobUnderTest.messagesChannel <- []byte("2")
// waitTimeout returns true when the wait timed out, i.e. the WaitGroup was not released.
418 if waitTimeout(&wg, 2*time.Second) {
419 t.Error("Not all calls to server were made")
// TestJobReadMoreThanBufferSizeMessages_shouldOnlyReturnMaxSizeNoOfMessages sends the
// messages "0" to "3" to a job's messagesChannel and expects read to return only "01".
424 func TestJobReadMoreThanBufferSizeMessages_shouldOnlyReturnMaxSizeNoOfMessages(t *testing.T) {
425 assertions := require.New(t)
427 jobUnderTest := newJob(JobInfo{}, nil)
// NOTE(review): whether this loop runs in a separate goroutine is not visible — confirm.
430 for i := 0; i < 4; i++ {
431 jobUnderTest.messagesChannel <- []byte(strconv.Itoa(i))
// NOTE(review): only MaxTimeMiliseconds is visible; the size limit that caps the result
// at two messages is presumably set in the same literal — confirm.
435 msgs := jobUnderTest.read(BufferTimeout{
437 MaxTimeMiliseconds: 200,
// Only the first two of the four messages are expected back.
440 assertions.Equal([]byte("01"), msgs)
// TestJobReadBufferedWhenTimeout_shouldOnlyReturnMessagesSentBeforeTimeout sends the
// messages "0" to "3" with a 10 ms pause before each one and expects read, called with a
// 30 ms MaxTimeMiliseconds, to return only "01".
442 func TestJobReadBufferedWhenTimeout_shouldOnlyReturnMessagesSentBeforeTimeout(t *testing.T) {
443 assertions := require.New(t)
445 jobUnderTest := newJob(JobInfo{}, nil)
// NOTE(review): whether this loop runs in a separate goroutine is not visible — confirm.
// NOTE(review): the expected result depends on wall-clock timing (10 ms sends against a
// 30 ms limit), so it may be sensitive to scheduling delays — confirm it is stable.
448 for i := 0; i < 4; i++ {
449 time.Sleep(10 * time.Millisecond)
450 jobUnderTest.messagesChannel <- []byte(strconv.Itoa(i))
454 msgs := jobUnderTest.read(BufferTimeout{
456 MaxTimeMiliseconds: 30,
// Only the messages that arrived before the time limit are expected back.
459 assertions.Equal([]byte("01"), msgs)
// fillMessagesBuffer iterates once per slot of mc's capacity.
// NOTE(review): the loop body is not visible here; judging by the name it presumably
// sends one message per iteration so that the channel ends up full — confirm.
462 func fillMessagesBuffer(mc chan []byte) {
463 for i := 0; i < cap(mc); i++ {
// RoundTripFunc is a function type that maps an HTTP request to an HTTP response.
// NewTestClient installs a value of this type as the Transport of an http.Client.
468 type RoundTripFunc func(req *http.Request) *http.Response
// RoundTrip has the method signature of http.RoundTripper, which is what allows a
// RoundTripFunc to be used as an http.Client Transport.
// NOTE(review): the body is not visible here; presumably it calls f(req) and returns a
// nil error — confirm.
470 func (f RoundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
// NewTestClient builds an *http.Client whose Transport is the given fn, so that requests
// made through the client are answered by fn.
474 //NewTestClient returns *http.Client with Transport replaced to avoid making real calls
475 func NewTestClient(fn RoundTripFunc) *http.Client {
477 Transport: RoundTripFunc(fn),
// waitTimeout reports whether waiting on wg exceeded timeout: false means the wait
// finished first, true means the timeout fired first.
481 // waitTimeout waits for the waitgroup for the specified max timeout.
482 // Returns true if waiting timed out.
483 func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
// Signal channel. NOTE(review): presumably a goroutine calls wg.Wait() and then closes
// or sends on c, and the branches below belong to a select over c and the timer — confirm.
484 c := make(chan struct{})
491 return false // completed normally
// time.After delivers on its channel once timeout has elapsed.
492 case <-time.After(timeout):
493 return true // timed out
497 func getBodyAsString(req *http.Request, t *testing.T) string {
498 buf := new(bytes.Buffer)
499 if _, err := buf.ReadFrom(req.Body); err != nil {