X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=dmaap-mediator-producer%2Finternal%2Fjobs%2Fjobs_test.go;h=7d021046277a6807ff1278ff8677ab3fd6f11159;hb=d2aeca8843fe3ffca2e73dec5b64daeef0dda938;hp=ab1165c9ddc9b9867a5f6247fa49081b750adff6;hpb=1b732d17463fad74721391b3a87a2a12172da63c;p=nonrtric.git diff --git a/dmaap-mediator-producer/internal/jobs/jobs_test.go b/dmaap-mediator-producer/internal/jobs/jobs_test.go index ab1165c9..7d021046 100644 --- a/dmaap-mediator-producer/internal/jobs/jobs_test.go +++ b/dmaap-mediator-producer/internal/jobs/jobs_test.go @@ -38,7 +38,7 @@ import ( "oransc.org/nonrtric/dmaapmediatorproducer/mocks" ) -func TestJobsManagerGetTypes_shouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) { +func TestJobsManagerLoadTypesFromConfiguration_shouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) { assertions := require.New(t) managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil) @@ -59,6 +59,8 @@ func TestJobsManagerGetTypes_shouldReturnSliceOfTypesAndProvideSupportedTypes(t supportedTypes := managerUnderTest.GetSupportedTypes() assertions.ElementsMatch([]string{"type1", "type2"}, supportedTypes) + assertions.Equal(dMaaPSource, managerUnderTest.allTypes["type1"].jobsHandler.sourceType) + assertions.Equal(kafkaSource, managerUnderTest.allTypes["type2"].jobsHandler.sourceType) } func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) { @@ -113,7 +115,7 @@ func TestJobsManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) { } err := managerUnderTest.AddJobFromRESTCall(jobInfo) assertions.NotNil(err) - assertions.Equal("missing required job identity: { {{0 0}} type1}", err.Error()) + assertions.Equal("missing required job identity: { {{0 0}} type1 }", err.Error()) } func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) { @@ -129,7 +131,7 @@ func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) { } err := managerUnderTest.AddJobFromRESTCall(jobInfo) assertions.NotNil(err) - assertions.Equal("missing required target URI: { job1 {{0 0}} type1}", err.Error()) + assertions.Equal("missing required target URI: { job1 {{0 0}} type1 }", err.Error()) } func TestJobsManagerDeleteJob_shouldSendDeleteToChannel(t *testing.T) { @@ -226,7 +228,7 @@ func TestStartJobsManagerAddKafkaJob_shouldStartPollAndDistributeMessages(t *tes if req.URL.String() == "http://consumerHost/kafkatarget" { assertions.Equal(req.Method, "POST") assertions.Equal(kafkaMessages, getBodyAsString(req, t)) - assertions.Equal("application/json", req.Header.Get("Content-Type")) + assertions.Equal("text/plain", req.Header.Get("Content-Type")) wg.Done() return &http.Response{ StatusCode: 200, @@ -349,7 +351,7 @@ func TestJobWithoutParameters_shouldSendOneMessageAtATime(t *testing.T) { assertions.Equal(req.Method, "POST") assertions.Equal(fmt.Sprint("message", messageNo), getBodyAsString(req, t)) messageNo++ - assertions.Equal("application/json", req.Header.Get("Content-Type")) + assertions.Equal("text/plain", req.Header.Get("Content-Type")) wg.Done() return &http.Response{ StatusCode: 200, @@ -362,7 +364,10 @@ func TestJobWithoutParameters_shouldSendOneMessageAtATime(t *testing.T) { return nil }) - jobUnderTest := newJob(JobInfo{TargetUri: "http://consumerHost/target"}, distributeClientMock) + jobUnderTest := newJob(JobInfo{ + sourceType: kafkaSource, + TargetUri: "http://consumerHost/target", + }, distributeClientMock) wg.Add(2) go jobUnderTest.start() @@ -383,7 +388,7 @@ func TestJobWithBufferedParameters_shouldSendMessagesTogether(t *testing.T) { distributeClientMock := NewTestClient(func(req *http.Request) *http.Response { if req.URL.String() == "http://consumerHost/target" { assertions.Equal(req.Method, "POST") - assertions.Equal("12", getBodyAsString(req, t)) + assertions.Equal(`["{\"data\": 1}","{\"data\": 2}","ABCDEFGH"]`, getBodyAsString(req, t)) assertions.Equal("application/json", req.Header.Get("Content-Type")) wg.Done() return &http.Response{ @@ -411,8 +416,9 @@ func TestJobWithBufferedParameters_shouldSendMessagesTogether(t *testing.T) { go jobUnderTest.start() go func() { - jobUnderTest.messagesChannel <- []byte("1") - jobUnderTest.messagesChannel <- []byte("2") + jobUnderTest.messagesChannel <- []byte(`{"data": 1}`) + jobUnderTest.messagesChannel <- []byte(`{"data": 2}`) + jobUnderTest.messagesChannel <- []byte("ABCDEFGH") }() if waitTimeout(&wg, 2*time.Second) { @@ -437,7 +443,7 @@ func TestJobReadMoreThanBufferSizeMessages_shouldOnlyReturnMaxSizeNoOfMessages(t MaxTimeMiliseconds: 200, }) - assertions.Equal([]byte("01"), msgs) + assertions.Equal([]byte("[\"0\",\"1\"]"), msgs) } func TestJobReadBufferedWhenTimeout_shouldOnlyReturnMessagesSentBeforeTimeout(t *testing.T) { assertions := require.New(t) @@ -456,7 +462,7 @@ func TestJobReadBufferedWhenTimeout_shouldOnlyReturnMessagesSentBeforeTimeout(t MaxTimeMiliseconds: 30, }) - assertions.Equal([]byte("01"), msgs) + assertions.Equal([]byte("[\"0\",\"1\"]"), msgs) } func fillMessagesBuffer(mc chan []byte) {