+func (jm *JobsManagerImpl) StartJobsForAllTypes() {
+ for _, jobType := range jm.allTypes {
+
+ go jobType.jobsHandler.startPollingAndDistribution()
+
+ }
+}
+
+type jobsHandler struct {
+ mu sync.Mutex
+ typeId string
+ pollingAgent pollingAgent
+ jobs map[string]job
+ addJobCh chan JobInfo
+ deleteJobCh chan string
+ distributeClient restclient.HTTPClient
+}
+
+func newJobsHandler(typeDef config.TypeDefinition, mRAddress string, kafkaFactory kafkaclient.KafkaFactory, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *jobsHandler {
+ pollingAgent := createPollingAgent(typeDef, mRAddress, pollClient, kafkaFactory, typeDef.KafkaInputTopic)
+ return &jobsHandler{
+ typeId: typeDef.Identity,
+ pollingAgent: pollingAgent,
+ jobs: make(map[string]job),
+ addJobCh: make(chan JobInfo),
+ deleteJobCh: make(chan string),
+ distributeClient: distributeClient,
+ }
+}
+
+func (jh *jobsHandler) startPollingAndDistribution() {
+ go func() {
+ for {
+ jh.pollAndDistributeMessages()
+ }
+ }()
+
+ go func() {
+ for {
+ jh.monitorManagementChannels()
+ }
+ }()
+}
+
+func (jh *jobsHandler) pollAndDistributeMessages() {
+ log.Debugf("Processing jobs for type: %v", jh.typeId)
+ messagesBody, error := jh.pollingAgent.pollMessages()
+ if error != nil {
+ log.Warn("Error getting data from source. Cause: ", error)
+ time.Sleep(time.Minute) // Must wait before trying to call data source again
+ return
+ }
+ jh.distributeMessages(messagesBody)
+}
+
+func (jh *jobsHandler) distributeMessages(messages []byte) {
+ if string(messages) != "[]" && len(messages) > 0 { // MR returns an ampty array if there are no messages.
+ log.Debug("Distributing messages: ", string(messages))
+ jh.mu.Lock()
+ defer jh.mu.Unlock()
+ for _, job := range jh.jobs {
+ if len(job.messagesChannel) < cap(job.messagesChannel) {
+ job.messagesChannel <- messages
+ } else {
+ jh.emptyMessagesBuffer(job)
+ }
+ }
+ }
+}
+
+func (jh *jobsHandler) emptyMessagesBuffer(job job) {
+ log.Debug("Emptying message queue for job: ", job.jobInfo.InfoJobIdentity)
+out: