+ managerUnderTest := NewJobsManagerImpl(nil, "", nil)
+ jobsHandler := jobsHandler{
+ deleteJobCh: make(chan string)}
+ managerUnderTest.allTypes["type1"] = TypeData{
+ TypeId: "type1",
+ jobsHandler: &jobsHandler,
+ }
+
+ go managerUnderTest.DeleteJobFromRESTCall("job2")
+
+ assertions.Equal("job2", <-jobsHandler.deleteJobCh)
+}
+
+func TestAddJobToJobsManager_shouldStartPollAndDistributeMessages(t *testing.T) {
+ assertions := require.New(t)
+
+ called := false
+ messages := `[{"message": {"data": "data"}}]`
+ pollClientMock := NewTestClient(func(req *http.Request) *http.Response {
+ if req.URL.String() == "http://mrAddr/topicUrl" {
+ assertions.Equal(req.Method, "GET")
+ body := "[]"
+ if !called {
+ called = true
+ body = messages
+ }
+ return &http.Response{
+ StatusCode: 200,
+ Body: ioutil.NopCloser(bytes.NewReader([]byte(body))),
+ Header: make(http.Header), // Must be set to non-nil value or it panics
+ }
+ }
+ t.Error("Wrong call to client: ", req)
+ t.Fail()
+ return nil
+ })
+
+ wg := sync.WaitGroup{}
+ distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
+ if req.URL.String() == "http://consumerHost/target" {
+ assertions.Equal(req.Method, "POST")
+ assertions.Equal(messages, getBodyAsString(req, t))
+ assertions.Equal("application/json", req.Header.Get("Content-Type"))
+ wg.Done()
+ return &http.Response{
+ StatusCode: 200,
+ Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)),
+ Header: make(http.Header), // Must be set to non-nil value or it panics
+ }
+ }
+ t.Error("Wrong call to client: ", req)
+ t.Fail()
+ return nil
+ })
+ jobsHandler := newJobsHandler("type1", "/topicUrl", pollClientMock, distributeClientMock)
+
+ jobsManager := NewJobsManagerImpl(pollClientMock, "http://mrAddr", distributeClientMock)
+ jobsManager.allTypes["type1"] = TypeData{
+ DMaaPTopicURL: "/topicUrl",
+ TypeId: "type1",
+ jobsHandler: jobsHandler,
+ }
+
+ jobsManager.StartJobsForAllTypes()
+