Fix distribution of Kafka messages
[nonrtric.git] / dmaap-mediator-producer / internal / jobs / jobs_test.go
index ab1165c..6a1a70a 100644 (file)
@@ -59,6 +59,8 @@ func TestJobsManagerGetTypes_shouldReturnSliceOfTypesAndProvideSupportedTypes(t
 
        supportedTypes := managerUnderTest.GetSupportedTypes()
        assertions.ElementsMatch([]string{"type1", "type2"}, supportedTypes)
+       assertions.Equal(dMaaPSource, managerUnderTest.allTypes["type1"].jobsHandler.sourceType)
+       assertions.Equal(kafkaSource, managerUnderTest.allTypes["type2"].jobsHandler.sourceType)
 }
 
 func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) {
@@ -113,7 +115,7 @@ func TestJobsManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
        }
        err := managerUnderTest.AddJobFromRESTCall(jobInfo)
        assertions.NotNil(err)
-       assertions.Equal("missing required job identity: {    {{0 0}} type1}", err.Error())
+       assertions.Equal("missing required job identity: {    {{0 0}} type1 }", err.Error())
 }
 
 func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
@@ -129,7 +131,7 @@ func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
        }
        err := managerUnderTest.AddJobFromRESTCall(jobInfo)
        assertions.NotNil(err)
-       assertions.Equal("missing required target URI: {  job1  {{0 0}} type1}", err.Error())
+       assertions.Equal("missing required target URI: {  job1  {{0 0}} type1 }", err.Error())
 }
 
 func TestJobsManagerDeleteJob_shouldSendDeleteToChannel(t *testing.T) {
@@ -226,7 +228,7 @@ func TestStartJobsManagerAddKafkaJob_shouldStartPollAndDistributeMessages(t *tes
                if req.URL.String() == "http://consumerHost/kafkatarget" {
                        assertions.Equal(req.Method, "POST")
                        assertions.Equal(kafkaMessages, getBodyAsString(req, t))
-                       assertions.Equal("application/json", req.Header.Get("Content-Type"))
+                       assertions.Equal("text/plain", req.Header.Get("Content-Type"))
                        wg.Done()
                        return &http.Response{
                                StatusCode: 200,
@@ -349,7 +351,7 @@ func TestJobWithoutParameters_shouldSendOneMessageAtATime(t *testing.T) {
                        assertions.Equal(req.Method, "POST")
                        assertions.Equal(fmt.Sprint("message", messageNo), getBodyAsString(req, t))
                        messageNo++
-                       assertions.Equal("application/json", req.Header.Get("Content-Type"))
+                       assertions.Equal("text/plain", req.Header.Get("Content-Type"))
                        wg.Done()
                        return &http.Response{
                                StatusCode: 200,
@@ -362,7 +364,10 @@ func TestJobWithoutParameters_shouldSendOneMessageAtATime(t *testing.T) {
                return nil
        })
 
-       jobUnderTest := newJob(JobInfo{TargetUri: "http://consumerHost/target"}, distributeClientMock)
+       jobUnderTest := newJob(JobInfo{
+               sourceType: kafkaSource,
+               TargetUri:  "http://consumerHost/target",
+       }, distributeClientMock)
 
        wg.Add(2)
        go jobUnderTest.start()
@@ -383,7 +388,7 @@ func TestJobWithBufferedParameters_shouldSendMessagesTogether(t *testing.T) {
        distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
                if req.URL.String() == "http://consumerHost/target" {
                        assertions.Equal(req.Method, "POST")
-                       assertions.Equal("12", getBodyAsString(req, t))
+                       assertions.Equal(`"[{\"data\": 1},{\"data\": 2}]"`, getBodyAsString(req, t))
                        assertions.Equal("application/json", req.Header.Get("Content-Type"))
                        wg.Done()
                        return &http.Response{
@@ -411,8 +416,8 @@ func TestJobWithBufferedParameters_shouldSendMessagesTogether(t *testing.T) {
        go jobUnderTest.start()
 
        go func() {
-               jobUnderTest.messagesChannel <- []byte("1")
-               jobUnderTest.messagesChannel <- []byte("2")
+               jobUnderTest.messagesChannel <- []byte(`{"data": 1}`)
+               jobUnderTest.messagesChannel <- []byte(`{"data": 2}`)
        }()
 
        if waitTimeout(&wg, 2*time.Second) {
@@ -437,7 +442,7 @@ func TestJobReadMoreThanBufferSizeMessages_shouldOnlyReturnMaxSizeNoOfMessages(t
                MaxTimeMiliseconds: 200,
        })
 
-       assertions.Equal([]byte("01"), msgs)
+       assertions.Equal([]byte("\"[0,1]\""), msgs)
 }
 func TestJobReadBufferedWhenTimeout_shouldOnlyReturnMessagesSentBeforeTimeout(t *testing.T) {
        assertions := require.New(t)
@@ -456,7 +461,7 @@ func TestJobReadBufferedWhenTimeout_shouldOnlyReturnMessagesSentBeforeTimeout(t
                MaxTimeMiliseconds: 30,
        })
 
-       assertions.Equal([]byte("01"), msgs)
+       assertions.Equal([]byte("\"[0,1]\""), msgs)
 }
 
 func fillMessagesBuffer(mc chan []byte) {