defer jh.mu.Unlock()
for typeId, typeInfo := range jh.allTypes {
log.Debugf("Processing jobs for type: %v", typeId)
- messagesBody, error := restclient.Get(fmt.Sprintf("%v/%v", mRAddress, typeInfo.DMaaPTopicURL), jh.pollClient)
+ messagesBody, error := restclient.Get(mRAddress+typeInfo.DMaaPTopicURL, jh.pollClient)
if error != nil {
log.Warnf("Error getting data from MR. Cause: %v", error)
continue
}
+ log.Debugf("Received messages: %v", string(messagesBody))
jh.distributeMessages(messagesBody, typeInfo)
}
}
if postErr := restclient.Post(jobInfo.TargetUri, messages, jh.distributeClient); postErr != nil {
log.Warnf("Error posting data for job: %v. Cause: %v", jobInfo, postErr)
}
+ log.Debugf("Messages distributed to consumer: %v.", jobInfo.Owner)
}
func (jh *JobHandlerImpl) clearAll() {