X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=dmaap-mediator-producer%2Finternal%2Fjobs%2Fjobs.go;h=c84e2773e1c42447270a7fc03c9f3b980b9b2506;hb=4a9589b4743667175a584e628e4fd0f97499482a;hp=3ef5ca3e22a38dd622727f19e860dd4ea37b4768;hpb=6f5d3d1eccb8a1857c645ba6bd0b5e1b89ca7088;p=nonrtric.git diff --git a/dmaap-mediator-producer/internal/jobs/jobs.go b/dmaap-mediator-producer/internal/jobs/jobs.go index 3ef5ca3e..c84e2773 100644 --- a/dmaap-mediator-producer/internal/jobs/jobs.go +++ b/dmaap-mediator-producer/internal/jobs/jobs.go @@ -22,6 +22,7 @@ package jobs import ( "fmt" + "strings" "sync" "time" @@ -37,6 +38,11 @@ type TypeData struct { jobsHandler *jobsHandler } +type sourceType string + +const dMaaPSource = sourceType("dmaap") +const kafkaSource = sourceType("kafka") + type JobInfo struct { Owner string `json:"owner"` LastUpdated string `json:"last_updated"` @@ -44,6 +50,7 @@ type JobInfo struct { TargetUri string `json:"target_uri"` InfoJobData Parameters `json:"info_job_data"` InfoTypeIdentity string `json:"info_type_identity"` + sourceType sourceType } type JobTypesManager interface { @@ -77,6 +84,7 @@ func NewJobsManagerImpl(pollClient restclient.HTTPClient, mrAddr string, kafkaFa func (jm *JobsManagerImpl) AddJobFromRESTCall(ji JobInfo) error { if err := jm.validateJobInfo(ji); err == nil { typeData := jm.allTypes[ji.InfoTypeIdentity] + ji.sourceType = typeData.jobsHandler.sourceType typeData.jobsHandler.addJobCh <- ji log.Debug("Added job: ", ji) return nil @@ -139,6 +147,7 @@ func (jm *JobsManagerImpl) StartJobsForAllTypes() { type jobsHandler struct { mu sync.Mutex typeId string + sourceType sourceType pollingAgent pollingAgent jobs map[string]job addJobCh chan JobInfo @@ -148,8 +157,13 @@ type jobsHandler struct { func newJobsHandler(typeDef config.TypeDefinition, mRAddress string, kafkaFactory kafkaclient.KafkaFactory, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *jobsHandler { pollingAgent := createPollingAgent(typeDef, mRAddress, pollClient, kafkaFactory, typeDef.KafkaInputTopic) + sourceType := kafkaSource + if typeDef.DMaaPTopicURL != "" { + sourceType = dMaaPSource + } return &jobsHandler{ typeId: typeDef.Identity, + sourceType: sourceType, pollingAgent: pollingAgent, jobs: make(map[string]job), addJobCh: make(chan JobInfo), @@ -321,10 +335,10 @@ type BufferTimeout struct { } func (j *job) start() { - if j.jobInfo.InfoJobData.BufferTimeout.MaxSize == 0 { - j.startReadingSingleMessages() - } else { + if j.isJobBuffered() { j.startReadingMessagesBuffered() + } else { + j.startReadingSingleMessages() } } @@ -360,7 +374,7 @@ out: func (j *job) read(bufferParams BufferTimeout) []byte { wg := sync.WaitGroup{} wg.Add(bufferParams.MaxSize) - var msgs []byte + rawMsgs := make([][]byte, 0, bufferParams.MaxSize) c := make(chan struct{}) go func() { i := 0 @@ -370,8 +384,8 @@ func (j *job) read(bufferParams BufferTimeout) []byte { case <-c: break out case msg := <-j.messagesChannel: + rawMsgs = append(rawMsgs, msg) i++ - msgs = append(msgs, msg...) wg.Done() if i == bufferParams.MaxSize { break out @@ -381,7 +395,34 @@ func (j *job) read(bufferParams BufferTimeout) []byte { }() j.waitTimeout(&wg, time.Duration(bufferParams.MaxTimeMiliseconds)*time.Millisecond) close(c) - return msgs + return getAsJSONArray(rawMsgs) +} + +func getAsJSONArray(rawMsgs [][]byte) []byte { + if len(rawMsgs) == 0 { + return []byte("") + } + strings := "" + for i := 0; i < len(rawMsgs); i++ { + strings = strings + makeIntoString(rawMsgs[i]) + strings = addSeparatorIfNeeded(strings, i, len(rawMsgs)) + } + return []byte(wrapInJSONArray(strings)) +} + +func makeIntoString(rawMsg []byte) string { + return `"` + strings.ReplaceAll(string(rawMsg), "\"", "\\\"") + `"` +} + +func addSeparatorIfNeeded(strings string, position, length int) string { + if position < length-1 { + strings = strings + "," + } + return strings +} + +func wrapInJSONArray(strings string) string { + return "[" + strings + "]" } func (j *job) waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool { @@ -400,9 +441,21 @@ func (j *job) waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool { func (j *job) sendMessagesToConsumer(messages []byte) { log.Debug("Processing job: ", j.jobInfo.InfoJobIdentity) - if postErr := restclient.Post(j.jobInfo.TargetUri, messages, j.client); postErr != nil { + contentType := restclient.ContentTypeJSON + if j.isJobKafka() && !j.isJobBuffered() { + contentType = restclient.ContentTypePlain + } + if postErr := restclient.Post(j.jobInfo.TargetUri, messages, contentType, j.client); postErr != nil { log.Warnf("Error posting data for job: %v. Cause: %v", j.jobInfo, postErr) return } log.Debugf("Messages for job: %v distributed to consumer: %v", j.jobInfo.InfoJobIdentity, j.jobInfo.Owner) } + +func (j *job) isJobBuffered() bool { + return j.jobInfo.InfoJobData.BufferTimeout.MaxSize > 0 && j.jobInfo.InfoJobData.BufferTimeout.MaxTimeMiliseconds > 0 +} + +func (j *job) isJobKafka() bool { + return j.jobInfo.sourceType == kafkaSource +}