+ }
+}
+
+func (jh *jobHandler) sendMessagesToConsumer(messages []byte, jobInfo JobInfo) {
+ log.Debugf("Processing job: %v", jobInfo.InfoJobIdentity)
+ if postErr := restclient.Post(jobInfo.TargetUri, messages, jh.distributeClient); postErr != nil {
+ log.Warnf("Error posting data for job: %v. Cause: %v", jobInfo, postErr)
+ }
+ log.Debugf("Messages distributed to consumer: %v.", jobInfo.Owner)
+}
+
+func (jh *jobHandler) monitorManagementChannels() {
+ select {
+ case addedJob := <-jh.addJobCh:
+ jh.mu.Lock()
+ log.Debugf("received %v from addJobCh\n", addedJob)
+ jh.jobs[addedJob.InfoJobIdentity] = addedJob
+ jh.mu.Unlock()
+ case deletedJob := <-jh.deleteJobCh:
+ jh.mu.Lock()
+ log.Debugf("received %v from deleteJobCh\n", deletedJob)
+ delete(jh.jobs, deletedJob)
+ jh.mu.Unlock()