Implement secure communications
[nonrtric.git] / dmaap-mediator-producer / internal / jobs / jobs.go
index 7b21b00..854372a 100644 (file)
@@ -154,11 +154,12 @@ func (jh *JobHandlerImpl) pollAndDistributeMessages(mRAddress string) {
        defer jh.mu.Unlock()
        for typeId, typeInfo := range jh.allTypes {
                log.Debugf("Processing jobs for type: %v", typeId)
-               messagesBody, error := restclient.Get(fmt.Sprintf("%v/%v", mRAddress, typeInfo.DMaaPTopicURL), jh.pollClient)
+               messagesBody, error := restclient.Get(mRAddress+typeInfo.DMaaPTopicURL, jh.pollClient)
                if error != nil {
                        log.Warnf("Error getting data from MR. Cause: %v", error)
                        continue
                }
+               log.Debugf("Received messages: %v", string(messagesBody))
                jh.distributeMessages(messagesBody, typeInfo)
        }
 }
@@ -176,6 +177,7 @@ func (jh *JobHandlerImpl) sendMessagesToConsumer(messages []byte, jobInfo JobInf
        if postErr := restclient.Post(jobInfo.TargetUri, messages, jh.distributeClient); postErr != nil {
                log.Warnf("Error posting data for job: %v. Cause: %v", jobInfo, postErr)
        }
+       log.Debugf("Messages distributed to consumer: %v.", jobInfo.Owner)
 }
 
 func (jh *JobHandlerImpl) clearAll() {