Stop sending empty messages to consumer 55/7555/4
authorelinuxhenrik <henrik.b.andersson@est.tech>
Thu, 13 Jan 2022 06:40:57 +0000 (07:40 +0100)
committerelinuxhenrik <henrik.b.andersson@est.tech>
Fri, 14 Jan 2022 09:30:14 +0000 (10:30 +0100)
Issue-ID: NONRTRIC-702
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Change-Id: I3fb3467165a159941ba121e37e4938b80d12e3a9

dmaap-mediator-producer/internal/jobs/jobs.go
dmaap-mediator-producer/internal/jobs/jobs_test.go

index 0bf2f12..c84e277 100644 (file)
@@ -399,15 +399,30 @@ func (j *job) read(bufferParams BufferTimeout) []byte {
 }
 
 func getAsJSONArray(rawMsgs [][]byte) []byte {
-       json := `"[`
+       if len(rawMsgs) == 0 {
+               return []byte("")
+       }
+       strings := ""
        for i := 0; i < len(rawMsgs); i++ {
-               msg := string(rawMsgs[i])
-               json = json + strings.ReplaceAll(msg, "\"", "\\\"")
-               if i < len(rawMsgs)-1 {
-                       json = json + ","
-               }
+               strings = strings + makeIntoString(rawMsgs[i])
+               strings = addSeparatorIfNeeded(strings, i, len(rawMsgs))
        }
-       return []byte(json + `]"`)
+       return []byte(wrapInJSONArray(strings))
+}
+
+func makeIntoString(rawMsg []byte) string {
+       return `"` + strings.ReplaceAll(string(rawMsg), "\"", "\\\"") + `"`
+}
+
+func addSeparatorIfNeeded(strings string, position, length int) string {
+       if position < length-1 {
+               strings = strings + ","
+       }
+       return strings
+}
+
+func wrapInJSONArray(strings string) string {
+       return "[" + strings + "]"
 }
 
 func (j *job) waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
index 6a1a70a..e4f20e1 100644 (file)
@@ -388,7 +388,7 @@ func TestJobWithBufferedParameters_shouldSendMessagesTogether(t *testing.T) {
        distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
                if req.URL.String() == "http://consumerHost/target" {
                        assertions.Equal(req.Method, "POST")
-                       assertions.Equal(`"[{\"data\": 1},{\"data\": 2}]"`, getBodyAsString(req, t))
+                       assertions.Equal(`["{\"data\": 1}","{\"data\": 2}","ABCDEFGH"]`, getBodyAsString(req, t))
                        assertions.Equal("application/json", req.Header.Get("Content-Type"))
                        wg.Done()
                        return &http.Response{
@@ -418,6 +418,7 @@ func TestJobWithBufferedParameters_shouldSendMessagesTogether(t *testing.T) {
        go func() {
                jobUnderTest.messagesChannel <- []byte(`{"data": 1}`)
                jobUnderTest.messagesChannel <- []byte(`{"data": 2}`)
+               jobUnderTest.messagesChannel <- []byte("ABCDEFGH")
        }()
 
        if waitTimeout(&wg, 2*time.Second) {
@@ -442,7 +443,7 @@ func TestJobReadMoreThanBufferSizeMessages_shouldOnlyReturnMaxSizeNoOfMessages(t
                MaxTimeMiliseconds: 200,
        })
 
-       assertions.Equal([]byte("\"[0,1]\""), msgs)
+       assertions.Equal([]byte("[\"0\",\"1\"]"), msgs)
 }
 func TestJobReadBufferedWhenTimeout_shouldOnlyReturnMessagesSentBeforeTimeout(t *testing.T) {
        assertions := require.New(t)
@@ -461,7 +462,7 @@ func TestJobReadBufferedWhenTimeout_shouldOnlyReturnMessagesSentBeforeTimeout(t
                MaxTimeMiliseconds: 30,
        })
 
-       assertions.Equal([]byte("\"[0,1]\""), msgs)
+       assertions.Equal([]byte("[\"0\",\"1\"]"), msgs)
 }
 
 func fillMessagesBuffer(mc chan []byte) {