X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=dmaap-mediator-producer%2Finternal%2Fjobs%2Fjobs_test.go;h=e4f20e18b702facfce57abdc772fa1d5f5dbe277;hb=4a9589b4743667175a584e628e4fd0f97499482a;hp=6f2922724d04f4b1d70ecf60d54d656b6d6c779a;hpb=fe61c6191ba72b1d8297264c3d61566ea23a70b6;p=nonrtric.git diff --git a/dmaap-mediator-producer/internal/jobs/jobs_test.go b/dmaap-mediator-producer/internal/jobs/jobs_test.go index 6f292272..e4f20e18 100644 --- a/dmaap-mediator-producer/internal/jobs/jobs_test.go +++ b/dmaap-mediator-producer/internal/jobs/jobs_test.go @@ -22,172 +22,374 @@ package jobs import ( "bytes" + "fmt" "io/ioutil" "net/http" - "os" - "path/filepath" + "strconv" "sync" "testing" "time" + "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient" + "oransc.org/nonrtric/dmaapmediatorproducer/internal/config" + "oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient" + "oransc.org/nonrtric/dmaapmediatorproducer/mocks" ) -const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}]}` - -func TestGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) { +func TestJobsManagerGetTypes_shouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) { assertions := require.New(t) - typesDir, err := os.MkdirTemp("", "configs") - if err != nil { - t.Errorf("Unable to create temporary directory for types due to: %v", err) - } - t.Cleanup(func() { - os.RemoveAll(typesDir) - clearAll() - }) - fname := filepath.Join(typesDir, "type_config.json") - configFile = fname - if err = os.WriteFile(fname, []byte(typeDefinition), 0666); err != nil { - t.Errorf("Unable to create temporary config file for types due to: %v", err) - } - types, err := GetTypes() - wantedType := TypeData{ - TypeId: "type1", + + managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil) + + wantedDMaaPType := config.TypeDefinition{ + Identity: "type1", DMaaPTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1", - Jobs: make(map[string]JobInfo), } - wantedTypes := []TypeData{wantedType} + wantedKafkaType := config.TypeDefinition{ + Identity: "type2", + KafkaInputTopic: "topic", + } + wantedTypes := []config.TypeDefinition{wantedDMaaPType, wantedKafkaType} + + types := managerUnderTest.LoadTypesFromConfiguration(wantedTypes) + assertions.EqualValues(wantedTypes, types) - assertions.Nil(err) - supportedTypes := GetSupportedTypes() - assertions.EqualValues([]string{"type1"}, supportedTypes) + supportedTypes := managerUnderTest.GetSupportedTypes() + assertions.ElementsMatch([]string{"type1", "type2"}, supportedTypes) + assertions.Equal(dMaaPSource, managerUnderTest.allTypes["type1"].jobsHandler.sourceType) + assertions.Equal(kafkaSource, managerUnderTest.allTypes["type2"].jobsHandler.sourceType) } -func TestAddJobWhenTypeIsSupported_shouldAddJobToAllJobsMap(t *testing.T) { +func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) { assertions := require.New(t) + managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil) wantedJob := JobInfo{ Owner: "owner", LastUpdated: "now", InfoJobIdentity: "job1", TargetUri: "target", - InfoJobData: "{}", + InfoJobData: Parameters{}, InfoTypeIdentity: "type1", } - allTypes["type1"] = TypeData{ - TypeId: "type1", - Jobs: map[string]JobInfo{"job1": wantedJob}, + jobsHandler := jobsHandler{ + addJobCh: make(chan JobInfo)} + managerUnderTest.allTypes["type1"] = TypeData{ + Identity: "type1", + jobsHandler: &jobsHandler, } - t.Cleanup(func() { - clearAll() - }) - err := AddJob(wantedJob) + var err error + go func() { + err = managerUnderTest.AddJobFromRESTCall(wantedJob) + }() + assertions.Nil(err) - assertions.Equal(1, len(allTypes["type1"].Jobs)) - assertions.Equal(wantedJob, allTypes["type1"].Jobs["job1"]) + addedJob := <-jobsHandler.addJobCh + assertions.Equal(wantedJob, addedJob) } -func TestAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) { +func TestJobsManagerAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) { assertions := require.New(t) + managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil) jobInfo := JobInfo{ InfoTypeIdentity: "type1", } - err := AddJob(jobInfo) + err := managerUnderTest.AddJobFromRESTCall(jobInfo) assertions.NotNil(err) assertions.Equal("type not supported: type1", err.Error()) } -func TestAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) { +func TestJobsManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) { assertions := require.New(t) - allTypes["type1"] = TypeData{ - TypeId: "type1", + managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil) + managerUnderTest.allTypes["type1"] = TypeData{ + Identity: "type1", } - t.Cleanup(func() { - clearAll() - }) + jobInfo := JobInfo{ InfoTypeIdentity: "type1", } - - err := AddJob(jobInfo) + err := managerUnderTest.AddJobFromRESTCall(jobInfo) assertions.NotNil(err) - assertions.Equal("missing required job identity: { type1}", err.Error()) + assertions.Equal("missing required job identity: { {{0 0}} type1 }", err.Error()) } -func TestAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) { +func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) { assertions := require.New(t) - allTypes["type1"] = TypeData{ - TypeId: "type1", + managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil) + managerUnderTest.allTypes["type1"] = TypeData{ + Identity: "type1", } + jobInfo := JobInfo{ InfoTypeIdentity: "type1", InfoJobIdentity: "job1", } - - err := AddJob(jobInfo) + err := managerUnderTest.AddJobFromRESTCall(jobInfo) assertions.NotNil(err) - assertions.Equal("missing required target URI: { job1 type1}", err.Error()) - clearAll() + assertions.Equal("missing required target URI: { job1 {{0 0}} type1 }", err.Error()) } -func TestDeleteJob(t *testing.T) { +func TestJobsManagerDeleteJob_shouldSendDeleteToChannel(t *testing.T) { assertions := require.New(t) - jobToKeep := JobInfo{ - InfoJobIdentity: "job1", - InfoTypeIdentity: "type1", - } - jobToDelete := JobInfo{ - InfoJobIdentity: "job2", - InfoTypeIdentity: "type1", - } - allTypes["type1"] = TypeData{ - TypeId: "type1", - Jobs: map[string]JobInfo{"job1": jobToKeep, "job2": jobToDelete}, + managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil) + jobsHandler := jobsHandler{ + deleteJobCh: make(chan string)} + managerUnderTest.allTypes["type1"] = TypeData{ + Identity: "type1", + jobsHandler: &jobsHandler, } - t.Cleanup(func() { - clearAll() - }) - DeleteJob("job2") - assertions.Equal(1, len(allTypes["type1"].Jobs)) - assertions.Equal(jobToKeep, allTypes["type1"].Jobs["job1"]) + go managerUnderTest.DeleteJobFromRESTCall("job2") + + assertions.Equal("job2", <-jobsHandler.deleteJobCh) } -func TestPollAndDistributeMessages(t *testing.T) { +func TestStartJobsManagerAddDMaaPJob_shouldStartPollAndDistributeMessages(t *testing.T) { assertions := require.New(t) - jobInfo := JobInfo{ + + called := false + dMaaPMessages := `[{"message": {"data": "dmaap"}}]` + pollClientMock := NewTestClient(func(req *http.Request) *http.Response { + if req.URL.String() == "http://mrAddr/topicUrl" { + assertions.Equal(req.Method, "GET") + body := "[]" + if !called { + called = true + body = dMaaPMessages + } + return &http.Response{ + StatusCode: http.StatusOK, + Body: ioutil.NopCloser(bytes.NewReader([]byte(body))), + Header: make(http.Header), // Must be set to non-nil value or it panics + } + } + t.Error("Wrong call to client: ", req) + t.Fail() + return nil + }) + + wg := sync.WaitGroup{} + distributeClientMock := NewTestClient(func(req *http.Request) *http.Response { + if req.URL.String() == "http://consumerHost/dmaaptarget" { + assertions.Equal(req.Method, "POST") + assertions.Equal(dMaaPMessages, getBodyAsString(req, t)) + assertions.Equal("application/json", req.Header.Get("Content-Type")) + wg.Done() + return &http.Response{ + StatusCode: 200, + Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)), + Header: make(http.Header), // Must be set to non-nil value or it panics + } + } + t.Error("Wrong call to client: ", req) + t.Fail() + return nil + }) + dMaaPTypeDef := config.TypeDefinition{ + Identity: "type1", + DMaaPTopicURL: "/topicUrl", + } + dMaaPJobsHandler := newJobsHandler(dMaaPTypeDef, "http://mrAddr", nil, pollClientMock, distributeClientMock) + + jobsManager := NewJobsManagerImpl(pollClientMock, "http://mrAddr", kafkaclient.KafkaFactoryImpl{}, distributeClientMock) + jobsManager.allTypes["type1"] = TypeData{ + Identity: "type1", + jobsHandler: dMaaPJobsHandler, + } + jobsManager.StartJobsForAllTypes() + + dMaaPJobInfo := JobInfo{ InfoTypeIdentity: "type1", InfoJobIdentity: "job1", - TargetUri: "http://consumerHost/target", + TargetUri: "http://consumerHost/dmaaptarget", } - allTypes["type1"] = TypeData{ - TypeId: "type1", - DMaaPTopicURL: "topicUrl", - Jobs: map[string]JobInfo{"job1": jobInfo}, + + wg.Add(1) // Wait till the distribution has happened + err := jobsManager.AddJobFromRESTCall(dMaaPJobInfo) + assertions.Nil(err) + + if waitTimeout(&wg, 2*time.Second) { + t.Error("Not all calls to server were made") + t.Fail() } - t.Cleanup(func() { - clearAll() +} + +func TestStartJobsManagerAddKafkaJob_shouldStartPollAndDistributeMessages(t *testing.T) { + assertions := require.New(t) + + kafkaMessages := `1` + wg := sync.WaitGroup{} + distributeClientMock := NewTestClient(func(req *http.Request) *http.Response { + if req.URL.String() == "http://consumerHost/kafkatarget" { + assertions.Equal(req.Method, "POST") + assertions.Equal(kafkaMessages, getBodyAsString(req, t)) + assertions.Equal("text/plain", req.Header.Get("Content-Type")) + wg.Done() + return &http.Response{ + StatusCode: 200, + Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)), + Header: make(http.Header), // Must be set to non-nil value or it panics + } + } + t.Error("Wrong call to client: ", req) + t.Fail() + return nil }) + kafkaTypeDef := config.TypeDefinition{ + Identity: "type2", + KafkaInputTopic: "topic", + } + kafkaFactoryMock := mocks.KafkaFactory{} + kafkaConsumerMock := mocks.KafkaConsumer{} + kafkaConsumerMock.On("Commit").Return([]kafka.TopicPartition{}, error(nil)) + kafkaConsumerMock.On("Subscribe", mock.Anything).Return(error(nil)) + kafkaConsumerMock.On("ReadMessage", mock.Anything).Return(&kafka.Message{ + Value: []byte(kafkaMessages), + }, error(nil)).Once() + kafkaConsumerMock.On("ReadMessage", mock.Anything).Return(nil, fmt.Errorf("Just to stop")) + kafkaFactoryMock.On("NewKafkaConsumer", mock.Anything).Return(kafkaConsumerMock, nil) + kafkaJobsHandler := newJobsHandler(kafkaTypeDef, "", kafkaFactoryMock, nil, distributeClientMock) + + jobsManager := NewJobsManagerImpl(nil, "", kafkaFactoryMock, distributeClientMock) + jobsManager.allTypes["type2"] = TypeData{ + Identity: "type2", + jobsHandler: kafkaJobsHandler, + } + + jobsManager.StartJobsForAllTypes() + + kafkaJobInfo := JobInfo{ + InfoTypeIdentity: "type2", + InfoJobIdentity: "job2", + TargetUri: "http://consumerHost/kafkatarget", + } + + wg.Add(1) // Wait till the distribution has happened + err := jobsManager.AddJobFromRESTCall(kafkaJobInfo) + assertions.Nil(err) + + if waitTimeout(&wg, 2*time.Second) { + t.Error("Not all calls to server were made") + t.Fail() + } +} + +func TestJobsHandlerDeleteJob_shouldDeleteJobFromJobsMap(t *testing.T) { + jobToDelete := newJob(JobInfo{}, nil) + go jobToDelete.start() + typeDef := config.TypeDefinition{ + Identity: "type1", + DMaaPTopicURL: "/topicUrl", + } + jobsHandler := newJobsHandler(typeDef, "http://mrAddr", kafkaclient.KafkaFactoryImpl{}, nil, nil) + jobsHandler.jobs["job1"] = jobToDelete + + go jobsHandler.monitorManagementChannels() + + jobsHandler.deleteJobCh <- "job1" + + deleted := false + for i := 0; i < 100; i++ { + if len(jobsHandler.jobs) == 0 { + deleted = true + break + } + time.Sleep(time.Microsecond) // Need to drop control to let the job's goroutine do the job + } + require.New(t).True(deleted, "Job not deleted") +} + +func TestJobsHandlerEmptyJobMessageBufferWhenItIsFull(t *testing.T) { + job := newJob(JobInfo{ + InfoJobIdentity: "job", + }, nil) + + typeDef := config.TypeDefinition{ + Identity: "type1", + DMaaPTopicURL: "/topicUrl", + } + jobsHandler := newJobsHandler(typeDef, "http://mrAddr", kafkaclient.KafkaFactoryImpl{}, nil, nil) + jobsHandler.jobs["job1"] = job + + fillMessagesBuffer(job.messagesChannel) + + jobsHandler.distributeMessages([]byte("sent msg")) + + require.New(t).Len(job.messagesChannel, 0) +} + +func TestKafkaPollingAgentTimedOut_shouldResultInEMptyMessages(t *testing.T) { + assertions := require.New(t) + + kafkaFactoryMock := mocks.KafkaFactory{} + kafkaConsumerMock := mocks.KafkaConsumer{} + kafkaConsumerMock.On("Commit").Return([]kafka.TopicPartition{}, error(nil)) + kafkaConsumerMock.On("Subscribe", mock.Anything).Return(error(nil)) + kafkaConsumerMock.On("ReadMessage", mock.Anything).Return(nil, kafka.NewError(kafka.ErrTimedOut, "", false)) + kafkaFactoryMock.On("NewKafkaConsumer", mock.Anything).Return(kafkaConsumerMock, nil) + + pollingAgentUnderTest := newKafkaPollingAgent(kafkaFactoryMock, "") + messages, err := pollingAgentUnderTest.pollMessages() + + assertions.Equal([]byte(""), messages) + assertions.Nil(err) +} + +func TestJobWithoutParameters_shouldSendOneMessageAtATime(t *testing.T) { + assertions := require.New(t) + wg := sync.WaitGroup{} - wg.Add(2) // Two calls should be made to the server, one to poll and one to distribute - messages := `[{"message": {"data": "data"}}]` - clientMock := NewTestClient(func(req *http.Request) *http.Response { - if req.URL.String() == "http://mrAddr/topicUrl" { - assertions.Equal(req.Method, "GET") + messageNo := 1 + distributeClientMock := NewTestClient(func(req *http.Request) *http.Response { + if req.URL.String() == "http://consumerHost/target" { + assertions.Equal(req.Method, "POST") + assertions.Equal(fmt.Sprint("message", messageNo), getBodyAsString(req, t)) + messageNo++ + assertions.Equal("text/plain", req.Header.Get("Content-Type")) wg.Done() return &http.Response{ StatusCode: 200, - Body: ioutil.NopCloser(bytes.NewReader([]byte(messages))), + Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)), Header: make(http.Header), // Must be set to non-nil value or it panics } - } else if req.URL.String() == "http://consumerHost/target" { + } + t.Error("Wrong call to client: ", req) + t.Fail() + return nil + }) + + jobUnderTest := newJob(JobInfo{ + sourceType: kafkaSource, + TargetUri: "http://consumerHost/target", + }, distributeClientMock) + + wg.Add(2) + go jobUnderTest.start() + + jobUnderTest.messagesChannel <- []byte("message1") + jobUnderTest.messagesChannel <- []byte("message2") + + if waitTimeout(&wg, 2*time.Second) { + t.Error("Not all calls to server were made") + t.Fail() + } +} + +func TestJobWithBufferedParameters_shouldSendMessagesTogether(t *testing.T) { + assertions := require.New(t) + + wg := sync.WaitGroup{} + distributeClientMock := NewTestClient(func(req *http.Request) *http.Response { + if req.URL.String() == "http://consumerHost/target" { assertions.Equal(req.Method, "POST") - assertions.Equal(messages, getBodyAsString(req)) - assertions.Equal("application/json; charset=utf-8", req.Header.Get("Content-Type")) + assertions.Equal(`["{\"data\": 1}","{\"data\": 2}","ABCDEFGH"]`, getBodyAsString(req, t)) + assertions.Equal("application/json", req.Header.Get("Content-Type")) wg.Done() return &http.Response{ StatusCode: 200, @@ -200,16 +402,75 @@ func TestPollAndDistributeMessages(t *testing.T) { return nil }) - restclient.Client = clientMock + jobUnderTest := newJob(JobInfo{ + TargetUri: "http://consumerHost/target", + InfoJobData: Parameters{ + BufferTimeout: BufferTimeout{ + MaxSize: 5, + MaxTimeMiliseconds: 200, + }, + }, + }, distributeClientMock) - pollAndDistributeMessages("http://mrAddr") + wg.Add(1) + go jobUnderTest.start() - if waitTimeout(&wg, 100*time.Millisecond) { + go func() { + jobUnderTest.messagesChannel <- []byte(`{"data": 1}`) + jobUnderTest.messagesChannel <- []byte(`{"data": 2}`) + jobUnderTest.messagesChannel <- []byte("ABCDEFGH") + }() + + if waitTimeout(&wg, 2*time.Second) { t.Error("Not all calls to server were made") t.Fail() } } +func TestJobReadMoreThanBufferSizeMessages_shouldOnlyReturnMaxSizeNoOfMessages(t *testing.T) { + assertions := require.New(t) + + jobUnderTest := newJob(JobInfo{}, nil) + + go func() { + for i := 0; i < 4; i++ { + jobUnderTest.messagesChannel <- []byte(strconv.Itoa(i)) + } + }() + + msgs := jobUnderTest.read(BufferTimeout{ + MaxSize: 2, + MaxTimeMiliseconds: 200, + }) + + assertions.Equal([]byte("[\"0\",\"1\"]"), msgs) +} +func TestJobReadBufferedWhenTimeout_shouldOnlyReturnMessagesSentBeforeTimeout(t *testing.T) { + assertions := require.New(t) + + jobUnderTest := newJob(JobInfo{}, nil) + + go func() { + for i := 0; i < 4; i++ { + time.Sleep(10 * time.Millisecond) + jobUnderTest.messagesChannel <- []byte(strconv.Itoa(i)) + } + }() + + msgs := jobUnderTest.read(BufferTimeout{ + MaxSize: 2, + MaxTimeMiliseconds: 30, + }) + + assertions.Equal([]byte("[\"0\",\"1\"]"), msgs) +} + +func fillMessagesBuffer(mc chan []byte) { + for i := 0; i < cap(mc); i++ { + mc <- []byte("msg") + } +} + type RoundTripFunc func(req *http.Request) *http.Response func (f RoundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) { @@ -239,8 +500,10 @@ func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool { } } -func getBodyAsString(req *http.Request) string { +func getBodyAsString(req *http.Request, t *testing.T) string { buf := new(bytes.Buffer) - buf.ReadFrom(req.Body) + if _, err := buf.ReadFrom(req.Body); err != nil { + t.Fail() + } return buf.String() }