Fix locking in DMaaP Mediator Producer
[nonrtric.git] / dmaap-mediator-producer / internal / jobs / jobs.go
index b856e29..1c42942 100644 (file)
@@ -182,8 +182,6 @@ func (jh *jobHandler) start(mRAddress string) {
 }
 
 func (jh *jobHandler) pollAndDistributeMessages(mRAddress string) {
-       jh.mu.Lock()
-       defer jh.mu.Unlock()
        log.Debugf("Processing jobs for type: %v", jh.typeId)
        messagesBody, error := restclient.Get(mRAddress+jh.topicUrl, jh.pollClient)
        if error != nil {
@@ -195,6 +193,8 @@ func (jh *jobHandler) pollAndDistributeMessages(mRAddress string) {
 
 func (jh *jobHandler) distributeMessages(messages []byte) {
        if len(messages) > 2 {
+               jh.mu.Lock()
+               defer jh.mu.Unlock()
                for _, jobInfo := range jh.jobs {
                        go jh.sendMessagesToConsumer(messages, jobInfo)
                }