Fix distribution of Kafka messages
[nonrtric.git] / dmaap-mediator-producer / internal / jobs / jobs.go
index 3ef5ca3..0bf2f12 100644 (file)
@@ -22,6 +22,7 @@ package jobs
 
 import (
        "fmt"
+       "strings"
        "sync"
        "time"
 
@@ -37,6 +38,11 @@ type TypeData struct {
        jobsHandler *jobsHandler
 }
 
+type sourceType string
+
+const dMaaPSource = sourceType("dmaap")
+const kafkaSource = sourceType("kafka")
+
 type JobInfo struct {
        Owner            string     `json:"owner"`
        LastUpdated      string     `json:"last_updated"`
@@ -44,6 +50,7 @@ type JobInfo struct {
        TargetUri        string     `json:"target_uri"`
        InfoJobData      Parameters `json:"info_job_data"`
        InfoTypeIdentity string     `json:"info_type_identity"`
+       sourceType       sourceType
 }
 
 type JobTypesManager interface {
@@ -77,6 +84,7 @@ func NewJobsManagerImpl(pollClient restclient.HTTPClient, mrAddr string, kafkaFa
 func (jm *JobsManagerImpl) AddJobFromRESTCall(ji JobInfo) error {
        if err := jm.validateJobInfo(ji); err == nil {
                typeData := jm.allTypes[ji.InfoTypeIdentity]
+               ji.sourceType = typeData.jobsHandler.sourceType
                typeData.jobsHandler.addJobCh <- ji
                log.Debug("Added job: ", ji)
                return nil
@@ -139,6 +147,7 @@ func (jm *JobsManagerImpl) StartJobsForAllTypes() {
 type jobsHandler struct {
        mu               sync.Mutex
        typeId           string
+       sourceType       sourceType
        pollingAgent     pollingAgent
        jobs             map[string]job
        addJobCh         chan JobInfo
@@ -148,8 +157,13 @@ type jobsHandler struct {
 
 func newJobsHandler(typeDef config.TypeDefinition, mRAddress string, kafkaFactory kafkaclient.KafkaFactory, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *jobsHandler {
        pollingAgent := createPollingAgent(typeDef, mRAddress, pollClient, kafkaFactory, typeDef.KafkaInputTopic)
+       sourceType := kafkaSource
+       if typeDef.DMaaPTopicURL != "" {
+               sourceType = dMaaPSource
+       }
        return &jobsHandler{
                typeId:           typeDef.Identity,
+               sourceType:       sourceType,
                pollingAgent:     pollingAgent,
                jobs:             make(map[string]job),
                addJobCh:         make(chan JobInfo),
@@ -321,10 +335,10 @@ type BufferTimeout struct {
 }
 
 func (j *job) start() {
-       if j.jobInfo.InfoJobData.BufferTimeout.MaxSize == 0 {
-               j.startReadingSingleMessages()
-       } else {
+       if j.isJobBuffered() {
                j.startReadingMessagesBuffered()
+       } else {
+               j.startReadingSingleMessages()
        }
 }
 
@@ -360,7 +374,7 @@ out:
 func (j *job) read(bufferParams BufferTimeout) []byte {
        wg := sync.WaitGroup{}
        wg.Add(bufferParams.MaxSize)
-       var msgs []byte
+       rawMsgs := make([][]byte, 0, bufferParams.MaxSize)
        c := make(chan struct{})
        go func() {
                i := 0
@@ -370,8 +384,8 @@ func (j *job) read(bufferParams BufferTimeout) []byte {
                        case <-c:
                                break out
                        case msg := <-j.messagesChannel:
+                               rawMsgs = append(rawMsgs, msg)
                                i++
-                               msgs = append(msgs, msg...)
                                wg.Done()
                                if i == bufferParams.MaxSize {
                                        break out
@@ -381,7 +395,19 @@ func (j *job) read(bufferParams BufferTimeout) []byte {
        }()
        j.waitTimeout(&wg, time.Duration(bufferParams.MaxTimeMiliseconds)*time.Millisecond)
        close(c)
-       return msgs
+       return getAsJSONArray(rawMsgs)
+}
+
+func getAsJSONArray(rawMsgs [][]byte) []byte {
+       json := `"[`
+       for i := 0; i < len(rawMsgs); i++ {
+               msg := string(rawMsgs[i])
+               json = json + strings.ReplaceAll(msg, "\"", "\\\"")
+               if i < len(rawMsgs)-1 {
+                       json = json + ","
+               }
+       }
+       return []byte(json + `]"`)
 }
 
 func (j *job) waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
@@ -400,9 +426,21 @@ func (j *job) waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
 
 func (j *job) sendMessagesToConsumer(messages []byte) {
        log.Debug("Processing job: ", j.jobInfo.InfoJobIdentity)
-       if postErr := restclient.Post(j.jobInfo.TargetUri, messages, j.client); postErr != nil {
+       contentType := restclient.ContentTypeJSON
+       if j.isJobKafka() && !j.isJobBuffered() {
+               contentType = restclient.ContentTypePlain
+       }
+       if postErr := restclient.Post(j.jobInfo.TargetUri, messages, contentType, j.client); postErr != nil {
                log.Warnf("Error posting data for job: %v. Cause: %v", j.jobInfo, postErr)
                return
        }
        log.Debugf("Messages for job: %v distributed to consumer: %v", j.jobInfo.InfoJobIdentity, j.jobInfo.Owner)
 }
+
+func (j *job) isJobBuffered() bool {
+       return j.jobInfo.InfoJobData.BufferTimeout.MaxSize > 0 && j.jobInfo.InfoJobData.BufferTimeout.MaxTimeMiliseconds > 0
+}
+
+func (j *job) isJobKafka() bool {
+       return j.jobInfo.sourceType == kafkaSource
+}