From: elinuxhenrik Date: Thu, 13 Jan 2022 06:40:57 +0000 (+0100) Subject: Stop sending empty messages to consumer X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=4a9589b4743667175a584e628e4fd0f97499482a;p=nonrtric.git Stop sending empty messages to consumer Issue-ID: NONRTRIC-702 Signed-off-by: elinuxhenrik Change-Id: I3fb3467165a159941ba121e37e4938b80d12e3a9 --- diff --git a/dmaap-mediator-producer/internal/jobs/jobs.go b/dmaap-mediator-producer/internal/jobs/jobs.go index 0bf2f12a..c84e2773 100644 --- a/dmaap-mediator-producer/internal/jobs/jobs.go +++ b/dmaap-mediator-producer/internal/jobs/jobs.go @@ -399,15 +399,30 @@ func (j *job) read(bufferParams BufferTimeout) []byte { } func getAsJSONArray(rawMsgs [][]byte) []byte { - json := `"[` + if len(rawMsgs) == 0 { + return []byte("") + } + strings := "" for i := 0; i < len(rawMsgs); i++ { - msg := string(rawMsgs[i]) - json = json + strings.ReplaceAll(msg, "\"", "\\\"") - if i < len(rawMsgs)-1 { - json = json + "," - } + strings = strings + makeIntoString(rawMsgs[i]) + strings = addSeparatorIfNeeded(strings, i, len(rawMsgs)) } - return []byte(json + `]"`) + return []byte(wrapInJSONArray(strings)) +} + +func makeIntoString(rawMsg []byte) string { + return `"` + strings.ReplaceAll(string(rawMsg), "\"", "\\\"") + `"` +} + +func addSeparatorIfNeeded(strings string, position, length int) string { + if position < length-1 { + strings = strings + "," + } + return strings +} + +func wrapInJSONArray(strings string) string { + return "[" + strings + "]" } func (j *job) waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool { diff --git a/dmaap-mediator-producer/internal/jobs/jobs_test.go b/dmaap-mediator-producer/internal/jobs/jobs_test.go index 6a1a70ad..e4f20e18 100644 --- a/dmaap-mediator-producer/internal/jobs/jobs_test.go +++ b/dmaap-mediator-producer/internal/jobs/jobs_test.go @@ -388,7 +388,7 @@ func TestJobWithBufferedParameters_shouldSendMessagesTogether(t *testing.T) { distributeClientMock := NewTestClient(func(req *http.Request) *http.Response { if req.URL.String() == "http://consumerHost/target" { assertions.Equal(req.Method, "POST") - assertions.Equal(`"[{\"data\": 1},{\"data\": 2}]"`, getBodyAsString(req, t)) + assertions.Equal(`["{\"data\": 1}","{\"data\": 2}","ABCDEFGH"]`, getBodyAsString(req, t)) assertions.Equal("application/json", req.Header.Get("Content-Type")) wg.Done() return &http.Response{ @@ -418,6 +418,7 @@ func TestJobWithBufferedParameters_shouldSendMessagesTogether(t *testing.T) { go func() { jobUnderTest.messagesChannel <- []byte(`{"data": 1}`) jobUnderTest.messagesChannel <- []byte(`{"data": 2}`) + jobUnderTest.messagesChannel <- []byte("ABCDEFGH") }() if waitTimeout(&wg, 2*time.Second) { @@ -442,7 +443,7 @@ func TestJobReadMoreThanBufferSizeMessages_shouldOnlyReturnMaxSizeNoOfMessages(t MaxTimeMiliseconds: 200, }) - assertions.Equal([]byte("\"[0,1]\""), msgs) + assertions.Equal([]byte("[\"0\",\"1\"]"), msgs) } func TestJobReadBufferedWhenTimeout_shouldOnlyReturnMessagesSentBeforeTimeout(t *testing.T) { assertions := require.New(t) @@ -461,7 +462,7 @@ func TestJobReadBufferedWhenTimeout_shouldOnlyReturnMessagesSentBeforeTimeout(t MaxTimeMiliseconds: 30, }) - assertions.Equal([]byte("\"[0,1]\""), msgs) + assertions.Equal([]byte("[\"0\",\"1\"]"), msgs) } func fillMessagesBuffer(mc chan []byte) {