Improve concurrency of message sending 30/7030/7
authorelinuxhenrik <henrik.b.andersson@est.tech>
Wed, 10 Nov 2021 09:33:32 +0000 (10:33 +0100)
committerelinuxhenrik <henrik.b.andersson@est.tech>
Tue, 16 Nov 2021 07:08:30 +0000 (08:08 +0100)
Issue-ID: NONRTRIC-635
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Change-Id: I414740a918eb65abae94c91990ff39a3cfc7bd22

dmaap-mediator-producer/Dockerfile
dmaap-mediator-producer/README.md
dmaap-mediator-producer/internal/jobs/jobs.go
dmaap-mediator-producer/internal/jobs/jobs_test.go

index bc09fdc..1c7f45c 100644 (file)
@@ -20,7 +20,7 @@
 ##
 ## Build
 ##
 ##
 ## Build
 ##
-FROM golang:1.17-bullseye AS build
+FROM nexus3.o-ran-sc.org:10001/golang:1.17-bullseye AS build
 WORKDIR /app
 COPY go.mod .
 COPY go.sum .
 WORKDIR /app
 COPY go.mod .
 COPY go.sum .
index 90f8471..2fd7194 100644 (file)
@@ -36,7 +36,7 @@ The configured public key and cerificate shall be PEM-encoded. A self signed cer
 
 At start up the producer will register the configured job types in ICS and also register itself as a producer supporting these types. If ICS is unavailable, the producer will retry to connect indefinetely. The same goes for MR.
 
 
 At start up the producer will register the configured job types in ICS and also register itself as a producer supporting these types. If ICS is unavailable, the producer will retry to connect indefinetely. The same goes for MR.
 
-Once the initial registration is done, the producer will constantly poll MR for all configured job types. When receiving messages for a type, it will distribute these messages to all jobs registered for the type. If no jobs for that type are registered, the messages will be discarded. If a consumer is unavailable for distribution, the messages will be discarded for that consumer.
+Once the initial registration is done, the producer will constantly poll MR for all configured job types. When receiving messages for a type, it will distribute these messages to all jobs registered for the type. If no jobs for that type are registered, the messages will be discarded. If a consumer is unavailable for distribution, the messages will be discarded for that consumer until it is available again.
 
 ## Development
 
 
 ## Development
 
