+
+func RunJobs(mRAddress string) {
+ for {
+ pollAndDistributeMessages(mRAddress)
+ }
+}
+
+func pollAndDistributeMessages(mRAddress string) {
+ for typeId, typeInfo := range allJobs {
+ log.Debugf("Processing jobs for type: %v", typeId)
+ messagesBody, error := restclient.Get(fmt.Sprintf("%v/events/%v/users/dmaapmediatorproducer", mRAddress, typeInfo.DMaaPTopic))
+ if error != nil {
+ log.Warnf("Error getting data from MR. Cause: %v", error)
+ continue
+ }
+ distributeMessages(messagesBody, typeInfo)
+ }
+}
+
+func distributeMessages(messages []byte, typeInfo Type) {
+ if len(messages) > 2 {
+ mu.Lock()
+ for _, jobInfo := range typeInfo.Jobs {
+ go sendMessagesToConsumer(messages, jobInfo)
+ }
+ mu.Unlock()
+ }
+}
+
+func sendMessagesToConsumer(messages []byte, jobInfo JobInfo) {
+ log.Debugf("Processing job: %v", jobInfo.InfoJobIdentity)
+ if postErr := restclient.Post(jobInfo.TargetUri, messages); postErr != nil {
+ log.Warnf("Error posting data for job: %v. Cause: %v", jobInfo, postErr)
+ }
+}
+
+func clearAll() {
+ allJobs = make(map[string]Type)
+}