+ }()
+}
+
+func (jh *jobsHandler) pollAndDistributeMessages() {
+ log.Debugf("Processing jobs for type: %v", jh.typeId)
+ messagesBody, error := jh.pollingAgent.pollMessages()
+ if error != nil {
+ log.Warn("Error getting data from source. Cause: ", error)
+ time.Sleep(time.Minute) // Must wait before trying to call data source again
+ return
+ }
+ jh.distributeMessages(messagesBody)
+}
+
+func (jh *jobsHandler) distributeMessages(messages []byte) {
+ if string(messages) != "[]" && len(messages) > 0 { // MR returns an ampty array if there are no messages.
+ log.Debug("Distributing messages: ", string(messages))
+ jh.mu.Lock()
+ defer jh.mu.Unlock()
+ for _, job := range jh.jobs {
+ if len(job.messagesChannel) < cap(job.messagesChannel) {
+ job.messagesChannel <- messages
+ } else {
+ jh.emptyMessagesBuffer(job)
+ }
+ }
+ }
+}
+
+func (jh *jobsHandler) emptyMessagesBuffer(job job) {
+ log.Debug("Emptying message queue for job: ", job.jobInfo.InfoJobIdentity)
+out:
+ for {
+ select {
+ case <-job.messagesChannel:
+ default:
+ break out
+ }
+ }
+}
+
+func (jh *jobsHandler) monitorManagementChannels() {
+ select {
+ case addedJob := <-jh.addJobCh:
+ jh.addJob(addedJob)
+ case deletedJob := <-jh.deleteJobCh:
+ jh.deleteJob(deletedJob)
+ }
+}
+
+func (jh *jobsHandler) addJob(addedJob JobInfo) {
+ jh.mu.Lock()
+ log.Debug("Add job: ", addedJob)
+ newJob := newJob(addedJob, jh.distributeClient)
+ go newJob.start()
+ jh.jobs[addedJob.InfoJobIdentity] = newJob
+ jh.mu.Unlock()
+}
+
+func (jh *jobsHandler) deleteJob(deletedJob string) {
+ jh.mu.Lock()
+ log.Debug("Delete job: ", deletedJob)
+ j, exist := jh.jobs[deletedJob]
+ if exist {
+ j.controlChannel <- struct{}{}
+ delete(jh.jobs, deletedJob)
+ }
+ jh.mu.Unlock()
+}
+
+type pollingAgent interface {
+ pollMessages() ([]byte, error)
+}
+
+func createPollingAgent(typeDef config.TypeDefinition, mRAddress string, pollClient restclient.HTTPClient, kafkaFactory kafkaclient.KafkaFactory, topicID string) pollingAgent {
+ if typeDef.DMaaPTopicURL != "" {
+ return dMaaPPollingAgent{
+ messageRouterURL: mRAddress + typeDef.DMaaPTopicURL,
+ pollClient: pollClient,
+ }
+ } else {
+ return newKafkaPollingAgent(kafkaFactory, typeDef.KafkaInputTopic)
+ }
+}
+
+type dMaaPPollingAgent struct {
+ messageRouterURL string
+ pollClient restclient.HTTPClient
+}
+
+func (pa dMaaPPollingAgent) pollMessages() ([]byte, error) {
+ return restclient.Get(pa.messageRouterURL, pa.pollClient)
+}
+
+type kafkaPollingAgent struct {
+ kafkaClient kafkaclient.KafkaClient
+}
+
+func newKafkaPollingAgent(kafkaFactory kafkaclient.KafkaFactory, topicID string) kafkaPollingAgent {
+ c, err := kafkaclient.NewKafkaClient(kafkaFactory, topicID)
+ if err != nil {
+ log.Fatalf("Cannot create Kafka client for topic: %v, error details: %v\n", topicID, err)
+ }
+ return kafkaPollingAgent{
+ kafkaClient: c,
+ }
+}
+
+func (pa kafkaPollingAgent) pollMessages() ([]byte, error) {
+ msg, err := pa.kafkaClient.ReadMessage()
+ if err == nil {
+ return msg, nil