Add kafka jobs to DMaaP Mediator Producer
[nonrtric.git] / dmaap-mediator-producer / internal / jobs / jobs.go
index 867894f..3ef5ca3 100644 (file)
@@ -25,24 +25,25 @@ import (
        "sync"
        "time"
 
+       "github.com/confluentinc/confluent-kafka-go/kafka"
        log "github.com/sirupsen/logrus"
        "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
+       "oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient"
        "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
 )
 
 type TypeData struct {
-       TypeId        string `json:"id"`
-       DMaaPTopicURL string `json:"dmaapTopicUrl"`
-       jobsHandler   *jobsHandler
+       Identity    string `json:"id"`
+       jobsHandler *jobsHandler
 }
 
 type JobInfo struct {
-       Owner            string      `json:"owner"`
-       LastUpdated      string      `json:"last_updated"`
-       InfoJobIdentity  string      `json:"info_job_identity"`
-       TargetUri        string      `json:"target_uri"`
-       InfoJobData      interface{} `json:"info_job_data"`
-       InfoTypeIdentity string      `json:"info_type_identity"`
+       Owner            string     `json:"owner"`
+       LastUpdated      string     `json:"last_updated"`
+       InfoJobIdentity  string     `json:"info_job_identity"`
+       TargetUri        string     `json:"target_uri"`
+       InfoJobData      Parameters `json:"info_job_data"`
+       InfoTypeIdentity string     `json:"info_type_identity"`
 }
 
 type JobTypesManager interface {
@@ -59,14 +60,16 @@ type JobsManagerImpl struct {
        allTypes         map[string]TypeData
        pollClient       restclient.HTTPClient
        mrAddress        string
+       kafkaFactory     kafkaclient.KafkaFactory
        distributeClient restclient.HTTPClient
 }
 
-func NewJobsManagerImpl(pollClient restclient.HTTPClient, mrAddr string, distributeClient restclient.HTTPClient) *JobsManagerImpl {
+func NewJobsManagerImpl(pollClient restclient.HTTPClient, mrAddr string, kafkaFactory kafkaclient.KafkaFactory, distributeClient restclient.HTTPClient) *JobsManagerImpl {
        return &JobsManagerImpl{
                allTypes:         make(map[string]TypeData),
                pollClient:       pollClient,
                mrAddress:        mrAddr,
+               kafkaFactory:     kafkaFactory,
                distributeClient: distributeClient,
        }
 }
@@ -84,7 +87,7 @@ func (jm *JobsManagerImpl) AddJobFromRESTCall(ji JobInfo) error {
 
 func (jm *JobsManagerImpl) DeleteJobFromRESTCall(jobId string) {
        for _, typeData := range jm.allTypes {
-               log.Debugf("Deleting job %v from type %v", jobId, typeData.TypeId)
+               log.Debugf("Deleting job %v from type %v", jobId, typeData.Identity)
                typeData.jobsHandler.deleteJobCh <- jobId
        }
        log.Debug("Deleted job: ", jobId)
@@ -106,10 +109,12 @@ func (jm *JobsManagerImpl) validateJobInfo(ji JobInfo) error {
 
 func (jm *JobsManagerImpl) LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition {
        for _, typeDef := range types {
-               jm.allTypes[typeDef.Id] = TypeData{
-                       TypeId:        typeDef.Id,
-                       DMaaPTopicURL: typeDef.DmaapTopicURL,
-                       jobsHandler:   newJobsHandler(typeDef.Id, typeDef.DmaapTopicURL, jm.pollClient, jm.distributeClient),
+               if typeDef.DMaaPTopicURL == "" && typeDef.KafkaInputTopic == "" {
+                       log.Fatal("DMaaPTopicURL or KafkaInputTopic must be defined for type: ", typeDef.Identity)
+               }
+               jm.allTypes[typeDef.Identity] = TypeData{
+                       Identity:    typeDef.Identity,
+                       jobsHandler: newJobsHandler(typeDef, jm.mrAddress, jm.kafkaFactory, jm.pollClient, jm.distributeClient),
                }
        }
        return types
@@ -126,7 +131,7 @@ func (jm *JobsManagerImpl) GetSupportedTypes() []string {
 func (jm *JobsManagerImpl) StartJobsForAllTypes() {
        for _, jobType := range jm.allTypes {
 
-               go jobType.jobsHandler.startPollingAndDistribution(jm.mrAddress)
+               go jobType.jobsHandler.startPollingAndDistribution()
 
        }
 }
@@ -134,30 +139,29 @@ func (jm *JobsManagerImpl) StartJobsForAllTypes() {
 type jobsHandler struct {
        mu               sync.Mutex
        typeId           string
-       topicUrl         string
+       pollingAgent     pollingAgent
        jobs             map[string]job
        addJobCh         chan JobInfo
        deleteJobCh      chan string
-       pollClient       restclient.HTTPClient
        distributeClient restclient.HTTPClient
 }
 
-func newJobsHandler(typeId string, topicURL string, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *jobsHandler {
+func newJobsHandler(typeDef config.TypeDefinition, mRAddress string, kafkaFactory kafkaclient.KafkaFactory, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *jobsHandler {
+       pollingAgent := createPollingAgent(typeDef, mRAddress, pollClient, kafkaFactory, typeDef.KafkaInputTopic)
        return &jobsHandler{
-               typeId:           typeId,
-               topicUrl:         topicURL,
+               typeId:           typeDef.Identity,
+               pollingAgent:     pollingAgent,
                jobs:             make(map[string]job),
                addJobCh:         make(chan JobInfo),
                deleteJobCh:      make(chan string),
-               pollClient:       pollClient,
                distributeClient: distributeClient,
        }
 }
 
-func (jh *jobsHandler) startPollingAndDistribution(mRAddress string) {
+func (jh *jobsHandler) startPollingAndDistribution() {
        go func() {
                for {
-                       jh.pollAndDistributeMessages(mRAddress)
+                       jh.pollAndDistributeMessages()
                }
        }()
 
@@ -168,19 +172,20 @@ func (jh *jobsHandler) startPollingAndDistribution(mRAddress string) {
        }()
 }
 
-func (jh *jobsHandler) pollAndDistributeMessages(mRAddress string) {
+func (jh *jobsHandler) pollAndDistributeMessages() {
        log.Debugf("Processing jobs for type: %v", jh.typeId)
-       messagesBody, error := restclient.Get(mRAddress+jh.topicUrl, jh.pollClient)
+       messagesBody, error := jh.pollingAgent.pollMessages()
        if error != nil {
-               log.Warn("Error getting data from MR. Cause: ", error)
-               time.Sleep(time.Minute) // Must wait before trying to call MR again
+               log.Warn("Error getting data from source. Cause: ", error)
+               time.Sleep(time.Minute) // Must wait before trying to call data source again
+               return
        }
-       log.Debug("Received messages: ", string(messagesBody))
        jh.distributeMessages(messagesBody)
 }
 
 func (jh *jobsHandler) distributeMessages(messages []byte) {
-       if len(messages) > 2 {
+       if string(messages) != "[]" && len(messages) > 0 { // MR returns an ampty array if there are no messages.
+               log.Debug("Distributing messages: ", string(messages))
                jh.mu.Lock()
                defer jh.mu.Unlock()
                for _, job := range jh.jobs {
@@ -234,6 +239,61 @@ func (jh *jobsHandler) deleteJob(deletedJob string) {
        jh.mu.Unlock()
 }
 
+type pollingAgent interface {
+       pollMessages() ([]byte, error)
+}
+
+func createPollingAgent(typeDef config.TypeDefinition, mRAddress string, pollClient restclient.HTTPClient, kafkaFactory kafkaclient.KafkaFactory, topicID string) pollingAgent {
+       if typeDef.DMaaPTopicURL != "" {
+               return dMaaPPollingAgent{
+                       messageRouterURL: mRAddress + typeDef.DMaaPTopicURL,
+                       pollClient:       pollClient,
+               }
+       } else {
+               return newKafkaPollingAgent(kafkaFactory, typeDef.KafkaInputTopic)
+       }
+}
+
+type dMaaPPollingAgent struct {
+       messageRouterURL string
+       pollClient       restclient.HTTPClient
+}
+
+func (pa dMaaPPollingAgent) pollMessages() ([]byte, error) {
+       return restclient.Get(pa.messageRouterURL, pa.pollClient)
+}
+
+type kafkaPollingAgent struct {
+       kafkaClient kafkaclient.KafkaClient
+}
+
+func newKafkaPollingAgent(kafkaFactory kafkaclient.KafkaFactory, topicID string) kafkaPollingAgent {
+       c, err := kafkaclient.NewKafkaClient(kafkaFactory, topicID)
+       if err != nil {
+               log.Fatalf("Cannot create Kafka client for topic: %v, error details: %v\n", topicID, err)
+       }
+       return kafkaPollingAgent{
+               kafkaClient: c,
+       }
+}
+
+func (pa kafkaPollingAgent) pollMessages() ([]byte, error) {
+       msg, err := pa.kafkaClient.ReadMessage()
+       if err == nil {
+               return msg, nil
+       } else {
+               if isKafkaTimedOutError(err) {
+                       return []byte(""), nil
+               }
+               return nil, err
+       }
+}
+
+func isKafkaTimedOutError(err error) bool {
+       kafkaErr, ok := err.(kafka.Error)
+       return ok && kafkaErr.Code() == kafka.ErrTimedOut
+}
+
 type job struct {
        jobInfo         JobInfo
        client          restclient.HTTPClient
@@ -242,6 +302,7 @@ type job struct {
 }
 
 func newJob(j JobInfo, c restclient.HTTPClient) job {
+
        return job{
                jobInfo:         j,
                client:          c,
@@ -250,7 +311,24 @@ func newJob(j JobInfo, c restclient.HTTPClient) job {
        }
 }
 
+type Parameters struct {
+       BufferTimeout BufferTimeout `json:"bufferTimeout"`
+}
+
+type BufferTimeout struct {
+       MaxSize            int   `json:"maxSize"`
+       MaxTimeMiliseconds int64 `json:"maxTimeMiliseconds"`
+}
+
 func (j *job) start() {
+       if j.jobInfo.InfoJobData.BufferTimeout.MaxSize == 0 {
+               j.startReadingSingleMessages()
+       } else {
+               j.startReadingMessagesBuffered()
+       }
+}
+
+func (j *job) startReadingSingleMessages() {
 out:
        for {
                select {
@@ -263,10 +341,68 @@ out:
        }
 }
 
+func (j *job) startReadingMessagesBuffered() {
+out:
+       for {
+               select {
+               case <-j.controlChannel:
+                       log.Debug("Stop distribution for job: ", j.jobInfo.InfoJobIdentity)
+                       break out
+               default:
+                       msgs := j.read(j.jobInfo.InfoJobData.BufferTimeout)
+                       if len(msgs) > 0 {
+                               j.sendMessagesToConsumer(msgs)
+                       }
+               }
+       }
+}
+
+func (j *job) read(bufferParams BufferTimeout) []byte {
+       wg := sync.WaitGroup{}
+       wg.Add(bufferParams.MaxSize)
+       var msgs []byte
+       c := make(chan struct{})
+       go func() {
+               i := 0
+       out:
+               for {
+                       select {
+                       case <-c:
+                               break out
+                       case msg := <-j.messagesChannel:
+                               i++
+                               msgs = append(msgs, msg...)
+                               wg.Done()
+                               if i == bufferParams.MaxSize {
+                                       break out
+                               }
+                       }
+               }
+       }()
+       j.waitTimeout(&wg, time.Duration(bufferParams.MaxTimeMiliseconds)*time.Millisecond)
+       close(c)
+       return msgs
+}
+
+func (j *job) waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
+       c := make(chan struct{})
+       go func() {
+               defer close(c)
+               wg.Wait()
+       }()
+       select {
+       case <-c:
+               return false // completed normally
+       case <-time.After(timeout):
+               return true // timed out
+       }
+}
+
 func (j *job) sendMessagesToConsumer(messages []byte) {
        log.Debug("Processing job: ", j.jobInfo.InfoJobIdentity)
        if postErr := restclient.Post(j.jobInfo.TargetUri, messages, j.client); postErr != nil {
                log.Warnf("Error posting data for job: %v. Cause: %v", j.jobInfo, postErr)
+               return
        }
        log.Debugf("Messages for job: %v distributed to consumer: %v", j.jobInfo.InfoJobIdentity, j.jobInfo.Owner)
 }