+
+func TestPollAndDistributeMessages(t *testing.T) {
+ assertions := require.New(t)
+ jobInfo := JobInfo{
+ InfoTypeIdentity: "type1",
+ InfoJobIdentity: "job1",
+ TargetUri: "http://consumerHost/target",
+ }
+ allJobs["type1"] = Type{
+ TypeId: "type1",
+ DMaaPTopic: "topic",
+ Jobs: map[string]JobInfo{"job1": jobInfo},
+ }
+ t.Cleanup(func() {
+ clearAll()
+ })
+
+ body := ioutil.NopCloser(bytes.NewReader([]byte(`[{"message": {"data": "data"}}]`)))
+ clientMock := mocks.HTTPClient{}
+ clientMock.On("Get", mock.Anything).Return(&http.Response{
+ StatusCode: http.StatusOK,
+ Body: body,
+ }, nil)
+
+ clientMock.On("Do", mock.Anything).Return(&http.Response{
+ StatusCode: http.StatusOK,
+ }, nil)
+
+ restclient.Client = &clientMock
+
+ pollAndDistributeMessages("http://mrAddr")
+
+ time.Sleep(100 * time.Millisecond)
+
+ var actualRequest *http.Request
+ clientMock.AssertCalled(t, "Get", "http://mrAddr/events/topic/users/dmaapmediatorproducer")
+ clientMock.AssertNumberOfCalls(t, "Get", 1)
+
+ clientMock.AssertCalled(t, "Do", mock.MatchedBy(func(req *http.Request) bool {
+ actualRequest = req
+ return true
+ }))
+ assertions.Equal(http.MethodPost, actualRequest.Method)
+ assertions.Equal("consumerHost", actualRequest.URL.Host)
+ assertions.Equal("/target", actualRequest.URL.Path)
+ assertions.Equal("application/json; charset=utf-8", actualRequest.Header.Get("Content-Type"))
+ actualBody, _ := ioutil.ReadAll(actualRequest.Body)
+ assertions.Equal([]byte(`[{"message": {"data": "data"}}]`), actualBody)
+ clientMock.AssertNumberOfCalls(t, "Do", 1)
+}