X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=dmaap-mediator-producer%2Finternal%2Fjobs%2Fjobs_test.go;h=ab1165c9ddc9b9867a5f6247fa49081b750adff6;hb=6f5d3d1eccb8a1857c645ba6bd0b5e1b89ca7088;hp=30b4ffd9f3aa867fc52e5e3b224a45e7fd84a357;hpb=6b43426c2b52daa5ace5205b98d09e1871fa41d6;p=nonrtric.git diff --git a/dmaap-mediator-producer/internal/jobs/jobs_test.go b/dmaap-mediator-producer/internal/jobs/jobs_test.go index 30b4ffd9..ab1165c9 100644 --- a/dmaap-mediator-producer/internal/jobs/jobs_test.go +++ b/dmaap-mediator-producer/internal/jobs/jobs_test.go @@ -22,52 +22,60 @@ package jobs import ( "bytes" + "fmt" "io/ioutil" "net/http" + "strconv" "sync" "testing" "time" + "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "oransc.org/nonrtric/dmaapmediatorproducer/internal/config" + "oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient" + "oransc.org/nonrtric/dmaapmediatorproducer/mocks" ) -const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}]}` - -func TestJobsManagerGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) { +func TestJobsManagerGetTypes_shouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) { assertions := require.New(t) - managerUnderTest := NewJobsManagerImpl(nil, "", nil) + managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil) - wantedType := config.TypeDefinition{ - Id: "type1", - DmaapTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1", + wantedDMaaPType := config.TypeDefinition{ + Identity: "type1", + DMaaPTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1", + } + wantedKafkaType := config.TypeDefinition{ + Identity: "type2", + KafkaInputTopic: "topic", } - wantedTypes := []config.TypeDefinition{wantedType} + wantedTypes := []config.TypeDefinition{wantedDMaaPType, wantedKafkaType} types := managerUnderTest.LoadTypesFromConfiguration(wantedTypes) assertions.EqualValues(wantedTypes, types) supportedTypes := managerUnderTest.GetSupportedTypes() - assertions.EqualValues([]string{"type1"}, supportedTypes) + assertions.ElementsMatch([]string{"type1", "type2"}, supportedTypes) } func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) { assertions := require.New(t) - managerUnderTest := NewJobsManagerImpl(nil, "", nil) + managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil) wantedJob := JobInfo{ Owner: "owner", LastUpdated: "now", InfoJobIdentity: "job1", TargetUri: "target", - InfoJobData: "{}", + InfoJobData: Parameters{}, InfoTypeIdentity: "type1", } jobsHandler := jobsHandler{ addJobCh: make(chan JobInfo)} managerUnderTest.allTypes["type1"] = TypeData{ - TypeId: "type1", + Identity: "type1", jobsHandler: &jobsHandler, } @@ -83,7 +91,7 @@ func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T func TestJobsManagerAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) { assertions := require.New(t) - managerUnderTest := NewJobsManagerImpl(nil, "", nil) + managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil) jobInfo := JobInfo{ InfoTypeIdentity: "type1", } @@ -95,9 +103,9 @@ func TestJobsManagerAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) func TestJobsManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) { assertions := require.New(t) - managerUnderTest := NewJobsManagerImpl(nil, "", nil) + managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil) managerUnderTest.allTypes["type1"] = TypeData{ - TypeId: "type1", + Identity: "type1", } jobInfo := JobInfo{ @@ -105,14 +113,14 @@ func TestJobsManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) { } err := managerUnderTest.AddJobFromRESTCall(jobInfo) assertions.NotNil(err) - assertions.Equal("missing required job identity: { type1}", err.Error()) + assertions.Equal("missing required job identity: { {{0 0}} type1}", err.Error()) } func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) { assertions := require.New(t) - managerUnderTest := NewJobsManagerImpl(nil, "", nil) + managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil) managerUnderTest.allTypes["type1"] = TypeData{ - TypeId: "type1", + Identity: "type1", } jobInfo := JobInfo{ @@ -121,16 +129,16 @@ func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) { } err := managerUnderTest.AddJobFromRESTCall(jobInfo) assertions.NotNil(err) - assertions.Equal("missing required target URI: { job1 type1}", err.Error()) + assertions.Equal("missing required target URI: { job1 {{0 0}} type1}", err.Error()) } func TestJobsManagerDeleteJob_shouldSendDeleteToChannel(t *testing.T) { assertions := require.New(t) - managerUnderTest := NewJobsManagerImpl(nil, "", nil) + managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil) jobsHandler := jobsHandler{ deleteJobCh: make(chan string)} managerUnderTest.allTypes["type1"] = TypeData{ - TypeId: "type1", + Identity: "type1", jobsHandler: &jobsHandler, } @@ -139,21 +147,21 @@ func TestJobsManagerDeleteJob_shouldSendDeleteToChannel(t *testing.T) { assertions.Equal("job2", <-jobsHandler.deleteJobCh) } -func TestAddJobToJobsManager_shouldStartPollAndDistributeMessages(t *testing.T) { +func TestStartJobsManagerAddDMaaPJob_shouldStartPollAndDistributeMessages(t *testing.T) { assertions := require.New(t) called := false - messages := `[{"message": {"data": "data"}}]` + dMaaPMessages := `[{"message": {"data": "dmaap"}}]` pollClientMock := NewTestClient(func(req *http.Request) *http.Response { if req.URL.String() == "http://mrAddr/topicUrl" { assertions.Equal(req.Method, "GET") body := "[]" if !called { called = true - body = messages + body = dMaaPMessages } return &http.Response{ - StatusCode: 200, + StatusCode: http.StatusOK, Body: ioutil.NopCloser(bytes.NewReader([]byte(body))), Header: make(http.Header), // Must be set to non-nil value or it panics } @@ -165,9 +173,9 @@ func TestAddJobToJobsManager_shouldStartPollAndDistributeMessages(t *testing.T) wg := sync.WaitGroup{} distributeClientMock := NewTestClient(func(req *http.Request) *http.Response { - if req.URL.String() == "http://consumerHost/target" { + if req.URL.String() == "http://consumerHost/dmaaptarget" { assertions.Equal(req.Method, "POST") - assertions.Equal(messages, getBodyAsString(req, t)) + assertions.Equal(dMaaPMessages, getBodyAsString(req, t)) assertions.Equal("application/json", req.Header.Get("Content-Type")) wg.Done() return &http.Response{ @@ -180,25 +188,88 @@ func TestAddJobToJobsManager_shouldStartPollAndDistributeMessages(t *testing.T) t.Fail() return nil }) - jobsHandler := newJobsHandler("type1", "/topicUrl", pollClientMock, distributeClientMock) - - jobsManager := NewJobsManagerImpl(pollClientMock, "http://mrAddr", distributeClientMock) - jobsManager.allTypes["type1"] = TypeData{ + dMaaPTypeDef := config.TypeDefinition{ + Identity: "type1", DMaaPTopicURL: "/topicUrl", - TypeId: "type1", - jobsHandler: jobsHandler, } + dMaaPJobsHandler := newJobsHandler(dMaaPTypeDef, "http://mrAddr", nil, pollClientMock, distributeClientMock) + jobsManager := NewJobsManagerImpl(pollClientMock, "http://mrAddr", kafkaclient.KafkaFactoryImpl{}, distributeClientMock) + jobsManager.allTypes["type1"] = TypeData{ + Identity: "type1", + jobsHandler: dMaaPJobsHandler, + } jobsManager.StartJobsForAllTypes() - jobInfo := JobInfo{ + dMaaPJobInfo := JobInfo{ InfoTypeIdentity: "type1", InfoJobIdentity: "job1", - TargetUri: "http://consumerHost/target", + TargetUri: "http://consumerHost/dmaaptarget", } wg.Add(1) // Wait till the distribution has happened - err := jobsManager.AddJobFromRESTCall(jobInfo) + err := jobsManager.AddJobFromRESTCall(dMaaPJobInfo) + assertions.Nil(err) + + if waitTimeout(&wg, 2*time.Second) { + t.Error("Not all calls to server were made") + t.Fail() + } +} + +func TestStartJobsManagerAddKafkaJob_shouldStartPollAndDistributeMessages(t *testing.T) { + assertions := require.New(t) + + kafkaMessages := `1` + wg := sync.WaitGroup{} + distributeClientMock := NewTestClient(func(req *http.Request) *http.Response { + if req.URL.String() == "http://consumerHost/kafkatarget" { + assertions.Equal(req.Method, "POST") + assertions.Equal(kafkaMessages, getBodyAsString(req, t)) + assertions.Equal("application/json", req.Header.Get("Content-Type")) + wg.Done() + return &http.Response{ + StatusCode: 200, + Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)), + Header: make(http.Header), // Must be set to non-nil value or it panics + } + } + t.Error("Wrong call to client: ", req) + t.Fail() + return nil + }) + + kafkaTypeDef := config.TypeDefinition{ + Identity: "type2", + KafkaInputTopic: "topic", + } + kafkaFactoryMock := mocks.KafkaFactory{} + kafkaConsumerMock := mocks.KafkaConsumer{} + kafkaConsumerMock.On("Commit").Return([]kafka.TopicPartition{}, error(nil)) + kafkaConsumerMock.On("Subscribe", mock.Anything).Return(error(nil)) + kafkaConsumerMock.On("ReadMessage", mock.Anything).Return(&kafka.Message{ + Value: []byte(kafkaMessages), + }, error(nil)).Once() + kafkaConsumerMock.On("ReadMessage", mock.Anything).Return(nil, fmt.Errorf("Just to stop")) + kafkaFactoryMock.On("NewKafkaConsumer", mock.Anything).Return(kafkaConsumerMock, nil) + kafkaJobsHandler := newJobsHandler(kafkaTypeDef, "", kafkaFactoryMock, nil, distributeClientMock) + + jobsManager := NewJobsManagerImpl(nil, "", kafkaFactoryMock, distributeClientMock) + jobsManager.allTypes["type2"] = TypeData{ + Identity: "type2", + jobsHandler: kafkaJobsHandler, + } + + jobsManager.StartJobsForAllTypes() + + kafkaJobInfo := JobInfo{ + InfoTypeIdentity: "type2", + InfoJobIdentity: "job2", + TargetUri: "http://consumerHost/kafkatarget", + } + + wg.Add(1) // Wait till the distribution has happened + err := jobsManager.AddJobFromRESTCall(kafkaJobInfo) assertions.Nil(err) if waitTimeout(&wg, 2*time.Second) { @@ -210,7 +281,11 @@ func TestAddJobToJobsManager_shouldStartPollAndDistributeMessages(t *testing.T) func TestJobsHandlerDeleteJob_shouldDeleteJobFromJobsMap(t *testing.T) { jobToDelete := newJob(JobInfo{}, nil) go jobToDelete.start() - jobsHandler := newJobsHandler("type1", "/topicUrl", nil, nil) + typeDef := config.TypeDefinition{ + Identity: "type1", + DMaaPTopicURL: "/topicUrl", + } + jobsHandler := newJobsHandler(typeDef, "http://mrAddr", kafkaclient.KafkaFactoryImpl{}, nil, nil) jobsHandler.jobs["job1"] = jobToDelete go jobsHandler.monitorManagementChannels() @@ -233,7 +308,11 @@ func TestJobsHandlerEmptyJobMessageBufferWhenItIsFull(t *testing.T) { InfoJobIdentity: "job", }, nil) - jobsHandler := newJobsHandler("type1", "/topicUrl", nil, nil) + typeDef := config.TypeDefinition{ + Identity: "type1", + DMaaPTopicURL: "/topicUrl", + } + jobsHandler := newJobsHandler(typeDef, "http://mrAddr", kafkaclient.KafkaFactoryImpl{}, nil, nil) jobsHandler.jobs["job1"] = job fillMessagesBuffer(job.messagesChannel) @@ -243,6 +322,143 @@ func TestJobsHandlerEmptyJobMessageBufferWhenItIsFull(t *testing.T) { require.New(t).Len(job.messagesChannel, 0) } +func TestKafkaPollingAgentTimedOut_shouldResultInEMptyMessages(t *testing.T) { + assertions := require.New(t) + + kafkaFactoryMock := mocks.KafkaFactory{} + kafkaConsumerMock := mocks.KafkaConsumer{} + kafkaConsumerMock.On("Commit").Return([]kafka.TopicPartition{}, error(nil)) + kafkaConsumerMock.On("Subscribe", mock.Anything).Return(error(nil)) + kafkaConsumerMock.On("ReadMessage", mock.Anything).Return(nil, kafka.NewError(kafka.ErrTimedOut, "", false)) + kafkaFactoryMock.On("NewKafkaConsumer", mock.Anything).Return(kafkaConsumerMock, nil) + + pollingAgentUnderTest := newKafkaPollingAgent(kafkaFactoryMock, "") + messages, err := pollingAgentUnderTest.pollMessages() + + assertions.Equal([]byte(""), messages) + assertions.Nil(err) +} + +func TestJobWithoutParameters_shouldSendOneMessageAtATime(t *testing.T) { + assertions := require.New(t) + + wg := sync.WaitGroup{} + messageNo := 1 + distributeClientMock := NewTestClient(func(req *http.Request) *http.Response { + if req.URL.String() == "http://consumerHost/target" { + assertions.Equal(req.Method, "POST") + assertions.Equal(fmt.Sprint("message", messageNo), getBodyAsString(req, t)) + messageNo++ + assertions.Equal("application/json", req.Header.Get("Content-Type")) + wg.Done() + return &http.Response{ + StatusCode: 200, + Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)), + Header: make(http.Header), // Must be set to non-nil value or it panics + } + } + t.Error("Wrong call to client: ", req) + t.Fail() + return nil + }) + + jobUnderTest := newJob(JobInfo{TargetUri: "http://consumerHost/target"}, distributeClientMock) + + wg.Add(2) + go jobUnderTest.start() + + jobUnderTest.messagesChannel <- []byte("message1") + jobUnderTest.messagesChannel <- []byte("message2") + + if waitTimeout(&wg, 2*time.Second) { + t.Error("Not all calls to server were made") + t.Fail() + } +} + +func TestJobWithBufferedParameters_shouldSendMessagesTogether(t *testing.T) { + assertions := require.New(t) + + wg := sync.WaitGroup{} + distributeClientMock := NewTestClient(func(req *http.Request) *http.Response { + if req.URL.String() == "http://consumerHost/target" { + assertions.Equal(req.Method, "POST") + assertions.Equal("12", getBodyAsString(req, t)) + assertions.Equal("application/json", req.Header.Get("Content-Type")) + wg.Done() + return &http.Response{ + StatusCode: 200, + Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)), + Header: make(http.Header), // Must be set to non-nil value or it panics + } + } + t.Error("Wrong call to client: ", req) + t.Fail() + return nil + }) + + jobUnderTest := newJob(JobInfo{ + TargetUri: "http://consumerHost/target", + InfoJobData: Parameters{ + BufferTimeout: BufferTimeout{ + MaxSize: 5, + MaxTimeMiliseconds: 200, + }, + }, + }, distributeClientMock) + + wg.Add(1) + go jobUnderTest.start() + + go func() { + jobUnderTest.messagesChannel <- []byte("1") + jobUnderTest.messagesChannel <- []byte("2") + }() + + if waitTimeout(&wg, 2*time.Second) { + t.Error("Not all calls to server were made") + t.Fail() + } +} + +func TestJobReadMoreThanBufferSizeMessages_shouldOnlyReturnMaxSizeNoOfMessages(t *testing.T) { + assertions := require.New(t) + + jobUnderTest := newJob(JobInfo{}, nil) + + go func() { + for i := 0; i < 4; i++ { + jobUnderTest.messagesChannel <- []byte(strconv.Itoa(i)) + } + }() + + msgs := jobUnderTest.read(BufferTimeout{ + MaxSize: 2, + MaxTimeMiliseconds: 200, + }) + + assertions.Equal([]byte("01"), msgs) +} +func TestJobReadBufferedWhenTimeout_shouldOnlyReturnMessagesSentBeforeTimeout(t *testing.T) { + assertions := require.New(t) + + jobUnderTest := newJob(JobInfo{}, nil) + + go func() { + for i := 0; i < 4; i++ { + time.Sleep(10 * time.Millisecond) + jobUnderTest.messagesChannel <- []byte(strconv.Itoa(i)) + } + }() + + msgs := jobUnderTest.read(BufferTimeout{ + MaxSize: 2, + MaxTimeMiliseconds: 30, + }) + + assertions.Equal([]byte("01"), msgs) +} + func fillMessagesBuffer(mc chan []byte) { for i := 0; i < cap(mc); i++ { mc <- []byte("msg")