Merge "Refactor Go code"
[nonrtric.git] / dmaap-mediator-producer / internal / jobs / jobs_test.go
1 // -
2 //   ========================LICENSE_START=================================
3 //   O-RAN-SC
4 //   %%
5 //   Copyright (C) 2021: Nordix Foundation
6 //   %%
7 //   Licensed under the Apache License, Version 2.0 (the "License");
8 //   you may not use this file except in compliance with the License.
9 //   You may obtain a copy of the License at
10 //
11 //        http://www.apache.org/licenses/LICENSE-2.0
12 //
13 //   Unless required by applicable law or agreed to in writing, software
14 //   distributed under the License is distributed on an "AS IS" BASIS,
15 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 //   See the License for the specific language governing permissions and
17 //   limitations under the License.
18 //   ========================LICENSE_END===================================
19 //
20
21 package jobs
22
23 import (
24         "bytes"
25         "io/ioutil"
26         "net/http"
27         "os"
28         "path/filepath"
29         "sync"
30         "testing"
31         "time"
32
33         "github.com/stretchr/testify/require"
34 )
35
36 const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}]}`
37
38 func TestGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
39         assertions := require.New(t)
40         typesDir, err := os.MkdirTemp("", "configs")
41         if err != nil {
42                 t.Errorf("Unable to create temporary directory for types due to: %v", err)
43         }
44         fname := filepath.Join(typesDir, "type_config.json")
45         handlerUnderTest := NewJobHandlerImpl(fname, nil, nil)
46         t.Cleanup(func() {
47                 os.RemoveAll(typesDir)
48                 handlerUnderTest.clearAll()
49         })
50         if err = os.WriteFile(fname, []byte(typeDefinition), 0666); err != nil {
51                 t.Errorf("Unable to create temporary config file for types due to: %v", err)
52         }
53         types, err := handlerUnderTest.GetTypes()
54         wantedType := TypeDefinition{
55                 Id:            "type1",
56                 DmaapTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
57         }
58         wantedTypes := []TypeDefinition{wantedType}
59         assertions.EqualValues(wantedTypes, types)
60         assertions.Nil(err)
61
62         supportedTypes := handlerUnderTest.GetSupportedTypes()
63         assertions.EqualValues([]string{"type1"}, supportedTypes)
64 }
65
66 func TestAddJobWhenTypeIsSupported_shouldAddJobToAllJobsMap(t *testing.T) {
67         assertions := require.New(t)
68         handlerUnderTest := NewJobHandlerImpl("", nil, nil)
69         wantedJob := JobInfo{
70                 Owner:            "owner",
71                 LastUpdated:      "now",
72                 InfoJobIdentity:  "job1",
73                 TargetUri:        "target",
74                 InfoJobData:      "{}",
75                 InfoTypeIdentity: "type1",
76         }
77         handlerUnderTest.allTypes["type1"] = TypeData{
78                 TypeId: "type1",
79                 Jobs:   map[string]JobInfo{"job1": wantedJob},
80         }
81         t.Cleanup(func() {
82                 handlerUnderTest.clearAll()
83         })
84
85         err := handlerUnderTest.AddJob(wantedJob)
86         assertions.Nil(err)
87         assertions.Equal(1, len(handlerUnderTest.allTypes["type1"].Jobs))
88         assertions.Equal(wantedJob, handlerUnderTest.allTypes["type1"].Jobs["job1"])
89 }
90
91 func TestAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) {
92         assertions := require.New(t)
93         handlerUnderTest := NewJobHandlerImpl("", nil, nil)
94         jobInfo := JobInfo{
95                 InfoTypeIdentity: "type1",
96         }
97
98         err := handlerUnderTest.AddJob(jobInfo)
99         assertions.NotNil(err)
100         assertions.Equal("type not supported: type1", err.Error())
101 }
102
103 func TestAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
104         assertions := require.New(t)
105         handlerUnderTest := NewJobHandlerImpl("", nil, nil)
106         handlerUnderTest.allTypes["type1"] = TypeData{
107                 TypeId: "type1",
108         }
109         t.Cleanup(func() {
110                 handlerUnderTest.clearAll()
111         })
112
113         jobInfo := JobInfo{
114                 InfoTypeIdentity: "type1",
115         }
116         err := handlerUnderTest.AddJob(jobInfo)
117         assertions.NotNil(err)
118         assertions.Equal("missing required job identity: {    <nil> type1}", err.Error())
119 }
120
121 func TestAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
122         assertions := require.New(t)
123         handlerUnderTest := NewJobHandlerImpl("", nil, nil)
124         handlerUnderTest.allTypes["type1"] = TypeData{
125                 TypeId: "type1",
126         }
127         t.Cleanup(func() {
128                 handlerUnderTest.clearAll()
129         })
130
131         jobInfo := JobInfo{
132                 InfoTypeIdentity: "type1",
133                 InfoJobIdentity:  "job1",
134         }
135         err := handlerUnderTest.AddJob(jobInfo)
136         assertions.NotNil(err)
137         assertions.Equal("missing required target URI: {  job1  <nil> type1}", err.Error())
138 }
139
140 func TestDeleteJob(t *testing.T) {
141         assertions := require.New(t)
142         handlerUnderTest := NewJobHandlerImpl("", nil, nil)
143         jobToKeep := JobInfo{
144                 InfoJobIdentity:  "job1",
145                 InfoTypeIdentity: "type1",
146         }
147         jobToDelete := JobInfo{
148                 InfoJobIdentity:  "job2",
149                 InfoTypeIdentity: "type1",
150         }
151         handlerUnderTest.allTypes["type1"] = TypeData{
152                 TypeId: "type1",
153                 Jobs:   map[string]JobInfo{"job1": jobToKeep, "job2": jobToDelete},
154         }
155         t.Cleanup(func() {
156                 handlerUnderTest.clearAll()
157         })
158
159         handlerUnderTest.DeleteJob("job2")
160         assertions.Equal(1, len(handlerUnderTest.allTypes["type1"].Jobs))
161         assertions.Equal(jobToKeep, handlerUnderTest.allTypes["type1"].Jobs["job1"])
162 }
163
164 func TestPollAndDistributeMessages(t *testing.T) {
165         assertions := require.New(t)
166
167         wg := sync.WaitGroup{}
168         messages := `[{"message": {"data": "data"}}]`
169         pollClientMock := NewTestClient(func(req *http.Request) *http.Response {
170                 if req.URL.String() == "http://mrAddr/topicUrl" {
171                         assertions.Equal(req.Method, "GET")
172                         wg.Done() // Signal that the poll call has been made
173                         return &http.Response{
174                                 StatusCode: 200,
175                                 Body:       ioutil.NopCloser(bytes.NewReader([]byte(messages))),
176                                 Header:     make(http.Header), // Must be set to non-nil value or it panics
177                         }
178                 }
179                 t.Error("Wrong call to client: ", req)
180                 t.Fail()
181                 return nil
182         })
183         distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
184                 if req.URL.String() == "http://consumerHost/target" {
185                         assertions.Equal(req.Method, "POST")
186                         assertions.Equal(messages, getBodyAsString(req))
187                         assertions.Equal("application/json; charset=utf-8", req.Header.Get("Content-Type"))
188                         wg.Done() // Signal that the distribution call has been made
189                         return &http.Response{
190                                 StatusCode: 200,
191                                 Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
192                                 Header:     make(http.Header), // Must be set to non-nil value or it panics
193                         }
194                 }
195                 t.Error("Wrong call to client: ", req)
196                 t.Fail()
197                 return nil
198         })
199         handlerUnderTest := NewJobHandlerImpl("", pollClientMock, distributeClientMock)
200         jobInfo := JobInfo{
201                 InfoTypeIdentity: "type1",
202                 InfoJobIdentity:  "job1",
203                 TargetUri:        "http://consumerHost/target",
204         }
205         handlerUnderTest.allTypes["type1"] = TypeData{
206                 TypeId:        "type1",
207                 DMaaPTopicURL: "topicUrl",
208                 Jobs:          map[string]JobInfo{"job1": jobInfo},
209         }
210         t.Cleanup(func() {
211                 handlerUnderTest.clearAll()
212         })
213
214         wg.Add(2) // Two calls should be made to the server, one to poll and one to distribute
215         handlerUnderTest.pollAndDistributeMessages("http://mrAddr")
216
217         if waitTimeout(&wg, 100*time.Millisecond) {
218                 t.Error("Not all calls to server were made")
219                 t.Fail()
220         }
221 }
222
223 type RoundTripFunc func(req *http.Request) *http.Response
224
225 func (f RoundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
226         return f(req), nil
227 }
228
229 //NewTestClient returns *http.Client with Transport replaced to avoid making real calls
230 func NewTestClient(fn RoundTripFunc) *http.Client {
231         return &http.Client{
232                 Transport: RoundTripFunc(fn),
233         }
234 }
235
236 // waitTimeout waits for the waitgroup for the specified max timeout.
237 // Returns true if waiting timed out.
238 func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
239         c := make(chan struct{})
240         go func() {
241                 defer close(c)
242                 wg.Wait()
243         }()
244         select {
245         case <-c:
246                 return false // completed normally
247         case <-time.After(timeout):
248                 return true // timed out
249         }
250 }
251
252 func getBodyAsString(req *http.Request) string {
253         buf := new(bytes.Buffer)
254         buf.ReadFrom(req.Body)
255         return buf.String()
256 }