X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=dmaap-mediator-producer%2Finternal%2Fjobs%2Fjobs.go;h=1c4294247cc9a3b883fd04e27e9bbde552b896a2;hb=d289079b1b15286e994bc29b98cd732fc32fb06f;hp=b856e2965cbdb3ae48b3ce48c44f225181289ebe;hpb=3923a2bd9989a6988cdb29d464c86cb32ced5439;p=nonrtric.git diff --git a/dmaap-mediator-producer/internal/jobs/jobs.go b/dmaap-mediator-producer/internal/jobs/jobs.go index b856e296..1c429424 100644 --- a/dmaap-mediator-producer/internal/jobs/jobs.go +++ b/dmaap-mediator-producer/internal/jobs/jobs.go @@ -182,8 +182,6 @@ func (jh *jobHandler) start(mRAddress string) { } func (jh *jobHandler) pollAndDistributeMessages(mRAddress string) { - jh.mu.Lock() - defer jh.mu.Unlock() log.Debugf("Processing jobs for type: %v", jh.typeId) messagesBody, error := restclient.Get(mRAddress+jh.topicUrl, jh.pollClient) if error != nil { @@ -195,6 +193,8 @@ func (jh *jobHandler) pollAndDistributeMessages(mRAddress string) { func (jh *jobHandler) distributeMessages(messages []byte) { if len(messages) > 2 { + jh.mu.Lock() + defer jh.mu.Unlock() for _, jobInfo := range jh.jobs { go jh.sendMessagesToConsumer(messages, jobInfo) }