Merge "Change of ECS to ICS in test env"
[nonrtric.git] / dmaap-mediator-producer / internal / jobs / jobs_test.go
index 066823d..30b4ffd 100644 (file)
@@ -24,8 +24,6 @@ import (
        "bytes"
        "io/ioutil"
        "net/http"
-       "os"
-       "path/filepath"
        "sync"
        "testing"
        "time"
@@ -38,26 +36,18 @@ const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unau
 
 func TestJobsManagerGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
        assertions := require.New(t)
-       typesDir, err := os.MkdirTemp("", "configs")
-       if err != nil {
-               t.Errorf("Unable to create temporary directory for types due to: %v", err)
-       }
-       fname := filepath.Join(typesDir, "type_config.json")
-       managerUnderTest := NewJobsManagerImpl(fname, nil, "", nil)
-       t.Cleanup(func() {
-               os.RemoveAll(typesDir)
-       })
-       if err = os.WriteFile(fname, []byte(typeDefinition), 0666); err != nil {
-               t.Errorf("Unable to create temporary config file for types due to: %v", err)
-       }
-       types, err := managerUnderTest.LoadTypesFromConfiguration()
+
+       managerUnderTest := NewJobsManagerImpl(nil, "", nil)
+
        wantedType := config.TypeDefinition{
                Id:            "type1",
                DmaapTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
        }
        wantedTypes := []config.TypeDefinition{wantedType}
+
+       types := managerUnderTest.LoadTypesFromConfiguration(wantedTypes)
+
        assertions.EqualValues(wantedTypes, types)
-       assertions.Nil(err)
 
        supportedTypes := managerUnderTest.GetSupportedTypes()
        assertions.EqualValues([]string{"type1"}, supportedTypes)
@@ -65,7 +55,7 @@ func TestJobsManagerGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedT
 
 func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) {
        assertions := require.New(t)
-       managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
+       managerUnderTest := NewJobsManagerImpl(nil, "", nil)
        wantedJob := JobInfo{
                Owner:            "owner",
                LastUpdated:      "now",
@@ -83,7 +73,7 @@ func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T
 
        var err error
        go func() {
-               err = managerUnderTest.AddJob(wantedJob)
+               err = managerUnderTest.AddJobFromRESTCall(wantedJob)
        }()
 
        assertions.Nil(err)
@@ -93,19 +83,19 @@ func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T
 
 func TestJobsManagerAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) {
        assertions := require.New(t)
-       managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
+       managerUnderTest := NewJobsManagerImpl(nil, "", nil)
        jobInfo := JobInfo{
                InfoTypeIdentity: "type1",
        }
 
-       err := managerUnderTest.AddJob(jobInfo)
+       err := managerUnderTest.AddJobFromRESTCall(jobInfo)
        assertions.NotNil(err)
        assertions.Equal("type not supported: type1", err.Error())
 }
 
 func TestJobsManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
        assertions := require.New(t)
-       managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
+       managerUnderTest := NewJobsManagerImpl(nil, "", nil)
        managerUnderTest.allTypes["type1"] = TypeData{
                TypeId: "type1",
        }
@@ -113,14 +103,14 @@ func TestJobsManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
        jobInfo := JobInfo{
                InfoTypeIdentity: "type1",
        }
-       err := managerUnderTest.AddJob(jobInfo)
+       err := managerUnderTest.AddJobFromRESTCall(jobInfo)
        assertions.NotNil(err)
        assertions.Equal("missing required job identity: {    <nil> type1}", err.Error())
 }
 
 func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
        assertions := require.New(t)
-       managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
+       managerUnderTest := NewJobsManagerImpl(nil, "", nil)
        managerUnderTest.allTypes["type1"] = TypeData{
                TypeId: "type1",
        }
@@ -129,14 +119,14 @@ func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
                InfoTypeIdentity: "type1",
                InfoJobIdentity:  "job1",
        }
-       err := managerUnderTest.AddJob(jobInfo)
+       err := managerUnderTest.AddJobFromRESTCall(jobInfo)
        assertions.NotNil(err)
        assertions.Equal("missing required target URI: {  job1  <nil> type1}", err.Error())
 }
 
 func TestJobsManagerDeleteJob_shouldSendDeleteToChannel(t *testing.T) {
        assertions := require.New(t)
-       managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
+       managerUnderTest := NewJobsManagerImpl(nil, "", nil)
        jobsHandler := jobsHandler{
                deleteJobCh: make(chan string)}
        managerUnderTest.allTypes["type1"] = TypeData{
@@ -144,7 +134,7 @@ func TestJobsManagerDeleteJob_shouldSendDeleteToChannel(t *testing.T) {
                jobsHandler: &jobsHandler,
        }
 
-       go managerUnderTest.DeleteJob("job2")
+       go managerUnderTest.DeleteJobFromRESTCall("job2")
 
        assertions.Equal("job2", <-jobsHandler.deleteJobCh)
 }
@@ -177,7 +167,7 @@ func TestAddJobToJobsManager_shouldStartPollAndDistributeMessages(t *testing.T)
        distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
                if req.URL.String() == "http://consumerHost/target" {
                        assertions.Equal(req.Method, "POST")
-                       assertions.Equal(messages, getBodyAsString(req))
+                       assertions.Equal(messages, getBodyAsString(req, t))
                        assertions.Equal("application/json", req.Header.Get("Content-Type"))
                        wg.Done()
                        return &http.Response{
@@ -192,14 +182,14 @@ func TestAddJobToJobsManager_shouldStartPollAndDistributeMessages(t *testing.T)
        })
        jobsHandler := newJobsHandler("type1", "/topicUrl", pollClientMock, distributeClientMock)
 
-       jobsManager := NewJobsManagerImpl("", pollClientMock, "http://mrAddr", distributeClientMock)
+       jobsManager := NewJobsManagerImpl(pollClientMock, "http://mrAddr", distributeClientMock)
        jobsManager.allTypes["type1"] = TypeData{
                DMaaPTopicURL: "/topicUrl",
                TypeId:        "type1",
                jobsHandler:   jobsHandler,
        }
 
-       jobsManager.StartJobs()
+       jobsManager.StartJobsForAllTypes()
 
        jobInfo := JobInfo{
                InfoTypeIdentity: "type1",
@@ -208,7 +198,8 @@ func TestAddJobToJobsManager_shouldStartPollAndDistributeMessages(t *testing.T)
        }
 
        wg.Add(1) // Wait till the distribution has happened
-       jobsManager.AddJob(jobInfo)
+       err := jobsManager.AddJobFromRESTCall(jobInfo)
+       assertions.Nil(err)
 
        if waitTimeout(&wg, 2*time.Second) {
                t.Error("Not all calls to server were made")
@@ -287,8 +278,10 @@ func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
        }
 }
 
-func getBodyAsString(req *http.Request) string {
+func getBodyAsString(req *http.Request, t *testing.T) string {
        buf := new(bytes.Buffer)
-       buf.ReadFrom(req.Body)
+       if _, err := buf.ReadFrom(req.Body); err != nil {
+               t.Fail()
+       }
        return buf.String()
 }