X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=dmaap-mediator-producer%2Finternal%2Fjobs%2Fjobs.go;h=1c4294247cc9a3b883fd04e27e9bbde552b896a2;hb=6f48adb69090799c74c29204dd2cd1737cc9d6ac;hp=b856e2965cbdb3ae48b3ce48c44f225181289ebe;hpb=6e0d5846a1f6d938605a4afa7f392d97ac2ba8bc;p=nonrtric.git diff --git a/dmaap-mediator-producer/internal/jobs/jobs.go b/dmaap-mediator-producer/internal/jobs/jobs.go index b856e296..1c429424 100644 --- a/dmaap-mediator-producer/internal/jobs/jobs.go +++ b/dmaap-mediator-producer/internal/jobs/jobs.go @@ -182,8 +182,6 @@ func (jh *jobHandler) start(mRAddress string) { } func (jh *jobHandler) pollAndDistributeMessages(mRAddress string) { - jh.mu.Lock() - defer jh.mu.Unlock() log.Debugf("Processing jobs for type: %v", jh.typeId) messagesBody, error := restclient.Get(mRAddress+jh.topicUrl, jh.pollClient) if error != nil { @@ -195,6 +193,8 @@ func (jh *jobHandler) pollAndDistributeMessages(mRAddress string) { func (jh *jobHandler) distributeMessages(messages []byte) { if len(messages) > 2 { + jh.mu.Lock() + defer jh.mu.Unlock() for _, jobInfo := range jh.jobs { go jh.sendMessagesToConsumer(messages, jobInfo) }