Use env varialbes to replace image urls & tags
[nonrtric.git] / dmaap-mediator-producer / internal / jobs / jobs_test.go
1 // -
2 //   ========================LICENSE_START=================================
3 //   O-RAN-SC
4 //   %%
5 //   Copyright (C) 2021: Nordix Foundation
6 //   %%
7 //   Licensed under the Apache License, Version 2.0 (the "License");
8 //   you may not use this file except in compliance with the License.
9 //   You may obtain a copy of the License at
10 //
11 //        http://www.apache.org/licenses/LICENSE-2.0
12 //
13 //   Unless required by applicable law or agreed to in writing, software
14 //   distributed under the License is distributed on an "AS IS" BASIS,
15 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 //   See the License for the specific language governing permissions and
17 //   limitations under the License.
18 //   ========================LICENSE_END===================================
19 //
20
21 package jobs
22
23 import (
24         "bytes"
25         "io/ioutil"
26         "net/http"
27         "os"
28         "path/filepath"
29         "sync"
30         "testing"
31         "time"
32
33         "github.com/stretchr/testify/require"
34         "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
35 )
36
37 const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}]}`
38
39 func TestGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
40         assertions := require.New(t)
41         typesDir, err := os.MkdirTemp("", "configs")
42         if err != nil {
43                 t.Errorf("Unable to create temporary directory for types due to: %v", err)
44         }
45         fname := filepath.Join(typesDir, "type_config.json")
46         managerUnderTest := NewJobsManagerImpl(fname, nil, "", nil)
47         t.Cleanup(func() {
48                 os.RemoveAll(typesDir)
49         })
50         if err = os.WriteFile(fname, []byte(typeDefinition), 0666); err != nil {
51                 t.Errorf("Unable to create temporary config file for types due to: %v", err)
52         }
53         types, err := managerUnderTest.LoadTypesFromConfiguration()
54         wantedType := config.TypeDefinition{
55                 Id:            "type1",
56                 DmaapTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
57         }
58         wantedTypes := []config.TypeDefinition{wantedType}
59         assertions.EqualValues(wantedTypes, types)
60         assertions.Nil(err)
61
62         supportedTypes := managerUnderTest.GetSupportedTypes()
63         assertions.EqualValues([]string{"type1"}, supportedTypes)
64 }
65
66 func TestManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) {
67         assertions := require.New(t)
68         managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
69         wantedJob := JobInfo{
70                 Owner:            "owner",
71                 LastUpdated:      "now",
72                 InfoJobIdentity:  "job1",
73                 TargetUri:        "target",
74                 InfoJobData:      "{}",
75                 InfoTypeIdentity: "type1",
76         }
77         jobHandler := jobHandler{
78                 addJobCh: make(chan JobInfo)}
79         managerUnderTest.allTypes["type1"] = TypeData{
80                 TypeId:     "type1",
81                 jobHandler: &jobHandler,
82         }
83
84         var err error
85         go func() {
86                 err = managerUnderTest.AddJob(wantedJob)
87         }()
88
89         assertions.Nil(err)
90         addedJob := <-jobHandler.addJobCh
91         assertions.Equal(wantedJob, addedJob)
92 }
93
94 func TestManagerAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) {
95         assertions := require.New(t)
96         managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
97         jobInfo := JobInfo{
98                 InfoTypeIdentity: "type1",
99         }
100
101         err := managerUnderTest.AddJob(jobInfo)
102         assertions.NotNil(err)
103         assertions.Equal("type not supported: type1", err.Error())
104 }
105
106 func TestManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
107         assertions := require.New(t)
108         managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
109         managerUnderTest.allTypes["type1"] = TypeData{
110                 TypeId: "type1",
111         }
112
113         jobInfo := JobInfo{
114                 InfoTypeIdentity: "type1",
115         }
116         err := managerUnderTest.AddJob(jobInfo)
117         assertions.NotNil(err)
118         assertions.Equal("missing required job identity: {    <nil> type1}", err.Error())
119 }
120
121 func TestManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
122         assertions := require.New(t)
123         managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
124         managerUnderTest.allTypes["type1"] = TypeData{
125                 TypeId: "type1",
126         }
127
128         jobInfo := JobInfo{
129                 InfoTypeIdentity: "type1",
130                 InfoJobIdentity:  "job1",
131         }
132         err := managerUnderTest.AddJob(jobInfo)
133         assertions.NotNil(err)
134         assertions.Equal("missing required target URI: {  job1  <nil> type1}", err.Error())
135 }
136
137 func TestManagerDeleteJob(t *testing.T) {
138         assertions := require.New(t)
139         managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
140         jobHandler := jobHandler{
141                 deleteJobCh: make(chan string)}
142         managerUnderTest.allTypes["type1"] = TypeData{
143                 TypeId:     "type1",
144                 jobHandler: &jobHandler,
145         }
146
147         go managerUnderTest.DeleteJob("job2")
148
149         assertions.Equal("job2", <-jobHandler.deleteJobCh)
150 }
151
152 func TestHandlerPollAndDistributeMessages(t *testing.T) {
153         assertions := require.New(t)
154
155         wg := sync.WaitGroup{}
156         messages := `[{"message": {"data": "data"}}]`
157         pollClientMock := NewTestClient(func(req *http.Request) *http.Response {
158                 if req.URL.String() == "http://mrAddr/topicUrl" {
159                         assertions.Equal(req.Method, "GET")
160                         wg.Done() // Signal that the poll call has been made
161                         return &http.Response{
162                                 StatusCode: 200,
163                                 Body:       ioutil.NopCloser(bytes.NewReader([]byte(messages))),
164                                 Header:     make(http.Header), // Must be set to non-nil value or it panics
165                         }
166                 }
167                 t.Error("Wrong call to client: ", req)
168                 t.Fail()
169                 return nil
170         })
171         distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
172                 if req.URL.String() == "http://consumerHost/target" {
173                         assertions.Equal(req.Method, "POST")
174                         assertions.Equal(messages, getBodyAsString(req))
175                         assertions.Equal("application/json", req.Header.Get("Content-Type"))
176                         wg.Done() // Signal that the distribution call has been made
177                         return &http.Response{
178                                 StatusCode: 200,
179                                 Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
180                                 Header:     make(http.Header), // Must be set to non-nil value or it panics
181                         }
182                 }
183                 t.Error("Wrong call to client: ", req)
184                 t.Fail()
185                 return nil
186         })
187
188         jobInfo := JobInfo{
189                 InfoTypeIdentity: "type1",
190                 InfoJobIdentity:  "job1",
191                 TargetUri:        "http://consumerHost/target",
192         }
193         handlerUnderTest := jobHandler{
194                 topicUrl:         "/topicUrl",
195                 jobs:             map[string]JobInfo{jobInfo.InfoJobIdentity: jobInfo},
196                 pollClient:       pollClientMock,
197                 distributeClient: distributeClientMock,
198         }
199
200         wg.Add(2) // Two calls should be made to the server, one to poll and one to distribute
201         handlerUnderTest.pollAndDistributeMessages("http://mrAddr")
202
203         if waitTimeout(&wg, 100*time.Millisecond) {
204                 t.Error("Not all calls to server were made")
205                 t.Fail()
206         }
207 }
208
209 func TestHandlerAddJob_shouldAddJobToJobsMap(t *testing.T) {
210         assertions := require.New(t)
211
212         jobInfo := JobInfo{
213                 InfoTypeIdentity: "type1",
214                 InfoJobIdentity:  "job1",
215                 TargetUri:        "http://consumerHost/target",
216         }
217
218         addCh := make(chan JobInfo)
219         handlerUnderTest := jobHandler{
220                 mu:       sync.Mutex{},
221                 jobs:     map[string]JobInfo{},
222                 addJobCh: addCh,
223         }
224
225         go func() {
226                 addCh <- jobInfo
227         }()
228
229         handlerUnderTest.monitorManagementChannels()
230
231         assertions.Len(handlerUnderTest.jobs, 1)
232         assertions.Equal(jobInfo, handlerUnderTest.jobs["job1"])
233 }
234
235 func TestHandlerDeleteJob_shouldDeleteJobFromJobsMap(t *testing.T) {
236         assertions := require.New(t)
237
238         deleteCh := make(chan string)
239         handlerUnderTest := jobHandler{
240                 mu: sync.Mutex{},
241                 jobs: map[string]JobInfo{"job1": {
242                         InfoJobIdentity: "job1",
243                 }},
244                 deleteJobCh: deleteCh,
245         }
246
247         go func() {
248                 deleteCh <- "job1"
249         }()
250
251         handlerUnderTest.monitorManagementChannels()
252
253         assertions.Len(handlerUnderTest.jobs, 0)
254 }
255
256 type RoundTripFunc func(req *http.Request) *http.Response
257
258 func (f RoundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
259         return f(req), nil
260 }
261
262 //NewTestClient returns *http.Client with Transport replaced to avoid making real calls
263 func NewTestClient(fn RoundTripFunc) *http.Client {
264         return &http.Client{
265                 Transport: RoundTripFunc(fn),
266         }
267 }
268
269 // waitTimeout waits for the waitgroup for the specified max timeout.
270 // Returns true if waiting timed out.
271 func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
272         c := make(chan struct{})
273         go func() {
274                 defer close(c)
275                 wg.Wait()
276         }()
277         select {
278         case <-c:
279                 return false // completed normally
280         case <-time.After(timeout):
281                 return true // timed out
282         }
283 }
284
285 func getBodyAsString(req *http.Request) string {
286         buf := new(bytes.Buffer)
287         buf.ReadFrom(req.Body)
288         return buf.String()
289 }