Fix distribution of Kafka messages 48/7548/2
authorelinuxhenrik <henrik.b.andersson@est.tech>
Wed, 12 Jan 2022 07:05:40 +0000 (08:05 +0100)
committerelinuxhenrik <henrik.b.andersson@est.tech>
Wed, 12 Jan 2022 09:39:02 +0000 (10:39 +0100)
Issue-ID: NONRTRIC-702
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Change-Id: I4c2ed31b881f47648754409585d68425c0f8342f

dmaap-mediator-producer/internal/jobs/jobs.go
dmaap-mediator-producer/internal/jobs/jobs_test.go
dmaap-mediator-producer/internal/restclient/HTTPClient.go
dmaap-mediator-producer/internal/restclient/HTTPClient_test.go

index 3ef5ca3..0bf2f12 100644 (file)
@@ -22,6 +22,7 @@ package jobs
 
 import (
        "fmt"
+       "strings"
        "sync"
        "time"
 
@@ -37,6 +38,11 @@ type TypeData struct {
        jobsHandler *jobsHandler
 }
 
+type sourceType string
+
+const dMaaPSource = sourceType("dmaap")
+const kafkaSource = sourceType("kafka")
+
 type JobInfo struct {
        Owner            string     `json:"owner"`
        LastUpdated      string     `json:"last_updated"`
@@ -44,6 +50,7 @@ type JobInfo struct {
        TargetUri        string     `json:"target_uri"`
        InfoJobData      Parameters `json:"info_job_data"`
        InfoTypeIdentity string     `json:"info_type_identity"`
+       sourceType       sourceType
 }
 
 type JobTypesManager interface {
@@ -77,6 +84,7 @@ func NewJobsManagerImpl(pollClient restclient.HTTPClient, mrAddr string, kafkaFa
 func (jm *JobsManagerImpl) AddJobFromRESTCall(ji JobInfo) error {
        if err := jm.validateJobInfo(ji); err == nil {
                typeData := jm.allTypes[ji.InfoTypeIdentity]
+               ji.sourceType = typeData.jobsHandler.sourceType
                typeData.jobsHandler.addJobCh <- ji
                log.Debug("Added job: ", ji)
                return nil
@@ -139,6 +147,7 @@ func (jm *JobsManagerImpl) StartJobsForAllTypes() {
 type jobsHandler struct {
        mu               sync.Mutex
        typeId           string
+       sourceType       sourceType
        pollingAgent     pollingAgent
        jobs             map[string]job
        addJobCh         chan JobInfo
@@ -148,8 +157,13 @@ type jobsHandler struct {
 
 func newJobsHandler(typeDef config.TypeDefinition, mRAddress string, kafkaFactory kafkaclient.KafkaFactory, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *jobsHandler {
        pollingAgent := createPollingAgent(typeDef, mRAddress, pollClient, kafkaFactory, typeDef.KafkaInputTopic)
+       sourceType := kafkaSource
+       if typeDef.DMaaPTopicURL != "" {
+               sourceType = dMaaPSource
+       }
        return &jobsHandler{
                typeId:           typeDef.Identity,
+               sourceType:       sourceType,
                pollingAgent:     pollingAgent,
                jobs:             make(map[string]job),
                addJobCh:         make(chan JobInfo),
@@ -321,10 +335,10 @@ type BufferTimeout struct {
 }
 
 func (j *job) start() {
-       if j.jobInfo.InfoJobData.BufferTimeout.MaxSize == 0 {
-               j.startReadingSingleMessages()
-       } else {
+       if j.isJobBuffered() {
                j.startReadingMessagesBuffered()
+       } else {
+               j.startReadingSingleMessages()
        }
 }
 
@@ -360,7 +374,7 @@ out:
 func (j *job) read(bufferParams BufferTimeout) []byte {
        wg := sync.WaitGroup{}
        wg.Add(bufferParams.MaxSize)
-       var msgs []byte
+       rawMsgs := make([][]byte, 0, bufferParams.MaxSize)
        c := make(chan struct{})
        go func() {
                i := 0
@@ -370,8 +384,8 @@ func (j *job) read(bufferParams BufferTimeout) []byte {
                        case <-c:
                                break out
                        case msg := <-j.messagesChannel:
+                               rawMsgs = append(rawMsgs, msg)
                                i++
-                               msgs = append(msgs, msg...)
                                wg.Done()
                                if i == bufferParams.MaxSize {
                                        break out
@@ -381,7 +395,19 @@ func (j *job) read(bufferParams BufferTimeout) []byte {
        }()
        j.waitTimeout(&wg, time.Duration(bufferParams.MaxTimeMiliseconds)*time.Millisecond)
        close(c)
-       return msgs
+       return getAsJSONArray(rawMsgs)
+}
+
+func getAsJSONArray(rawMsgs [][]byte) []byte {
+       json := `"[`
+       for i := 0; i < len(rawMsgs); i++ {
+               msg := string(rawMsgs[i])
+               json = json + strings.ReplaceAll(msg, "\"", "\\\"")
+               if i < len(rawMsgs)-1 {
+                       json = json + ","
+               }
+       }
+       return []byte(json + `]"`)
 }
 
 func (j *job) waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
@@ -400,9 +426,21 @@ func (j *job) waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
 
 func (j *job) sendMessagesToConsumer(messages []byte) {
        log.Debug("Processing job: ", j.jobInfo.InfoJobIdentity)
-       if postErr := restclient.Post(j.jobInfo.TargetUri, messages, j.client); postErr != nil {
+       contentType := restclient.ContentTypeJSON
+       if j.isJobKafka() && !j.isJobBuffered() {
+               contentType = restclient.ContentTypePlain
+       }
+       if postErr := restclient.Post(j.jobInfo.TargetUri, messages, contentType, j.client); postErr != nil {
                log.Warnf("Error posting data for job: %v. Cause: %v", j.jobInfo, postErr)
                return
        }
        log.Debugf("Messages for job: %v distributed to consumer: %v", j.jobInfo.InfoJobIdentity, j.jobInfo.Owner)
 }
+
+func (j *job) isJobBuffered() bool {
+       return j.jobInfo.InfoJobData.BufferTimeout.MaxSize > 0 && j.jobInfo.InfoJobData.BufferTimeout.MaxTimeMiliseconds > 0
+}
+
+func (j *job) isJobKafka() bool {
+       return j.jobInfo.sourceType == kafkaSource
+}
index ab1165c..6a1a70a 100644 (file)
@@ -59,6 +59,8 @@ func TestJobsManagerGetTypes_shouldReturnSliceOfTypesAndProvideSupportedTypes(t
 
        supportedTypes := managerUnderTest.GetSupportedTypes()
        assertions.ElementsMatch([]string{"type1", "type2"}, supportedTypes)
+       assertions.Equal(dMaaPSource, managerUnderTest.allTypes["type1"].jobsHandler.sourceType)
+       assertions.Equal(kafkaSource, managerUnderTest.allTypes["type2"].jobsHandler.sourceType)
 }
 
 func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) {
@@ -113,7 +115,7 @@ func TestJobsManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
        }
        err := managerUnderTest.AddJobFromRESTCall(jobInfo)
        assertions.NotNil(err)
-       assertions.Equal("missing required job identity: {    {{0 0}} type1}", err.Error())
+       assertions.Equal("missing required job identity: {    {{0 0}} type1 }", err.Error())
 }
 
 func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
@@ -129,7 +131,7 @@ func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
        }
        err := managerUnderTest.AddJobFromRESTCall(jobInfo)
        assertions.NotNil(err)
-       assertions.Equal("missing required target URI: {  job1  {{0 0}} type1}", err.Error())
+       assertions.Equal("missing required target URI: {  job1  {{0 0}} type1 }", err.Error())
 }
 
 func TestJobsManagerDeleteJob_shouldSendDeleteToChannel(t *testing.T) {
@@ -226,7 +228,7 @@ func TestStartJobsManagerAddKafkaJob_shouldStartPollAndDistributeMessages(t *tes
                if req.URL.String() == "http://consumerHost/kafkatarget" {
                        assertions.Equal(req.Method, "POST")
                        assertions.Equal(kafkaMessages, getBodyAsString(req, t))
-                       assertions.Equal("application/json", req.Header.Get("Content-Type"))
+                       assertions.Equal("text/plain", req.Header.Get("Content-Type"))
                        wg.Done()
                        return &http.Response{
                                StatusCode: 200,
@@ -349,7 +351,7 @@ func TestJobWithoutParameters_shouldSendOneMessageAtATime(t *testing.T) {
                        assertions.Equal(req.Method, "POST")
                        assertions.Equal(fmt.Sprint("message", messageNo), getBodyAsString(req, t))
                        messageNo++
-                       assertions.Equal("application/json", req.Header.Get("Content-Type"))
+                       assertions.Equal("text/plain", req.Header.Get("Content-Type"))
                        wg.Done()
                        return &http.Response{
                                StatusCode: 200,
@@ -362,7 +364,10 @@ func TestJobWithoutParameters_shouldSendOneMessageAtATime(t *testing.T) {
                return nil
        })
 
-       jobUnderTest := newJob(JobInfo{TargetUri: "http://consumerHost/target"}, distributeClientMock)
+       jobUnderTest := newJob(JobInfo{
+               sourceType: kafkaSource,
+               TargetUri:  "http://consumerHost/target",
+       }, distributeClientMock)
 
        wg.Add(2)
        go jobUnderTest.start()
@@ -383,7 +388,7 @@ func TestJobWithBufferedParameters_shouldSendMessagesTogether(t *testing.T) {
        distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
                if req.URL.String() == "http://consumerHost/target" {
                        assertions.Equal(req.Method, "POST")
-                       assertions.Equal("12", getBodyAsString(req, t))
+                       assertions.Equal(`"[{\"data\": 1},{\"data\": 2}]"`, getBodyAsString(req, t))
                        assertions.Equal("application/json", req.Header.Get("Content-Type"))
                        wg.Done()
                        return &http.Response{
@@ -411,8 +416,8 @@ func TestJobWithBufferedParameters_shouldSendMessagesTogether(t *testing.T) {
        go jobUnderTest.start()
 
        go func() {
-               jobUnderTest.messagesChannel <- []byte("1")
-               jobUnderTest.messagesChannel <- []byte("2")
+               jobUnderTest.messagesChannel <- []byte(`{"data": 1}`)
+               jobUnderTest.messagesChannel <- []byte(`{"data": 2}`)
        }()
 
        if waitTimeout(&wg, 2*time.Second) {
@@ -437,7 +442,7 @@ func TestJobReadMoreThanBufferSizeMessages_shouldOnlyReturnMaxSizeNoOfMessages(t
                MaxTimeMiliseconds: 200,
        })
 
-       assertions.Equal([]byte("01"), msgs)
+       assertions.Equal([]byte("\"[0,1]\""), msgs)
 }
 func TestJobReadBufferedWhenTimeout_shouldOnlyReturnMessagesSentBeforeTimeout(t *testing.T) {
        assertions := require.New(t)
@@ -456,7 +461,7 @@ func TestJobReadBufferedWhenTimeout_shouldOnlyReturnMessagesSentBeforeTimeout(t
                MaxTimeMiliseconds: 30,
        })
 
-       assertions.Equal([]byte("01"), msgs)
+       assertions.Equal([]byte("\"[0,1]\""), msgs)
 }
 
 func fillMessagesBuffer(mc chan []byte) {
index 9a827e7..a7582c2 100644 (file)
@@ -34,6 +34,9 @@ import (
        log "github.com/sirupsen/logrus"
 )
 
+const ContentTypeJSON = "application/json"
+const ContentTypePlain = "text/plain"
+
 // HTTPClient interface
 type HTTPClient interface {
        Get(url string) (*http.Response, error)
@@ -68,16 +71,16 @@ func Get(url string, client HTTPClient) ([]byte, error) {
 }
 
 func Put(url string, body []byte, client HTTPClient) error {
-       return do(http.MethodPut, url, body, client)
+       return do(http.MethodPut, url, body, ContentTypeJSON, client)
 }
 
-func Post(url string, body []byte, client HTTPClient) error {
-       return do(http.MethodPost, url, body, client)
+func Post(url string, body []byte, contentType string, client HTTPClient) error {
+       return do(http.MethodPost, url, body, contentType, client)
 }
 
-func do(method string, url string, body []byte, client HTTPClient) error {
+func do(method string, url string, body []byte, contentType string, client HTTPClient) error {
        if req, reqErr := http.NewRequest(method, url, bytes.NewBuffer(body)); reqErr == nil {
-               req.Header.Set("Content-Type", "application/json")
+               req.Header.Set("Content-Type", contentType)
                if response, respErr := client.Do(req); respErr == nil {
                        if isResponseSuccess(response.StatusCode) {
                                return nil
index 20c26dd..90db6ae 100644 (file)
@@ -142,7 +142,7 @@ func TestPostOk(t *testing.T) {
                StatusCode: http.StatusOK,
        }, nil)
 
-       if err := Post("http://localhost:9990", []byte("body"), &clientMock); err != nil {
+       if err := Post("http://localhost:9990", []byte("body"), "application/json", &clientMock); err != nil {
                t.Errorf("Put() error = %v, did not want error", err)
        }
        var actualRequest *http.Request
@@ -202,7 +202,7 @@ func Test_doErrorCases(t *testing.T) {
                                StatusCode: tt.args.mockReturnStatus,
                                Body:       ioutil.NopCloser(bytes.NewReader(tt.args.mockReturnBody)),
                        }, tt.args.mockReturnError)
-                       err := do("PUT", tt.args.url, nil, &clientMock)
+                       err := do("PUT", tt.args.url, nil, "", &clientMock)
                        assertions.Equal(tt.wantErr, err, tt.name)
                })
        }