Merge "Added tests and improvements"
[nonrtric.git] / dmaap-mediator-producer / internal / jobs / jobs_test.go
1 // -
2 //   ========================LICENSE_START=================================
3 //   O-RAN-SC
4 //   %%
5 //   Copyright (C) 2021: Nordix Foundation
6 //   %%
7 //   Licensed under the Apache License, Version 2.0 (the "License");
8 //   you may not use this file except in compliance with the License.
9 //   You may obtain a copy of the License at
10 //
11 //        http://www.apache.org/licenses/LICENSE-2.0
12 //
13 //   Unless required by applicable law or agreed to in writing, software
14 //   distributed under the License is distributed on an "AS IS" BASIS,
15 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 //   See the License for the specific language governing permissions and
17 //   limitations under the License.
18 //   ========================LICENSE_END===================================
19 //
20
21 package jobs
22
23 import (
24         "bytes"
25         "io/ioutil"
26         "net/http"
27         "os"
28         "path/filepath"
29         "sync"
30         "testing"
31         "time"
32
33         "github.com/stretchr/testify/require"
34         "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
35 )
36
37 const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}]}`
38
39 func TestJobsManagerGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
40         assertions := require.New(t)
41         typesDir, err := os.MkdirTemp("", "configs")
42         if err != nil {
43                 t.Errorf("Unable to create temporary directory for types due to: %v", err)
44         }
45         fname := filepath.Join(typesDir, "type_config.json")
46         managerUnderTest := NewJobsManagerImpl(fname, nil, "", nil)
47         t.Cleanup(func() {
48                 os.RemoveAll(typesDir)
49         })
50         if err = os.WriteFile(fname, []byte(typeDefinition), 0666); err != nil {
51                 t.Errorf("Unable to create temporary config file for types due to: %v", err)
52         }
53         types, err := managerUnderTest.LoadTypesFromConfiguration()
54         wantedType := config.TypeDefinition{
55                 Id:            "type1",
56                 DmaapTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
57         }
58         wantedTypes := []config.TypeDefinition{wantedType}
59         assertions.EqualValues(wantedTypes, types)
60         assertions.Nil(err)
61
62         supportedTypes := managerUnderTest.GetSupportedTypes()
63         assertions.EqualValues([]string{"type1"}, supportedTypes)
64 }
65
66 func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) {
67         assertions := require.New(t)
68         managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
69         wantedJob := JobInfo{
70                 Owner:            "owner",
71                 LastUpdated:      "now",
72                 InfoJobIdentity:  "job1",
73                 TargetUri:        "target",
74                 InfoJobData:      "{}",
75                 InfoTypeIdentity: "type1",
76         }
77         jobsHandler := jobsHandler{
78                 addJobCh: make(chan JobInfo)}
79         managerUnderTest.allTypes["type1"] = TypeData{
80                 TypeId:      "type1",
81                 jobsHandler: &jobsHandler,
82         }
83
84         var err error
85         go func() {
86                 err = managerUnderTest.AddJobFromRESTCall(wantedJob)
87         }()
88
89         assertions.Nil(err)
90         addedJob := <-jobsHandler.addJobCh
91         assertions.Equal(wantedJob, addedJob)
92 }
93
94 func TestJobsManagerAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) {
95         assertions := require.New(t)
96         managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
97         jobInfo := JobInfo{
98                 InfoTypeIdentity: "type1",
99         }
100
101         err := managerUnderTest.AddJobFromRESTCall(jobInfo)
102         assertions.NotNil(err)
103         assertions.Equal("type not supported: type1", err.Error())
104 }
105
106 func TestJobsManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
107         assertions := require.New(t)
108         managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
109         managerUnderTest.allTypes["type1"] = TypeData{
110                 TypeId: "type1",
111         }
112
113         jobInfo := JobInfo{
114                 InfoTypeIdentity: "type1",
115         }
116         err := managerUnderTest.AddJobFromRESTCall(jobInfo)
117         assertions.NotNil(err)
118         assertions.Equal("missing required job identity: {    <nil> type1}", err.Error())
119 }
120
121 func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
122         assertions := require.New(t)
123         managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
124         managerUnderTest.allTypes["type1"] = TypeData{
125                 TypeId: "type1",
126         }
127
128         jobInfo := JobInfo{
129                 InfoTypeIdentity: "type1",
130                 InfoJobIdentity:  "job1",
131         }
132         err := managerUnderTest.AddJobFromRESTCall(jobInfo)
133         assertions.NotNil(err)
134         assertions.Equal("missing required target URI: {  job1  <nil> type1}", err.Error())
135 }
136
137 func TestJobsManagerDeleteJob_shouldSendDeleteToChannel(t *testing.T) {
138         assertions := require.New(t)
139         managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
140         jobsHandler := jobsHandler{
141                 deleteJobCh: make(chan string)}
142         managerUnderTest.allTypes["type1"] = TypeData{
143                 TypeId:      "type1",
144                 jobsHandler: &jobsHandler,
145         }
146
147         go managerUnderTest.DeleteJobFromRESTCall("job2")
148
149         assertions.Equal("job2", <-jobsHandler.deleteJobCh)
150 }
151
152 func TestAddJobToJobsManager_shouldStartPollAndDistributeMessages(t *testing.T) {
153         assertions := require.New(t)
154
155         called := false
156         messages := `[{"message": {"data": "data"}}]`
157         pollClientMock := NewTestClient(func(req *http.Request) *http.Response {
158                 if req.URL.String() == "http://mrAddr/topicUrl" {
159                         assertions.Equal(req.Method, "GET")
160                         body := "[]"
161                         if !called {
162                                 called = true
163                                 body = messages
164                         }
165                         return &http.Response{
166                                 StatusCode: 200,
167                                 Body:       ioutil.NopCloser(bytes.NewReader([]byte(body))),
168                                 Header:     make(http.Header), // Must be set to non-nil value or it panics
169                         }
170                 }
171                 t.Error("Wrong call to client: ", req)
172                 t.Fail()
173                 return nil
174         })
175
176         wg := sync.WaitGroup{}
177         distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
178                 if req.URL.String() == "http://consumerHost/target" {
179                         assertions.Equal(req.Method, "POST")
180                         assertions.Equal(messages, getBodyAsString(req, t))
181                         assertions.Equal("application/json", req.Header.Get("Content-Type"))
182                         wg.Done()
183                         return &http.Response{
184                                 StatusCode: 200,
185                                 Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
186                                 Header:     make(http.Header), // Must be set to non-nil value or it panics
187                         }
188                 }
189                 t.Error("Wrong call to client: ", req)
190                 t.Fail()
191                 return nil
192         })
193         jobsHandler := newJobsHandler("type1", "/topicUrl", pollClientMock, distributeClientMock)
194
195         jobsManager := NewJobsManagerImpl("", pollClientMock, "http://mrAddr", distributeClientMock)
196         jobsManager.allTypes["type1"] = TypeData{
197                 DMaaPTopicURL: "/topicUrl",
198                 TypeId:        "type1",
199                 jobsHandler:   jobsHandler,
200         }
201
202         jobsManager.StartJobsForAllTypes()
203
204         jobInfo := JobInfo{
205                 InfoTypeIdentity: "type1",
206                 InfoJobIdentity:  "job1",
207                 TargetUri:        "http://consumerHost/target",
208         }
209
210         wg.Add(1) // Wait till the distribution has happened
211         err := jobsManager.AddJobFromRESTCall(jobInfo)
212         assertions.Nil(err)
213
214         if waitTimeout(&wg, 2*time.Second) {
215                 t.Error("Not all calls to server were made")
216                 t.Fail()
217         }
218 }
219
220 func TestJobsHandlerDeleteJob_shouldDeleteJobFromJobsMap(t *testing.T) {
221         jobToDelete := newJob(JobInfo{}, nil)
222         go jobToDelete.start()
223         jobsHandler := newJobsHandler("type1", "/topicUrl", nil, nil)
224         jobsHandler.jobs["job1"] = jobToDelete
225
226         go jobsHandler.monitorManagementChannels()
227
228         jobsHandler.deleteJobCh <- "job1"
229
230         deleted := false
231         for i := 0; i < 100; i++ {
232                 if len(jobsHandler.jobs) == 0 {
233                         deleted = true
234                         break
235                 }
236                 time.Sleep(time.Microsecond) // Need to drop control to let the job's goroutine do the job
237         }
238         require.New(t).True(deleted, "Job not deleted")
239 }
240
241 func TestJobsHandlerEmptyJobMessageBufferWhenItIsFull(t *testing.T) {
242         job := newJob(JobInfo{
243                 InfoJobIdentity: "job",
244         }, nil)
245
246         jobsHandler := newJobsHandler("type1", "/topicUrl", nil, nil)
247         jobsHandler.jobs["job1"] = job
248
249         fillMessagesBuffer(job.messagesChannel)
250
251         jobsHandler.distributeMessages([]byte("sent msg"))
252
253         require.New(t).Len(job.messagesChannel, 0)
254 }
255
256 func fillMessagesBuffer(mc chan []byte) {
257         for i := 0; i < cap(mc); i++ {
258                 mc <- []byte("msg")
259         }
260 }
261
262 type RoundTripFunc func(req *http.Request) *http.Response
263
264 func (f RoundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
265         return f(req), nil
266 }
267
268 //NewTestClient returns *http.Client with Transport replaced to avoid making real calls
269 func NewTestClient(fn RoundTripFunc) *http.Client {
270         return &http.Client{
271                 Transport: RoundTripFunc(fn),
272         }
273 }
274
275 // waitTimeout waits for the waitgroup for the specified max timeout.
276 // Returns true if waiting timed out.
277 func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
278         c := make(chan struct{})
279         go func() {
280                 defer close(c)
281                 wg.Wait()
282         }()
283         select {
284         case <-c:
285                 return false // completed normally
286         case <-time.After(timeout):
287                 return true // timed out
288         }
289 }
290
291 func getBodyAsString(req *http.Request, t *testing.T) string {
292         buf := new(bytes.Buffer)
293         if _, err := buf.ReadFrom(req.Body); err != nil {
294                 t.Fail()
295         }
296         return buf.String()
297 }