+func (jm *JobsManagerImpl) StartJobsForAllTypes() {
+ for _, jobType := range jm.allTypes {
+
+ go jobType.jobsHandler.startPollingAndDistribution()
+
+ }
+}
+
// jobsHandler owns all jobs of a single type: it polls the type's source and
// fans messages out to each job, and serializes job add/delete requests
// through channels. Contains a Mutex, so it must not be copied.
type jobsHandler struct {
	mu           sync.Mutex // guards the jobs map
	typeId       string     // identity of the type this handler serves
	sourceType   sourceType // kafkaSource or dMaaPSource (see newJobsHandler)
	pollingAgent pollingAgent
	jobs         map[string]job // keyed by JobInfo.InfoJobIdentity
	addJobCh     chan JobInfo   // unbuffered; requests to register a job
	deleteJobCh  chan string    // unbuffered; job identities to remove
	distributeClient restclient.HTTPClient
}
+
+func newJobsHandler(typeDef config.TypeDefinition, mRAddress string, kafkaFactory kafkaclient.KafkaFactory, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *jobsHandler {
+ pollingAgent := createPollingAgent(typeDef, mRAddress, pollClient, kafkaFactory, typeDef.KafkaInputTopic)
+ sourceType := kafkaSource
+ if typeDef.DMaaPTopicURL != "" {
+ sourceType = dMaaPSource
+ }
+ return &jobsHandler{
+ typeId: typeDef.Identity,
+ sourceType: sourceType,
+ pollingAgent: pollingAgent,
+ jobs: make(map[string]job),
+ addJobCh: make(chan JobInfo),
+ deleteJobCh: make(chan string),
+ distributeClient: distributeClient,
+ }
+}
+
+func (jh *jobsHandler) startPollingAndDistribution() {
+ go func() {
+ for {
+ jh.pollAndDistributeMessages()
+ }
+ }()
+
+ go func() {
+ for {
+ jh.monitorManagementChannels()
+ }
+ }()
+}
+
+func (jh *jobsHandler) pollAndDistributeMessages() {
+ log.Debugf("Processing jobs for type: %v", jh.typeId)
+ messagesBody, error := jh.pollingAgent.pollMessages()
+ if error != nil {
+ log.Warn("Error getting data from source. Cause: ", error)
+ time.Sleep(time.Minute) // Must wait before trying to call data source again
+ return
+ }
+ jh.distributeMessages(messagesBody)
+}
+
+func (jh *jobsHandler) distributeMessages(messages []byte) {
+ if string(messages) != "[]" && len(messages) > 0 { // MR returns an ampty array if there are no messages.
+ log.Debug("Distributing messages: ", string(messages))
+ jh.mu.Lock()
+ defer jh.mu.Unlock()
+ for _, job := range jh.jobs {
+ if len(job.messagesChannel) < cap(job.messagesChannel) {
+ job.messagesChannel <- messages
+ } else {
+ jh.emptyMessagesBuffer(job)
+ }
+ }
+ }
+}
+
+func (jh *jobsHandler) emptyMessagesBuffer(job job) {
+ log.Debug("Emptying message queue for job: ", job.jobInfo.InfoJobIdentity)
+out:
+ for {
+ select {
+ case <-job.messagesChannel:
+ default:
+ break out
+ }
+ }
+}
+
+func (jh *jobsHandler) monitorManagementChannels() {
+ select {
+ case addedJob := <-jh.addJobCh:
+ jh.addJob(addedJob)
+ case deletedJob := <-jh.deleteJobCh:
+ jh.deleteJob(deletedJob)
+ }
+}
+
+func (jh *jobsHandler) addJob(addedJob JobInfo) {
+ jh.mu.Lock()
+ log.Debug("Add job: ", addedJob)
+ newJob := newJob(addedJob, jh.distributeClient)
+ go newJob.start()
+ jh.jobs[addedJob.InfoJobIdentity] = newJob
+ jh.mu.Unlock()
+}
+
+func (jh *jobsHandler) deleteJob(deletedJob string) {
+ jh.mu.Lock()
+ log.Debug("Delete job: ", deletedJob)
+ j, exist := jh.jobs[deletedJob]
+ if exist {
+ j.controlChannel <- struct{}{}
+ delete(jh.jobs, deletedJob)
+ }
+ jh.mu.Unlock()
+}
+
+type pollingAgent interface {
+ pollMessages() ([]byte, error)