From db7b5c801bdc96889aea18ab190f2b72fa8d8e06 Mon Sep 17 00:00:00 2001 From: elinuxhenrik Date: Wed, 12 Jan 2022 08:05:40 +0100 Subject: [PATCH] Fix distribution of Kafka messages Issue-ID: NONRTRIC-702 Signed-off-by: elinuxhenrik Change-Id: I4c2ed31b881f47648754409585d68425c0f8342f --- dmaap-mediator-producer/internal/jobs/jobs.go | 52 +++++++++++++++++++--- dmaap-mediator-producer/internal/jobs/jobs_test.go | 25 ++++++----- .../internal/restclient/HTTPClient.go | 13 +++--- .../internal/restclient/HTTPClient_test.go | 4 +- 4 files changed, 70 insertions(+), 24 deletions(-) diff --git a/dmaap-mediator-producer/internal/jobs/jobs.go b/dmaap-mediator-producer/internal/jobs/jobs.go index 3ef5ca3e..0bf2f12a 100644 --- a/dmaap-mediator-producer/internal/jobs/jobs.go +++ b/dmaap-mediator-producer/internal/jobs/jobs.go @@ -22,6 +22,7 @@ package jobs import ( "fmt" + "strings" "sync" "time" @@ -37,6 +38,11 @@ type TypeData struct { jobsHandler *jobsHandler } +type sourceType string + +const dMaaPSource = sourceType("dmaap") +const kafkaSource = sourceType("kafka") + type JobInfo struct { Owner string `json:"owner"` LastUpdated string `json:"last_updated"` @@ -44,6 +50,7 @@ type JobInfo struct { TargetUri string `json:"target_uri"` InfoJobData Parameters `json:"info_job_data"` InfoTypeIdentity string `json:"info_type_identity"` + sourceType sourceType } type JobTypesManager interface { @@ -77,6 +84,7 @@ func NewJobsManagerImpl(pollClient restclient.HTTPClient, mrAddr string, kafkaFa func (jm *JobsManagerImpl) AddJobFromRESTCall(ji JobInfo) error { if err := jm.validateJobInfo(ji); err == nil { typeData := jm.allTypes[ji.InfoTypeIdentity] + ji.sourceType = typeData.jobsHandler.sourceType typeData.jobsHandler.addJobCh <- ji log.Debug("Added job: ", ji) return nil @@ -139,6 +147,7 @@ func (jm *JobsManagerImpl) StartJobsForAllTypes() { type jobsHandler struct { mu sync.Mutex typeId string + sourceType sourceType pollingAgent pollingAgent jobs map[string]job addJobCh chan JobInfo @@ -148,8 +157,13 @@ type jobsHandler struct { func newJobsHandler(typeDef config.TypeDefinition, mRAddress string, kafkaFactory kafkaclient.KafkaFactory, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *jobsHandler { pollingAgent := createPollingAgent(typeDef, mRAddress, pollClient, kafkaFactory, typeDef.KafkaInputTopic) + sourceType := kafkaSource + if typeDef.DMaaPTopicURL != "" { + sourceType = dMaaPSource + } return &jobsHandler{ typeId: typeDef.Identity, + sourceType: sourceType, pollingAgent: pollingAgent, jobs: make(map[string]job), addJobCh: make(chan JobInfo), @@ -321,10 +335,10 @@ type BufferTimeout struct { } func (j *job) start() { - if j.jobInfo.InfoJobData.BufferTimeout.MaxSize == 0 { - j.startReadingSingleMessages() - } else { + if j.isJobBuffered() { j.startReadingMessagesBuffered() + } else { + j.startReadingSingleMessages() } } @@ -360,7 +374,7 @@ out: func (j *job) read(bufferParams BufferTimeout) []byte { wg := sync.WaitGroup{} wg.Add(bufferParams.MaxSize) - var msgs []byte + rawMsgs := make([][]byte, 0, bufferParams.MaxSize) c := make(chan struct{}) go func() { i := 0 @@ -370,8 +384,8 @@ func (j *job) read(bufferParams BufferTimeout) []byte { case <-c: break out case msg := <-j.messagesChannel: + rawMsgs = append(rawMsgs, msg) i++ - msgs = append(msgs, msg...) wg.Done() if i == bufferParams.MaxSize { break out @@ -381,7 +395,19 @@ func (j *job) read(bufferParams BufferTimeout) []byte { }() j.waitTimeout(&wg, time.Duration(bufferParams.MaxTimeMiliseconds)*time.Millisecond) close(c) - return msgs + return getAsJSONArray(rawMsgs) +} + +func getAsJSONArray(rawMsgs [][]byte) []byte { + json := `"[` + for i := 0; i < len(rawMsgs); i++ { + msg := string(rawMsgs[i]) + json = json + strings.ReplaceAll(msg, "\"", "\\\"") + if i < len(rawMsgs)-1 { + json = json + "," + } + } + return []byte(json + `]"`) } func (j *job) waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool { @@ -400,9 +426,21 @@ func (j *job) waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool { func (j *job) sendMessagesToConsumer(messages []byte) { log.Debug("Processing job: ", j.jobInfo.InfoJobIdentity) - if postErr := restclient.Post(j.jobInfo.TargetUri, messages, j.client); postErr != nil { + contentType := restclient.ContentTypeJSON + if j.isJobKafka() && !j.isJobBuffered() { + contentType = restclient.ContentTypePlain + } + if postErr := restclient.Post(j.jobInfo.TargetUri, messages, contentType, j.client); postErr != nil { log.Warnf("Error posting data for job: %v. Cause: %v", j.jobInfo, postErr) return } log.Debugf("Messages for job: %v distributed to consumer: %v", j.jobInfo.InfoJobIdentity, j.jobInfo.Owner) } + +func (j *job) isJobBuffered() bool { + return j.jobInfo.InfoJobData.BufferTimeout.MaxSize > 0 && j.jobInfo.InfoJobData.BufferTimeout.MaxTimeMiliseconds > 0 +} + +func (j *job) isJobKafka() bool { + return j.jobInfo.sourceType == kafkaSource +} diff --git a/dmaap-mediator-producer/internal/jobs/jobs_test.go b/dmaap-mediator-producer/internal/jobs/jobs_test.go index ab1165c9..6a1a70ad 100644 --- a/dmaap-mediator-producer/internal/jobs/jobs_test.go +++ b/dmaap-mediator-producer/internal/jobs/jobs_test.go @@ -59,6 +59,8 @@ func TestJobsManagerGetTypes_shouldReturnSliceOfTypesAndProvideSupportedTypes(t supportedTypes := managerUnderTest.GetSupportedTypes() assertions.ElementsMatch([]string{"type1", "type2"}, supportedTypes) + assertions.Equal(dMaaPSource, managerUnderTest.allTypes["type1"].jobsHandler.sourceType) + assertions.Equal(kafkaSource, managerUnderTest.allTypes["type2"].jobsHandler.sourceType) } func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) { @@ -113,7 +115,7 @@ func TestJobsManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) { } err := managerUnderTest.AddJobFromRESTCall(jobInfo) assertions.NotNil(err) - assertions.Equal("missing required job identity: { {{0 0}} type1}", err.Error()) + assertions.Equal("missing required job identity: { {{0 0}} type1 }", err.Error()) } func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) { @@ -129,7 +131,7 @@ func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) { } err := managerUnderTest.AddJobFromRESTCall(jobInfo) assertions.NotNil(err) - assertions.Equal("missing required target URI: { job1 {{0 0}} type1}", err.Error()) + assertions.Equal("missing required target URI: { job1 {{0 0}} type1 }", err.Error()) } func TestJobsManagerDeleteJob_shouldSendDeleteToChannel(t *testing.T) { @@ -226,7 +228,7 @@ func TestStartJobsManagerAddKafkaJob_shouldStartPollAndDistributeMessages(t *tes if req.URL.String() == "http://consumerHost/kafkatarget" { assertions.Equal(req.Method, "POST") assertions.Equal(kafkaMessages, getBodyAsString(req, t)) - assertions.Equal("application/json", req.Header.Get("Content-Type")) + assertions.Equal("text/plain", req.Header.Get("Content-Type")) wg.Done() return &http.Response{ StatusCode: 200, @@ -349,7 +351,7 @@ func TestJobWithoutParameters_shouldSendOneMessageAtATime(t *testing.T) { assertions.Equal(req.Method, "POST") assertions.Equal(fmt.Sprint("message", messageNo), getBodyAsString(req, t)) messageNo++ - assertions.Equal("application/json", req.Header.Get("Content-Type")) + assertions.Equal("text/plain", req.Header.Get("Content-Type")) wg.Done() return &http.Response{ StatusCode: 200, @@ -362,7 +364,10 @@ func TestJobWithoutParameters_shouldSendOneMessageAtATime(t *testing.T) { return nil }) - jobUnderTest := newJob(JobInfo{TargetUri: "http://consumerHost/target"}, distributeClientMock) + jobUnderTest := newJob(JobInfo{ + sourceType: kafkaSource, + TargetUri: "http://consumerHost/target", + }, distributeClientMock) wg.Add(2) go jobUnderTest.start() @@ -383,7 +388,7 @@ func TestJobWithBufferedParameters_shouldSendMessagesTogether(t *testing.T) { distributeClientMock := NewTestClient(func(req *http.Request) *http.Response { if req.URL.String() == "http://consumerHost/target" { assertions.Equal(req.Method, "POST") - assertions.Equal("12", getBodyAsString(req, t)) + assertions.Equal(`"[{\"data\": 1},{\"data\": 2}]"`, getBodyAsString(req, t)) assertions.Equal("application/json", req.Header.Get("Content-Type")) wg.Done() return &http.Response{ @@ -411,8 +416,8 @@ func TestJobWithBufferedParameters_shouldSendMessagesTogether(t *testing.T) { go jobUnderTest.start() go func() { - jobUnderTest.messagesChannel <- []byte("1") - jobUnderTest.messagesChannel <- []byte("2") + jobUnderTest.messagesChannel <- []byte(`{"data": 1}`) + jobUnderTest.messagesChannel <- []byte(`{"data": 2}`) }() if waitTimeout(&wg, 2*time.Second) { @@ -437,7 +442,7 @@ func TestJobReadMoreThanBufferSizeMessages_shouldOnlyReturnMaxSizeNoOfMessages(t MaxTimeMiliseconds: 200, }) - assertions.Equal([]byte("01"), msgs) + assertions.Equal([]byte("\"[0,1]\""), msgs) } func TestJobReadBufferedWhenTimeout_shouldOnlyReturnMessagesSentBeforeTimeout(t *testing.T) { assertions := require.New(t) @@ -456,7 +461,7 @@ func TestJobReadBufferedWhenTimeout_shouldOnlyReturnMessagesSentBeforeTimeout(t MaxTimeMiliseconds: 30, }) - assertions.Equal([]byte("01"), msgs) + assertions.Equal([]byte("\"[0,1]\""), msgs) } func fillMessagesBuffer(mc chan []byte) { diff --git a/dmaap-mediator-producer/internal/restclient/HTTPClient.go b/dmaap-mediator-producer/internal/restclient/HTTPClient.go index 9a827e7a..a7582c2b 100644 --- a/dmaap-mediator-producer/internal/restclient/HTTPClient.go +++ b/dmaap-mediator-producer/internal/restclient/HTTPClient.go @@ -34,6 +34,9 @@ import ( log "github.com/sirupsen/logrus" ) +const ContentTypeJSON = "application/json" +const ContentTypePlain = "text/plain" + // HTTPClient interface type HTTPClient interface { Get(url string) (*http.Response, error) @@ -68,16 +71,16 @@ func Get(url string, client HTTPClient) ([]byte, error) { } func Put(url string, body []byte, client HTTPClient) error { - return do(http.MethodPut, url, body, client) + return do(http.MethodPut, url, body, ContentTypeJSON, client) } -func Post(url string, body []byte, client HTTPClient) error { - return do(http.MethodPost, url, body, client) +func Post(url string, body []byte, contentType string, client HTTPClient) error { + return do(http.MethodPost, url, body, contentType, client) } -func do(method string, url string, body []byte, client HTTPClient) error { +func do(method string, url string, body []byte, contentType string, client HTTPClient) error { if req, reqErr := http.NewRequest(method, url, bytes.NewBuffer(body)); reqErr == nil { - req.Header.Set("Content-Type", "application/json") + req.Header.Set("Content-Type", contentType) if response, respErr := client.Do(req); respErr == nil { if isResponseSuccess(response.StatusCode) { return nil diff --git a/dmaap-mediator-producer/internal/restclient/HTTPClient_test.go b/dmaap-mediator-producer/internal/restclient/HTTPClient_test.go index 20c26dda..90db6ae4 100644 --- a/dmaap-mediator-producer/internal/restclient/HTTPClient_test.go +++ b/dmaap-mediator-producer/internal/restclient/HTTPClient_test.go @@ -142,7 +142,7 @@ func TestPostOk(t *testing.T) { StatusCode: http.StatusOK, }, nil) - if err := Post("http://localhost:9990", []byte("body"), &clientMock); err != nil { + if err := Post("http://localhost:9990", []byte("body"), "application/json", &clientMock); err != nil { t.Errorf("Put() error = %v, did not want error", err) } var actualRequest *http.Request @@ -202,7 +202,7 @@ func Test_doErrorCases(t *testing.T) { StatusCode: tt.args.mockReturnStatus, Body: ioutil.NopCloser(bytes.NewReader(tt.args.mockReturnBody)), }, tt.args.mockReturnError) - err := do("PUT", tt.args.url, nil, &clientMock) + err := do("PUT", tt.args.url, nil, "", &clientMock) assertions.Equal(tt.wantErr, err, tt.name) }) } -- 2.16.6