import (
"fmt"
+ "strings"
"sync"
"time"
jobsHandler *jobsHandler
}
+type sourceType string
+
+const dMaaPSource = sourceType("dmaap")
+const kafkaSource = sourceType("kafka")
+
type JobInfo struct {
Owner string `json:"owner"`
LastUpdated string `json:"last_updated"`
TargetUri string `json:"target_uri"`
InfoJobData Parameters `json:"info_job_data"`
InfoTypeIdentity string `json:"info_type_identity"`
-}
+ sourceType sourceType
+} // @name JobInfo
type JobTypesManager interface {
LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition
func (jm *JobsManagerImpl) AddJobFromRESTCall(ji JobInfo) error {
if err := jm.validateJobInfo(ji); err == nil {
typeData := jm.allTypes[ji.InfoTypeIdentity]
+ ji.sourceType = typeData.jobsHandler.sourceType
typeData.jobsHandler.addJobCh <- ji
log.Debug("Added job: ", ji)
return nil
type jobsHandler struct {
mu sync.Mutex
typeId string
+ sourceType sourceType
pollingAgent pollingAgent
jobs map[string]job
addJobCh chan JobInfo
func newJobsHandler(typeDef config.TypeDefinition, mRAddress string, kafkaFactory kafkaclient.KafkaFactory, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *jobsHandler {
pollingAgent := createPollingAgent(typeDef, mRAddress, pollClient, kafkaFactory, typeDef.KafkaInputTopic)
+ sourceType := kafkaSource
+ if typeDef.DMaaPTopicURL != "" {
+ sourceType = dMaaPSource
+ }
return &jobsHandler{
typeId: typeDef.Identity,
+ sourceType: sourceType,
pollingAgent: pollingAgent,
jobs: make(map[string]job),
addJobCh: make(chan JobInfo),
type Parameters struct {
BufferTimeout BufferTimeout `json:"bufferTimeout"`
-}
+} // @name Parameters
type BufferTimeout struct {
MaxSize int `json:"maxSize"`
MaxTimeMiliseconds int64 `json:"maxTimeMiliseconds"`
-}
+} // @name BufferTimeout
func (j *job) start() {
- if j.jobInfo.InfoJobData.BufferTimeout.MaxSize == 0 {
- j.startReadingSingleMessages()
- } else {
+ if j.isJobBuffered() {
j.startReadingMessagesBuffered()
+ } else {
+ j.startReadingSingleMessages()
}
}
func (j *job) read(bufferParams BufferTimeout) []byte {
wg := sync.WaitGroup{}
wg.Add(bufferParams.MaxSize)
- var msgs []byte
+ rawMsgs := make([][]byte, 0, bufferParams.MaxSize)
c := make(chan struct{})
go func() {
i := 0
case <-c:
break out
case msg := <-j.messagesChannel:
+ rawMsgs = append(rawMsgs, msg)
i++
- msgs = append(msgs, msg...)
wg.Done()
if i == bufferParams.MaxSize {
break out
}()
j.waitTimeout(&wg, time.Duration(bufferParams.MaxTimeMiliseconds)*time.Millisecond)
close(c)
- return msgs
+ return getAsJSONArray(rawMsgs)
+}
+
+func getAsJSONArray(rawMsgs [][]byte) []byte {
+ if len(rawMsgs) == 0 {
+ return []byte("")
+ }
+ strings := ""
+ for i := 0; i < len(rawMsgs); i++ {
+ strings = strings + makeIntoString(rawMsgs[i])
+ strings = addSeparatorIfNeeded(strings, i, len(rawMsgs))
+ }
+ return []byte(wrapInJSONArray(strings))
+}
+
+func makeIntoString(rawMsg []byte) string {
+ return `"` + strings.ReplaceAll(string(rawMsg), "\"", "\\\"") + `"`
+}
+
+func addSeparatorIfNeeded(strings string, position, length int) string {
+ if position < length-1 {
+ strings = strings + ","
+ }
+ return strings
+}
+
+func wrapInJSONArray(strings string) string {
+ return "[" + strings + "]"
}
func (j *job) waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
func (j *job) sendMessagesToConsumer(messages []byte) {
log.Debug("Processing job: ", j.jobInfo.InfoJobIdentity)
- if postErr := restclient.Post(j.jobInfo.TargetUri, messages, j.client); postErr != nil {
+ contentType := restclient.ContentTypeJSON
+ if j.isJobKafka() && !j.isJobBuffered() {
+ contentType = restclient.ContentTypePlain
+ }
+ if postErr := restclient.Post(j.jobInfo.TargetUri, messages, contentType, j.client); postErr != nil {
log.Warnf("Error posting data for job: %v. Cause: %v", j.jobInfo, postErr)
return
}
log.Debugf("Messages for job: %v distributed to consumer: %v", j.jobInfo.InfoJobIdentity, j.jobInfo.Owner)
}
+
+func (j *job) isJobBuffered() bool {
+ return j.jobInfo.InfoJobData.BufferTimeout.MaxSize > 0 && j.jobInfo.InfoJobData.BufferTimeout.MaxTimeMiliseconds > 0
+}
+
+func (j *job) isJobKafka() bool {
+ return j.jobInfo.sourceType == kafkaSource
+}