Refactor for better separation DMaaP Mediator
[nonrtric.git] / dmaap-mediator-producer / internal / jobs / jobs_test.go
1 // -
2 //   ========================LICENSE_START=================================
3 //   O-RAN-SC
4 //   %%
5 //   Copyright (C) 2021: Nordix Foundation
6 //   %%
7 //   Licensed under the Apache License, Version 2.0 (the "License");
8 //   you may not use this file except in compliance with the License.
9 //   You may obtain a copy of the License at
10 //
11 //        http://www.apache.org/licenses/LICENSE-2.0
12 //
13 //   Unless required by applicable law or agreed to in writing, software
14 //   distributed under the License is distributed on an "AS IS" BASIS,
15 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 //   See the License for the specific language governing permissions and
17 //   limitations under the License.
18 //   ========================LICENSE_END===================================
19 //
20
21 package jobs
22
23 import (
24         "bytes"
25         "io/ioutil"
26         "net/http"
27         "sync"
28         "testing"
29         "time"
30
31         "github.com/stretchr/testify/require"
32         "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
33 )
34
// typeDefinition is a JSON document with a "types" array holding one entry:
// id "type1" and the DMaaP topic URL associated with it.
const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}]}`
36
37 func TestJobsManagerGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
38         assertions := require.New(t)
39
40         managerUnderTest := NewJobsManagerImpl(nil, "", nil)
41
42         wantedType := config.TypeDefinition{
43                 Id:            "type1",
44                 DmaapTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
45         }
46         wantedTypes := []config.TypeDefinition{wantedType}
47
48         types := managerUnderTest.LoadTypesFromConfiguration(wantedTypes)
49
50         assertions.EqualValues(wantedTypes, types)
51
52         supportedTypes := managerUnderTest.GetSupportedTypes()
53         assertions.EqualValues([]string{"type1"}, supportedTypes)
54 }
55
56 func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) {
57         assertions := require.New(t)
58         managerUnderTest := NewJobsManagerImpl(nil, "", nil)
59         wantedJob := JobInfo{
60                 Owner:            "owner",
61                 LastUpdated:      "now",
62                 InfoJobIdentity:  "job1",
63                 TargetUri:        "target",
64                 InfoJobData:      "{}",
65                 InfoTypeIdentity: "type1",
66         }
67         jobsHandler := jobsHandler{
68                 addJobCh: make(chan JobInfo)}
69         managerUnderTest.allTypes["type1"] = TypeData{
70                 TypeId:      "type1",
71                 jobsHandler: &jobsHandler,
72         }
73
74         var err error
75         go func() {
76                 err = managerUnderTest.AddJobFromRESTCall(wantedJob)
77         }()
78
79         assertions.Nil(err)
80         addedJob := <-jobsHandler.addJobCh
81         assertions.Equal(wantedJob, addedJob)
82 }
83
84 func TestJobsManagerAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) {
85         assertions := require.New(t)
86         managerUnderTest := NewJobsManagerImpl(nil, "", nil)
87         jobInfo := JobInfo{
88                 InfoTypeIdentity: "type1",
89         }
90
91         err := managerUnderTest.AddJobFromRESTCall(jobInfo)
92         assertions.NotNil(err)
93         assertions.Equal("type not supported: type1", err.Error())
94 }
95
96 func TestJobsManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
97         assertions := require.New(t)
98         managerUnderTest := NewJobsManagerImpl(nil, "", nil)
99         managerUnderTest.allTypes["type1"] = TypeData{
100                 TypeId: "type1",
101         }
102
103         jobInfo := JobInfo{
104                 InfoTypeIdentity: "type1",
105         }
106         err := managerUnderTest.AddJobFromRESTCall(jobInfo)
107         assertions.NotNil(err)
108         assertions.Equal("missing required job identity: {    <nil> type1}", err.Error())
109 }
110
111 func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
112         assertions := require.New(t)
113         managerUnderTest := NewJobsManagerImpl(nil, "", nil)
114         managerUnderTest.allTypes["type1"] = TypeData{
115                 TypeId: "type1",
116         }
117
118         jobInfo := JobInfo{
119                 InfoTypeIdentity: "type1",
120                 InfoJobIdentity:  "job1",
121         }
122         err := managerUnderTest.AddJobFromRESTCall(jobInfo)
123         assertions.NotNil(err)
124         assertions.Equal("missing required target URI: {  job1  <nil> type1}", err.Error())
125 }
126
127 func TestJobsManagerDeleteJob_shouldSendDeleteToChannel(t *testing.T) {
128         assertions := require.New(t)
129         managerUnderTest := NewJobsManagerImpl(nil, "", nil)
130         jobsHandler := jobsHandler{
131                 deleteJobCh: make(chan string)}
132         managerUnderTest.allTypes["type1"] = TypeData{
133                 TypeId:      "type1",
134                 jobsHandler: &jobsHandler,
135         }
136
137         go managerUnderTest.DeleteJobFromRESTCall("job2")
138
139         assertions.Equal("job2", <-jobsHandler.deleteJobCh)
140 }
141
142 func TestAddJobToJobsManager_shouldStartPollAndDistributeMessages(t *testing.T) {
143         assertions := require.New(t)
144
145         called := false
146         messages := `[{"message": {"data": "data"}}]`
147         pollClientMock := NewTestClient(func(req *http.Request) *http.Response {
148                 if req.URL.String() == "http://mrAddr/topicUrl" {
149                         assertions.Equal(req.Method, "GET")
150                         body := "[]"
151                         if !called {
152                                 called = true
153                                 body = messages
154                         }
155                         return &http.Response{
156                                 StatusCode: 200,
157                                 Body:       ioutil.NopCloser(bytes.NewReader([]byte(body))),
158                                 Header:     make(http.Header), // Must be set to non-nil value or it panics
159                         }
160                 }
161                 t.Error("Wrong call to client: ", req)
162                 t.Fail()
163                 return nil
164         })
165
166         wg := sync.WaitGroup{}
167         distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
168                 if req.URL.String() == "http://consumerHost/target" {
169                         assertions.Equal(req.Method, "POST")
170                         assertions.Equal(messages, getBodyAsString(req, t))
171                         assertions.Equal("application/json", req.Header.Get("Content-Type"))
172                         wg.Done()
173                         return &http.Response{
174                                 StatusCode: 200,
175                                 Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
176                                 Header:     make(http.Header), // Must be set to non-nil value or it panics
177                         }
178                 }
179                 t.Error("Wrong call to client: ", req)
180                 t.Fail()
181                 return nil
182         })
183         jobsHandler := newJobsHandler("type1", "/topicUrl", pollClientMock, distributeClientMock)
184
185         jobsManager := NewJobsManagerImpl(pollClientMock, "http://mrAddr", distributeClientMock)
186         jobsManager.allTypes["type1"] = TypeData{
187                 DMaaPTopicURL: "/topicUrl",
188                 TypeId:        "type1",
189                 jobsHandler:   jobsHandler,
190         }
191
192         jobsManager.StartJobsForAllTypes()
193
194         jobInfo := JobInfo{
195                 InfoTypeIdentity: "type1",
196                 InfoJobIdentity:  "job1",
197                 TargetUri:        "http://consumerHost/target",
198         }
199
200         wg.Add(1) // Wait till the distribution has happened
201         err := jobsManager.AddJobFromRESTCall(jobInfo)
202         assertions.Nil(err)
203
204         if waitTimeout(&wg, 2*time.Second) {
205                 t.Error("Not all calls to server were made")
206                 t.Fail()
207         }
208 }
209
210 func TestJobsHandlerDeleteJob_shouldDeleteJobFromJobsMap(t *testing.T) {
211         jobToDelete := newJob(JobInfo{}, nil)
212         go jobToDelete.start()
213         jobsHandler := newJobsHandler("type1", "/topicUrl", nil, nil)
214         jobsHandler.jobs["job1"] = jobToDelete
215
216         go jobsHandler.monitorManagementChannels()
217
218         jobsHandler.deleteJobCh <- "job1"
219
220         deleted := false
221         for i := 0; i < 100; i++ {
222                 if len(jobsHandler.jobs) == 0 {
223                         deleted = true
224                         break
225                 }
226                 time.Sleep(time.Microsecond) // Need to drop control to let the job's goroutine do the job
227         }
228         require.New(t).True(deleted, "Job not deleted")
229 }
230
231 func TestJobsHandlerEmptyJobMessageBufferWhenItIsFull(t *testing.T) {
232         job := newJob(JobInfo{
233                 InfoJobIdentity: "job",
234         }, nil)
235
236         jobsHandler := newJobsHandler("type1", "/topicUrl", nil, nil)
237         jobsHandler.jobs["job1"] = job
238
239         fillMessagesBuffer(job.messagesChannel)
240
241         jobsHandler.distributeMessages([]byte("sent msg"))
242
243         require.New(t).Len(job.messagesChannel, 0)
244 }
245
// fillMessagesBuffer sends one "msg" payload per slot of the channel's
// capacity. For a channel that starts out empty this leaves it completely
// full; a channel with capacity zero receives nothing.
func fillMessagesBuffer(mc chan []byte) {
	for remaining := cap(mc); remaining > 0; remaining-- {
		mc <- []byte("msg")
	}
}
251
// RoundTripFunc is a function type that produces the response for a request.
// Through its RoundTrip method it can be used as an http.Client transport.
type RoundTripFunc func(req *http.Request) *http.Response

// RoundTrip passes the request to the wrapped function and returns whatever
// response it produces, always together with a nil error.
func (f RoundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
	resp := f(req)
	return resp, nil
}
257
258 //NewTestClient returns *http.Client with Transport replaced to avoid making real calls
259 func NewTestClient(fn RoundTripFunc) *http.Client {
260         return &http.Client{
261                 Transport: RoundTripFunc(fn),
262         }
263 }
264
// waitTimeout blocks until either the wait group's counter reaches zero or
// the timeout elapses, whichever comes first.
// It returns true when the timeout won and false when the wait group finished.
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-timer.C:
		return true // timed out
	case <-done:
		return false // completed normally
	}
}
280
// getBodyAsString reads the whole body of req and returns it as a string.
// A request without a body yields the empty string. If reading fails, the
// test is marked as failed with the read error and whatever was read up to
// that point is returned. Only t.Errorf is used, so the helper can also be
// called from goroutines other than the one running the test.
func getBodyAsString(req *http.Request, t *testing.T) string {
	t.Helper()
	// Requests created without a body have a nil Body; reading from it would panic.
	if req.Body == nil {
		return ""
	}
	buf := new(bytes.Buffer)
	if _, err := buf.ReadFrom(req.Body); err != nil {
		t.Errorf("reading request body: %v", err)
	}
	return buf.String()
}