index 1c42942..d4694bf 100644 (file)
@@ -34,7 +34,7 @@ import (
 type TypeData struct {
        TypeId        string `json:"id"`
        DMaaPTopicURL string `json:"dmaapTopicUrl"`
 type TypeData struct {
        TypeId        string `json:"id"`
        DMaaPTopicURL string `json:"dmaapTopicUrl"`
-       jobHandler    *jobHandler
+       jobsHandler   *jobsHandler
 }
 
 type JobInfo struct {
 }
 
 type JobInfo struct {
@@ -64,17 +64,6 @@ type JobsManagerImpl struct {
        distributeClient restclient.HTTPClient
 }
 
        distributeClient restclient.HTTPClient
 }
 
-type jobHandler struct {
-       mu               sync.Mutex
-       typeId           string
-       topicUrl         string
-       jobs             map[string]JobInfo
-       addJobCh         chan JobInfo
-       deleteJobCh      chan string
-       pollClient       restclient.HTTPClient
-       distributeClient restclient.HTTPClient
-}
-
 func NewJobsManagerImpl(typeConfigFilePath string, pollClient restclient.HTTPClient, mrAddr string, distributeClient restclient.HTTPClient) *JobsManagerImpl {
        return &JobsManagerImpl{
                configFile:       typeConfigFilePath,
 func NewJobsManagerImpl(typeConfigFilePath string, pollClient restclient.HTTPClient, mrAddr string, distributeClient restclient.HTTPClient) *JobsManagerImpl {
        return &JobsManagerImpl{
                configFile:       typeConfigFilePath,
@@ -88,7 +77,7 @@ func NewJobsManagerImpl(typeConfigFilePath string, pollClient restclient.HTTPCli
 func (jm *JobsManagerImpl) AddJob(ji JobInfo) error {
        if err := jm.validateJobInfo(ji); err == nil {
                typeData := jm.allTypes[ji.InfoTypeIdentity]
 func (jm *JobsManagerImpl) AddJob(ji JobInfo) error {
        if err := jm.validateJobInfo(ji); err == nil {
                typeData := jm.allTypes[ji.InfoTypeIdentity]
-               typeData.jobHandler.addJobCh <- ji
+               typeData.jobsHandler.addJobCh <- ji
                log.Debug("Added job: ", ji)
                return nil
        } else {
                log.Debug("Added job: ", ji)
                return nil
        } else {
@@ -99,7 +88,7 @@ func (jm *JobsManagerImpl) AddJob(ji JobInfo) error {
 func (jm *JobsManagerImpl) DeleteJob(jobId string) {
        for _, typeData := range jm.allTypes {
                log.Debugf("Deleting job %v from type %v", jobId, typeData.TypeId)
 func (jm *JobsManagerImpl) DeleteJob(jobId string) {
        for _, typeData := range jm.allTypes {
                log.Debugf("Deleting job %v from type %v", jobId, typeData.TypeId)
-               typeData.jobHandler.deleteJobCh <- jobId
+               typeData.jobsHandler.deleteJobCh <- jobId
        }
        log.Debug("Deleted job: ", jobId)
 }
        }
        log.Debug("Deleted job: ", jobId)
 }
@@ -131,21 +120,10 @@ func (jm *JobsManagerImpl) LoadTypesFromConfiguration() ([]config.TypeDefinition
                return nil, err
        }
        for _, typeDef := range typeDefs.Types {
                return nil, err
        }
        for _, typeDef := range typeDefs.Types {
-               addCh := make(chan JobInfo)
-               deleteCh := make(chan string)
-               jh := jobHandler{
-                       typeId:           typeDef.Id,
-                       topicUrl:         typeDef.DmaapTopicURL,
-                       jobs:             make(map[string]JobInfo),
-                       addJobCh:         addCh,
-                       deleteJobCh:      deleteCh,
-                       pollClient:       jm.pollClient,
-                       distributeClient: jm.distributeClient,
-               }
                jm.allTypes[typeDef.Id] = TypeData{
                        TypeId:        typeDef.Id,
                        DMaaPTopicURL: typeDef.DmaapTopicURL,
                jm.allTypes[typeDef.Id] = TypeData{
                        TypeId:        typeDef.Id,
                        DMaaPTopicURL: typeDef.DmaapTopicURL,
-                       jobHandler:    &jh,
+                       jobsHandler:   newJobsHandler(typeDef.Id, typeDef.DmaapTopicURL, jm.pollClient, jm.distributeClient),
                }
        }
        return typeDefs.Types, nil
                }
        }
        return typeDefs.Types, nil
@@ -162,12 +140,35 @@ func (jm *JobsManagerImpl) GetSupportedTypes() []string {
 func (jm *JobsManagerImpl) StartJobs() {
        for _, jobType := range jm.allTypes {
 
 func (jm *JobsManagerImpl) StartJobs() {
        for _, jobType := range jm.allTypes {
 
-               go jobType.jobHandler.start(jm.mrAddress)
+               go jobType.jobsHandler.start(jm.mrAddress)
+
+       }
+}
+
+type jobsHandler struct {
+       mu               sync.Mutex
+       typeId           string
+       topicUrl         string
+       jobs             map[string]job
+       addJobCh         chan JobInfo
+       deleteJobCh      chan string
+       pollClient       restclient.HTTPClient
+       distributeClient restclient.HTTPClient
+}
 
 
+func newJobsHandler(typeId string, topicURL string, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *jobsHandler {
+       return &jobsHandler{
+               typeId:           typeId,
+               topicUrl:         topicURL,
+               jobs:             make(map[string]job),
+               addJobCh:         make(chan JobInfo),
+               deleteJobCh:      make(chan string),
+               pollClient:       pollClient,
+               distributeClient: distributeClient,
        }
 }
 
        }
 }
 
-func (jh *jobHandler) start(mRAddress string) {
+func (jh *jobsHandler) start(mRAddress string) {
        go func() {
                for {
                        jh.pollAndDistributeMessages(mRAddress)
        go func() {
                for {
                        jh.pollAndDistributeMessages(mRAddress)
@@ -181,45 +182,104 @@ func (jh *jobHandler) start(mRAddress string) {
        }()
 }
 
        }()
 }
 
-func (jh *jobHandler) pollAndDistributeMessages(mRAddress string) {
+func (jh *jobsHandler) pollAndDistributeMessages(mRAddress string) {
        log.Debugf("Processing jobs for type: %v", jh.typeId)
        messagesBody, error := restclient.Get(mRAddress+jh.topicUrl, jh.pollClient)
        if error != nil {
        log.Debugf("Processing jobs for type: %v", jh.typeId)
        messagesBody, error := restclient.Get(mRAddress+jh.topicUrl, jh.pollClient)
        if error != nil {
-               log.Warnf("Error getting data from MR. Cause: %v", error)
+               log.Warn("Error getting data from MR. Cause: ", error)
        }
        }
-       log.Debugf("Received messages: %v", string(messagesBody))
+       log.Debug("Received messages: ", string(messagesBody))
        jh.distributeMessages(messagesBody)
 }
 
        jh.distributeMessages(messagesBody)
 }
 
-func (jh *jobHandler) distributeMessages(messages []byte) {
+func (jh *jobsHandler) distributeMessages(messages []byte) {
        if len(messages) > 2 {
                jh.mu.Lock()
                defer jh.mu.Unlock()
        if len(messages) > 2 {
                jh.mu.Lock()
                defer jh.mu.Unlock()
-               for _, jobInfo := range jh.jobs {
-                       go jh.sendMessagesToConsumer(messages, jobInfo)
+               for _, job := range jh.jobs {
+                       if len(job.messagesChannel) < cap(job.messagesChannel) {
+                               job.messagesChannel <- messages
+                       } else {
+                               jh.emptyMessagesBuffer(job)
+                       }
                }
        }
 }
 
                }
        }
 }
 
-func (jh *jobHandler) sendMessagesToConsumer(messages []byte, jobInfo JobInfo) {
-       log.Debugf("Processing job: %v", jobInfo.InfoJobIdentity)
-       if postErr := restclient.Post(jobInfo.TargetUri, messages, jh.distributeClient); postErr != nil {
-               log.Warnf("Error posting data for job: %v. Cause: %v", jobInfo, postErr)
+func (jh *jobsHandler) emptyMessagesBuffer(job job) {
+       log.Debug("Emptying message queue for job: ", job.jobInfo.InfoJobIdentity)
+out:
+       for {
+               select {
+               case <-job.messagesChannel:
+               default:
+                       break out
+               }
        }
        }
-       log.Debugf("Messages distributed to consumer: %v.", jobInfo.Owner)
 }
 
 }
 
-func (jh *jobHandler) monitorManagementChannels() {
+func (jh *jobsHandler) monitorManagementChannels() {
        select {
        case addedJob := <-jh.addJobCh:
        select {
        case addedJob := <-jh.addJobCh:
-               jh.mu.Lock()
-               log.Debugf("received %v from addJobCh\n", addedJob)
-               jh.jobs[addedJob.InfoJobIdentity] = addedJob
-               jh.mu.Unlock()
+               jh.addJob(addedJob)
        case deletedJob := <-jh.deleteJobCh:
        case deletedJob := <-jh.deleteJobCh:
-               jh.mu.Lock()
-               log.Debugf("received %v from deleteJobCh\n", deletedJob)
+               jh.deleteJob(deletedJob)
+       }
+}
+
+func (jh *jobsHandler) addJob(addedJob JobInfo) {
+       jh.mu.Lock()
+       log.Debug("Add job: ", addedJob)
+       newJob := newJob(addedJob, jh.distributeClient)
+       go newJob.start()
+       jh.jobs[addedJob.InfoJobIdentity] = newJob
+       jh.mu.Unlock()
+}
+
+func (jh *jobsHandler) deleteJob(deletedJob string) {
+       jh.mu.Lock()
+       log.Debug("Delete job: ", deletedJob)
+       j, exist := jh.jobs[deletedJob]
+       if exist {
+               j.controlChannel <- struct{}{}
                delete(jh.jobs, deletedJob)
                delete(jh.jobs, deletedJob)
-               jh.mu.Unlock()
        }
        }
+       jh.mu.Unlock()
+}
+
+type job struct {
+       jobInfo         JobInfo
+       client          restclient.HTTPClient
+       messagesChannel chan []byte
+       controlChannel  chan struct{}
+}
+
+func newJob(j JobInfo, c restclient.HTTPClient) job {
+       return job{
+               jobInfo:         j,
+               client:          c,
+               messagesChannel: make(chan []byte, 10),
+               controlChannel:  make(chan struct{}),
+       }
+}
+
+func (j *job) start() {
+out:
+       for {
+               select {
+               case <-j.controlChannel:
+                       log.Debug("Stop distribution for job: ", j.jobInfo.InfoJobIdentity)
+                       break out
+               case msg := <-j.messagesChannel:
+                       j.sendMessagesToConsumer(msg)
+               }
+       }
+}
+
+func (j *job) sendMessagesToConsumer(messages []byte) {
+       log.Debug("Processing job: ", j.jobInfo.InfoJobIdentity)
+       if postErr := restclient.Post(j.jobInfo.TargetUri, messages, j.client); postErr != nil {
+               log.Warnf("Error posting data for job: %v. Cause: %v", j.jobInfo, postErr)
+       }
+       log.Debugf("Messages for job: %v distributed to consumer: %v", j.jobInfo.InfoJobIdentity, j.jobInfo.Owner)
 }
 }
index 3651a13..066823d 100644 (file)
@@ -36,7 +36,7 @@ import (
 
 const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}]}`
 
 
 const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}]}`
 
-func TestGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
+func TestJobsManagerGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
        assertions := require.New(t)
        typesDir, err := os.MkdirTemp("", "configs")
        if err != nil {
        assertions := require.New(t)
        typesDir, err := os.MkdirTemp("", "configs")
        if err != nil {
@@ -63,7 +63,7 @@ func TestGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *tes
        assertions.EqualValues([]string{"type1"}, supportedTypes)
 }
 
        assertions.EqualValues([]string{"type1"}, supportedTypes)
 }
 
-func TestManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) {
+func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) {
        assertions := require.New(t)
        managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
        wantedJob := JobInfo{
        assertions := require.New(t)
        managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
        wantedJob := JobInfo{
@@ -74,11 +74,11 @@ func TestManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) {
                InfoJobData:      "{}",
                InfoTypeIdentity: "type1",
        }
                InfoJobData:      "{}",
                InfoTypeIdentity: "type1",
        }
-       jobHandler := jobHandler{
+       jobsHandler := jobsHandler{
                addJobCh: make(chan JobInfo)}
        managerUnderTest.allTypes["type1"] = TypeData{
                addJobCh: make(chan JobInfo)}
        managerUnderTest.allTypes["type1"] = TypeData{
-               TypeId:     "type1",
-               jobHandler: &jobHandler,
+               TypeId:      "type1",
+               jobsHandler: &jobsHandler,
        }
 
        var err error
        }
 
        var err error
@@ -87,11 +87,11 @@ func TestManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) {
        }()
 
        assertions.Nil(err)
        }()
 
        assertions.Nil(err)
-       addedJob := <-jobHandler.addJobCh
+       addedJob := <-jobsHandler.addJobCh
        assertions.Equal(wantedJob, addedJob)
 }
 
        assertions.Equal(wantedJob, addedJob)
 }
 
-func TestManagerAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) {
+func TestJobsManagerAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) {
        assertions := require.New(t)
        managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
        jobInfo := JobInfo{
        assertions := require.New(t)
        managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
        jobInfo := JobInfo{
@@ -103,7 +103,7 @@ func TestManagerAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) {
        assertions.Equal("type not supported: type1", err.Error())
 }
 
        assertions.Equal("type not supported: type1", err.Error())
 }
 
-func TestManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
+func TestJobsManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
        assertions := require.New(t)
        managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
        managerUnderTest.allTypes["type1"] = TypeData{
        assertions := require.New(t)
        managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
        managerUnderTest.allTypes["type1"] = TypeData{
@@ -118,7 +118,7 @@ func TestManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
        assertions.Equal("missing required job identity: {    <nil> type1}", err.Error())
 }
 
        assertions.Equal("missing required job identity: {    <nil> type1}", err.Error())
 }
 
-func TestManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
+func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
        assertions := require.New(t)
        managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
        managerUnderTest.allTypes["type1"] = TypeData{
        assertions := require.New(t)
        managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
        managerUnderTest.allTypes["type1"] = TypeData{
@@ -134,33 +134,37 @@ func TestManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
        assertions.Equal("missing required target URI: {  job1  <nil> type1}", err.Error())
 }
 
        assertions.Equal("missing required target URI: {  job1  <nil> type1}", err.Error())
 }
 
-func TestManagerDeleteJob(t *testing.T) {
+func TestJobsManagerDeleteJob_shouldSendDeleteToChannel(t *testing.T) {
        assertions := require.New(t)
        managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
        assertions := require.New(t)
        managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
-       jobHandler := jobHandler{
+       jobsHandler := jobsHandler{
                deleteJobCh: make(chan string)}
        managerUnderTest.allTypes["type1"] = TypeData{
                deleteJobCh: make(chan string)}
        managerUnderTest.allTypes["type1"] = TypeData{
-               TypeId:     "type1",
-               jobHandler: &jobHandler,
+               TypeId:      "type1",
+               jobsHandler: &jobsHandler,
        }
 
        go managerUnderTest.DeleteJob("job2")
 
        }
 
        go managerUnderTest.DeleteJob("job2")
 
-       assertions.Equal("job2", <-jobHandler.deleteJobCh)
+       assertions.Equal("job2", <-jobsHandler.deleteJobCh)
 }
 
 }
 
-func TestHandlerPollAndDistributeMessages(t *testing.T) {
+func TestAddJobToJobsManager_shouldStartPollAndDistributeMessages(t *testing.T) {
        assertions := require.New(t)
 
        assertions := require.New(t)
 
-       wg := sync.WaitGroup{}
+       called := false
        messages := `[{"message": {"data": "data"}}]`
        pollClientMock := NewTestClient(func(req *http.Request) *http.Response {
                if req.URL.String() == "http://mrAddr/topicUrl" {
                        assertions.Equal(req.Method, "GET")
        messages := `[{"message": {"data": "data"}}]`
        pollClientMock := NewTestClient(func(req *http.Request) *http.Response {
                if req.URL.String() == "http://mrAddr/topicUrl" {
                        assertions.Equal(req.Method, "GET")
-                       wg.Done() // Signal that the poll call has been made
+                       body := "[]"
+                       if !called {
+                               called = true
+                               body = messages
+                       }
                        return &http.Response{
                                StatusCode: 200,
                        return &http.Response{
                                StatusCode: 200,
-                               Body:       ioutil.NopCloser(bytes.NewReader([]byte(messages))),
+                               Body:       ioutil.NopCloser(bytes.NewReader([]byte(body))),
                                Header:     make(http.Header), // Must be set to non-nil value or it panics
                        }
                }
                                Header:     make(http.Header), // Must be set to non-nil value or it panics
                        }
                }
@@ -168,12 +172,14 @@ func TestHandlerPollAndDistributeMessages(t *testing.T) {
                t.Fail()
                return nil
        })
                t.Fail()
                return nil
        })
+
+       wg := sync.WaitGroup{}
        distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
                if req.URL.String() == "http://consumerHost/target" {
                        assertions.Equal(req.Method, "POST")
                        assertions.Equal(messages, getBodyAsString(req))
                        assertions.Equal("application/json", req.Header.Get("Content-Type"))
        distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
                if req.URL.String() == "http://consumerHost/target" {
                        assertions.Equal(req.Method, "POST")
                        assertions.Equal(messages, getBodyAsString(req))
                        assertions.Equal("application/json", req.Header.Get("Content-Type"))
-                       wg.Done() // Signal that the distribution call has been made
+                       wg.Done()
                        return &http.Response{
                                StatusCode: 200,
                                Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
                        return &http.Response{
                                StatusCode: 200,
                                Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
@@ -184,73 +190,72 @@ func TestHandlerPollAndDistributeMessages(t *testing.T) {
                t.Fail()
                return nil
        })
                t.Fail()
                return nil
        })
+       jobsHandler := newJobsHandler("type1", "/topicUrl", pollClientMock, distributeClientMock)
+
+       jobsManager := NewJobsManagerImpl("", pollClientMock, "http://mrAddr", distributeClientMock)
+       jobsManager.allTypes["type1"] = TypeData{
+               DMaaPTopicURL: "/topicUrl",
+               TypeId:        "type1",
+               jobsHandler:   jobsHandler,
+       }
+
+       jobsManager.StartJobs()
 
        jobInfo := JobInfo{
                InfoTypeIdentity: "type1",
                InfoJobIdentity:  "job1",
                TargetUri:        "http://consumerHost/target",
        }
 
        jobInfo := JobInfo{
                InfoTypeIdentity: "type1",
                InfoJobIdentity:  "job1",
                TargetUri:        "http://consumerHost/target",
        }
-       handlerUnderTest := jobHandler{
-               topicUrl:         "/topicUrl",
-               jobs:             map[string]JobInfo{jobInfo.InfoJobIdentity: jobInfo},
-               pollClient:       pollClientMock,
-               distributeClient: distributeClientMock,
-       }
 
 
-       wg.Add(2) // Two calls should be made to the server, one to poll and one to distribute
-       handlerUnderTest.pollAndDistributeMessages("http://mrAddr")
+       wg.Add(1) // Wait till the distribution has happened
+       jobsManager.AddJob(jobInfo)
 
 
-       if waitTimeout(&wg, 100*time.Millisecond) {
+       if waitTimeout(&wg, 2*time.Second) {
                t.Error("Not all calls to server were made")
                t.Fail()
        }
 }
 
                t.Error("Not all calls to server were made")
                t.Fail()
        }
 }
 
-func TestHandlerAddJob_shouldAddJobToJobsMap(t *testing.T) {
-       assertions := require.New(t)
-
-       jobInfo := JobInfo{
-               InfoTypeIdentity: "type1",
-               InfoJobIdentity:  "job1",
-               TargetUri:        "http://consumerHost/target",
-       }
-
-       addCh := make(chan JobInfo)
-       handlerUnderTest := jobHandler{
-               mu:       sync.Mutex{},
-               jobs:     map[string]JobInfo{},
-               addJobCh: addCh,
-       }
+func TestJobsHandlerDeleteJob_shouldDeleteJobFromJobsMap(t *testing.T) {
+       jobToDelete := newJob(JobInfo{}, nil)
+       go jobToDelete.start()
+       jobsHandler := newJobsHandler("type1", "/topicUrl", nil, nil)
+       jobsHandler.jobs["job1"] = jobToDelete
 
 
-       go func() {
-               addCh <- jobInfo
-       }()
+       go jobsHandler.monitorManagementChannels()
 
 
-       handlerUnderTest.monitorManagementChannels()
+       jobsHandler.deleteJobCh <- "job1"
 
 
-       assertions.Len(handlerUnderTest.jobs, 1)
-       assertions.Equal(jobInfo, handlerUnderTest.jobs["job1"])
+       deleted := false
+       for i := 0; i < 100; i++ {
+               if len(jobsHandler.jobs) == 0 {
+                       deleted = true
+                       break
+               }
+               time.Sleep(time.Microsecond) // Need to drop control to let the job's goroutine do the job
+       }
+       require.New(t).True(deleted, "Job not deleted")
 }
 
 }
 
-func TestHandlerDeleteJob_shouldDeleteJobFromJobsMap(t *testing.T) {
-       assertions := require.New(t)
+func TestJobsHandlerEmptyJobMessageBufferWhenItIsFull(t *testing.T) {
+       job := newJob(JobInfo{
+               InfoJobIdentity: "job",
+       }, nil)
 
 
-       deleteCh := make(chan string)
-       handlerUnderTest := jobHandler{
-               mu: sync.Mutex{},
-               jobs: map[string]JobInfo{"job1": {
-                       InfoJobIdentity: "job1",
-               }},
-               deleteJobCh: deleteCh,
-       }
+       jobsHandler := newJobsHandler("type1", "/topicUrl", nil, nil)
+       jobsHandler.jobs["job1"] = job
 
 
-       go func() {
-               deleteCh <- "job1"
-       }()
+       fillMessagesBuffer(job.messagesChannel)
 
 
-       handlerUnderTest.monitorManagementChannels()
+       jobsHandler.distributeMessages([]byte("sent msg"))
 
 
-       assertions.Len(handlerUnderTest.jobs, 0)
+       require.New(t).Len(job.messagesChannel, 0)
+}
+
+func fillMessagesBuffer(mc chan []byte) {
+       for i := 0; i < cap(mc); i++ {
+               mc <- []byte("msg")
+       }
 }
 
 type RoundTripFunc func(req *http.Request) *http.Response
 }
 
 type RoundTripFunc func(req *http.Request) *http.Response