}
func (jh *jobHandler) pollAndDistributeMessages(mRAddress string) {
- jh.mu.Lock()
- defer jh.mu.Unlock()
log.Debugf("Processing jobs for type: %v", jh.typeId)
messagesBody, error := restclient.Get(mRAddress+jh.topicUrl, jh.pollClient)
if error != nil {
func (jh *jobHandler) distributeMessages(messages []byte) {
if len(messages) > 2 {
+ jh.mu.Lock()
+ defer jh.mu.Unlock()
for _, jobInfo := range jh.jobs {
go jh.sendMessagesToConsumer(messages, jobInfo)
}