+ jobsHandler := jobsHandler{
+ addJobCh: make(chan JobInfo)}
+ managerUnderTest.allTypes["type1"] = TypeData{
+ TypeId: "type1",
+ jobsHandler: &jobsHandler,
+ }
+
+ var err error
+ go func() {
+ err = managerUnderTest.AddJobFromRESTCall(wantedJob)
+ }()
+
+ assertions.Nil(err)
+ addedJob := <-jobsHandler.addJobCh
+ assertions.Equal(wantedJob, addedJob)
+}
+
+func TestJobsManagerAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) {
+ assertions := require.New(t)
+ managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
+ jobInfo := JobInfo{
+ InfoTypeIdentity: "type1",
+ }
+
+ err := managerUnderTest.AddJobFromRESTCall(jobInfo)
+ assertions.NotNil(err)
+ assertions.Equal("type not supported: type1", err.Error())
+}
+
+func TestJobsManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
+ assertions := require.New(t)
+ managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
+ managerUnderTest.allTypes["type1"] = TypeData{
+ TypeId: "type1",
+ }
+
+ jobInfo := JobInfo{
+ InfoTypeIdentity: "type1",
+ }
+ err := managerUnderTest.AddJobFromRESTCall(jobInfo)
+ assertions.NotNil(err)
+ assertions.Equal("missing required job identity: { <nil> type1}", err.Error())
+}
+
+func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
+ assertions := require.New(t)
+ managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
+ managerUnderTest.allTypes["type1"] = TypeData{
+ TypeId: "type1",
+ }
+
+ jobInfo := JobInfo{
+ InfoTypeIdentity: "type1",
+ InfoJobIdentity: "job1",
+ }
+ err := managerUnderTest.AddJobFromRESTCall(jobInfo)
+ assertions.NotNil(err)
+ assertions.Equal("missing required target URI: { job1 <nil> type1}", err.Error())
+}
+
+func TestJobsManagerDeleteJob_shouldSendDeleteToChannel(t *testing.T) {
+ assertions := require.New(t)
+ managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
+ jobsHandler := jobsHandler{
+ deleteJobCh: make(chan string)}
+ managerUnderTest.allTypes["type1"] = TypeData{
+ TypeId: "type1",
+ jobsHandler: &jobsHandler,
+ }
+
+ go managerUnderTest.DeleteJobFromRESTCall("job2")
+
+ assertions.Equal("job2", <-jobsHandler.deleteJobCh)
+}
+
+func TestAddJobToJobsManager_shouldStartPollAndDistributeMessages(t *testing.T) {
+ assertions := require.New(t)
+
+ called := false
+ messages := `[{"message": {"data": "data"}}]`
+ pollClientMock := NewTestClient(func(req *http.Request) *http.Response {
+ if req.URL.String() == "http://mrAddr/topicUrl" {
+ assertions.Equal(req.Method, "GET")
+ body := "[]"
+ if !called {
+ called = true
+ body = messages
+ }
+ return &http.Response{
+ StatusCode: 200,
+ Body: ioutil.NopCloser(bytes.NewReader([]byte(body))),
+ Header: make(http.Header), // Must be set to non-nil value or it panics
+ }
+ }
+ t.Error("Wrong call to client: ", req)
+ t.Fail()
+ return nil
+ })
+
+ wg := sync.WaitGroup{}
+ distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
+ if req.URL.String() == "http://consumerHost/target" {
+ assertions.Equal(req.Method, "POST")
+ assertions.Equal(messages, getBodyAsString(req, t))
+ assertions.Equal("application/json", req.Header.Get("Content-Type"))
+ wg.Done()
+ return &http.Response{
+ StatusCode: 200,
+ Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)),
+ Header: make(http.Header), // Must be set to non-nil value or it panics
+ }
+ }
+ t.Error("Wrong call to client: ", req)
+ t.Fail()
+ return nil
+ })
+ jobsHandler := newJobsHandler("type1", "/topicUrl", pollClientMock, distributeClientMock)
+
+ jobsManager := NewJobsManagerImpl("", pollClientMock, "http://mrAddr", distributeClientMock)
+ jobsManager.allTypes["type1"] = TypeData{
+ DMaaPTopicURL: "/topicUrl",
+ TypeId: "type1",
+ jobsHandler: jobsHandler,
+ }
+
+ jobsManager.StartJobsForAllTypes()
+
+ jobInfo := JobInfo{
+ InfoTypeIdentity: "type1",
+ InfoJobIdentity: "job1",
+ TargetUri: "http://consumerHost/target",
+ }