+
+ called := false
+ messages := `[{"message": {"data": "data"}}]`
+ pollClientMock := NewTestClient(func(req *http.Request) *http.Response {
+ if req.URL.String() == "http://mrAddr/topicUrl" {
+ assertions.Equal(req.Method, "GET")
+ body := "[]"
+ if !called {
+ called = true
+ body = messages
+ }
+ return &http.Response{
+ StatusCode: 200,
+ Body: ioutil.NopCloser(bytes.NewReader([]byte(body))),
+ Header: make(http.Header), // Must be set to non-nil value or it panics
+ }
+ }
+ t.Error("Wrong call to client: ", req)
+ t.Fail()
+ return nil
+ })
+
+ wg := sync.WaitGroup{}
+ distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
+ if req.URL.String() == "http://consumerHost/target" {
+ assertions.Equal(req.Method, "POST")
+ assertions.Equal(messages, getBodyAsString(req))
+ assertions.Equal("application/json", req.Header.Get("Content-Type"))
+ wg.Done()
+ return &http.Response{
+ StatusCode: 200,
+ Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)),
+ Header: make(http.Header), // Must be set to non-nil value or it panics
+ }
+ }
+ t.Error("Wrong call to client: ", req)
+ t.Fail()
+ return nil
+ })
+ jobsHandler := newJobsHandler("type1", "/topicUrl", pollClientMock, distributeClientMock)
+
+ jobsManager := NewJobsManagerImpl("", pollClientMock, "http://mrAddr", distributeClientMock)
+ jobsManager.allTypes["type1"] = TypeData{
+ DMaaPTopicURL: "/topicUrl",
+ TypeId: "type1",
+ jobsHandler: jobsHandler,
+ }
+
+ jobsManager.StartJobs()
+