Fix test of DMaaP Mediator Producer
[nonrtric.git] / dmaap-mediator-producer / internal / jobs / jobs_test.go
1 // -
2 //   ========================LICENSE_START=================================
3 //   O-RAN-SC
4 //   %%
5 //   Copyright (C) 2021: Nordix Foundation
6 //   %%
7 //   Licensed under the Apache License, Version 2.0 (the "License");
8 //   you may not use this file except in compliance with the License.
9 //   You may obtain a copy of the License at
10 //
11 //        http://www.apache.org/licenses/LICENSE-2.0
12 //
13 //   Unless required by applicable law or agreed to in writing, software
14 //   distributed under the License is distributed on an "AS IS" BASIS,
15 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 //   See the License for the specific language governing permissions and
17 //   limitations under the License.
18 //   ========================LICENSE_END===================================
19 //
20
21 package jobs
22
23 import (
24         "bytes"
25         "io/ioutil"
26         "net/http"
27         "os"
28         "path/filepath"
29         "sync"
30         "testing"
31         "time"
32
33         "github.com/stretchr/testify/require"
34         "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
35 )
36
// typeDefinition is the JSON content written to the temporary type
// configuration file by the GetTypes test. It declares one type, "type1",
// together with its DMaaP topic URL.
const typeDefinition = `{"types": [` +
	`{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}` +
	`]}`
38
39 func TestGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
40         assertions := require.New(t)
41         typesDir, err := os.MkdirTemp("", "configs")
42         if err != nil {
43                 t.Errorf("Unable to create temporary directory for types due to: %v", err)
44         }
45         fname := filepath.Join(typesDir, "type_config.json")
46         handlerUnderTest := NewJobHandlerImpl(fname, nil, nil)
47         t.Cleanup(func() {
48                 os.RemoveAll(typesDir)
49                 handlerUnderTest.clearAll()
50         })
51         if err = os.WriteFile(fname, []byte(typeDefinition), 0666); err != nil {
52                 t.Errorf("Unable to create temporary config file for types due to: %v", err)
53         }
54         types, err := handlerUnderTest.GetTypes()
55         wantedType := config.TypeDefinition{
56                 Id:            "type1",
57                 DmaapTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
58         }
59         wantedTypes := []config.TypeDefinition{wantedType}
60         assertions.EqualValues(wantedTypes, types)
61         assertions.Nil(err)
62
63         supportedTypes := handlerUnderTest.GetSupportedTypes()
64         assertions.EqualValues([]string{"type1"}, supportedTypes)
65 }
66
67 func TestAddJobWhenTypeIsSupported_shouldAddJobToAllJobsMap(t *testing.T) {
68         assertions := require.New(t)
69         handlerUnderTest := NewJobHandlerImpl("", nil, nil)
70         wantedJob := JobInfo{
71                 Owner:            "owner",
72                 LastUpdated:      "now",
73                 InfoJobIdentity:  "job1",
74                 TargetUri:        "target",
75                 InfoJobData:      "{}",
76                 InfoTypeIdentity: "type1",
77         }
78         handlerUnderTest.allTypes["type1"] = TypeData{
79                 TypeId: "type1",
80                 Jobs:   map[string]JobInfo{"job1": wantedJob},
81         }
82         t.Cleanup(func() {
83                 handlerUnderTest.clearAll()
84         })
85
86         err := handlerUnderTest.AddJob(wantedJob)
87         assertions.Nil(err)
88         assertions.Equal(1, len(handlerUnderTest.allTypes["type1"].Jobs))
89         assertions.Equal(wantedJob, handlerUnderTest.allTypes["type1"].Jobs["job1"])
90 }
91
92 func TestAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) {
93         assertions := require.New(t)
94         handlerUnderTest := NewJobHandlerImpl("", nil, nil)
95         jobInfo := JobInfo{
96                 InfoTypeIdentity: "type1",
97         }
98
99         err := handlerUnderTest.AddJob(jobInfo)
100         assertions.NotNil(err)
101         assertions.Equal("type not supported: type1", err.Error())
102 }
103
104 func TestAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
105         assertions := require.New(t)
106         handlerUnderTest := NewJobHandlerImpl("", nil, nil)
107         handlerUnderTest.allTypes["type1"] = TypeData{
108                 TypeId: "type1",
109         }
110         t.Cleanup(func() {
111                 handlerUnderTest.clearAll()
112         })
113
114         jobInfo := JobInfo{
115                 InfoTypeIdentity: "type1",
116         }
117         err := handlerUnderTest.AddJob(jobInfo)
118         assertions.NotNil(err)
119         assertions.Equal("missing required job identity: {    <nil> type1}", err.Error())
120 }
121
122 func TestAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
123         assertions := require.New(t)
124         handlerUnderTest := NewJobHandlerImpl("", nil, nil)
125         handlerUnderTest.allTypes["type1"] = TypeData{
126                 TypeId: "type1",
127         }
128         t.Cleanup(func() {
129                 handlerUnderTest.clearAll()
130         })
131
132         jobInfo := JobInfo{
133                 InfoTypeIdentity: "type1",
134                 InfoJobIdentity:  "job1",
135         }
136         err := handlerUnderTest.AddJob(jobInfo)
137         assertions.NotNil(err)
138         assertions.Equal("missing required target URI: {  job1  <nil> type1}", err.Error())
139 }
140
141 func TestDeleteJob(t *testing.T) {
142         assertions := require.New(t)
143         handlerUnderTest := NewJobHandlerImpl("", nil, nil)
144         jobToKeep := JobInfo{
145                 InfoJobIdentity:  "job1",
146                 InfoTypeIdentity: "type1",
147         }
148         jobToDelete := JobInfo{
149                 InfoJobIdentity:  "job2",
150                 InfoTypeIdentity: "type1",
151         }
152         handlerUnderTest.allTypes["type1"] = TypeData{
153                 TypeId: "type1",
154                 Jobs:   map[string]JobInfo{"job1": jobToKeep, "job2": jobToDelete},
155         }
156         t.Cleanup(func() {
157                 handlerUnderTest.clearAll()
158         })
159
160         handlerUnderTest.DeleteJob("job2")
161         assertions.Equal(1, len(handlerUnderTest.allTypes["type1"].Jobs))
162         assertions.Equal(jobToKeep, handlerUnderTest.allTypes["type1"].Jobs["job1"])
163 }
164
165 func TestPollAndDistributeMessages(t *testing.T) {
166         assertions := require.New(t)
167
168         wg := sync.WaitGroup{}
169         messages := `[{"message": {"data": "data"}}]`
170         pollClientMock := NewTestClient(func(req *http.Request) *http.Response {
171                 if req.URL.String() == "http://mrAddr/topicUrl" {
172                         assertions.Equal(req.Method, "GET")
173                         wg.Done() // Signal that the poll call has been made
174                         return &http.Response{
175                                 StatusCode: 200,
176                                 Body:       ioutil.NopCloser(bytes.NewReader([]byte(messages))),
177                                 Header:     make(http.Header), // Must be set to non-nil value or it panics
178                         }
179                 }
180                 t.Error("Wrong call to client: ", req)
181                 t.Fail()
182                 return nil
183         })
184         distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
185                 if req.URL.String() == "http://consumerHost/target" {
186                         assertions.Equal(req.Method, "POST")
187                         assertions.Equal(messages, getBodyAsString(req))
188                         assertions.Equal("application/json; charset=utf-8", req.Header.Get("Content-Type"))
189                         wg.Done() // Signal that the distribution call has been made
190                         return &http.Response{
191                                 StatusCode: 200,
192                                 Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
193                                 Header:     make(http.Header), // Must be set to non-nil value or it panics
194                         }
195                 }
196                 t.Error("Wrong call to client: ", req)
197                 t.Fail()
198                 return nil
199         })
200
201         handlerUnderTest := NewJobHandlerImpl("", pollClientMock, distributeClientMock)
202
203         jobInfo := JobInfo{
204                 InfoTypeIdentity: "type1",
205                 InfoJobIdentity:  "job1",
206                 TargetUri:        "http://consumerHost/target",
207         }
208         handlerUnderTest.allTypes["type1"] = TypeData{
209                 TypeId:        "type1",
210                 DMaaPTopicURL: "topicUrl",
211                 Jobs:          map[string]JobInfo{"job1": jobInfo},
212         }
213         t.Cleanup(func() {
214                 handlerUnderTest.clearAll()
215         })
216
217         wg.Add(2) // Two calls should be made to the server, one to poll and one to distribute
218         handlerUnderTest.pollAndDistributeMessages("http://mrAddr")
219
220         if waitTimeout(&wg, 100*time.Millisecond) {
221                 t.Error("Not all calls to server were made")
222                 t.Fail()
223         }
224 }
225
// RoundTripFunc adapts a plain function to the http.RoundTripper interface so
// that a test can answer requests itself.
type RoundTripFunc func(req *http.Request) *http.Response

// RoundTrip implements http.RoundTripper by calling the function with req.
// The returned error is always nil, also when the function returns a nil
// response.
func (fn RoundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
	resp := fn(req)
	return resp, nil
}
231
232 //NewTestClient returns *http.Client with Transport replaced to avoid making real calls
233 func NewTestClient(fn RoundTripFunc) *http.Client {
234         return &http.Client{
235                 Transport: RoundTripFunc(fn),
236         }
237 }
238
// waitTimeout blocks until wg's counter reaches zero or until timeout has
// elapsed, whichever happens first.
// It returns true if the timeout elapsed first and false if the wait completed.
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-timer.C:
		return true // timed out
	case <-done:
		return false // completed normally
	}
}
254
// getBodyAsString reads the whole body of req and returns it as a string.
// It returns the empty string if req is nil or has no body. If reading fails
// part way, the data read up to that point is returned.
func getBodyAsString(req *http.Request) string {
	// A client request without payload has a nil Body; reading from it would
	// panic.
	if req == nil || req.Body == nil {
		return ""
	}
	buf := new(bytes.Buffer)
	// The read error is deliberately discarded: this helper has no error
	// return, and callers compare the result against the expected body, so a
	// truncated read still shows up as a mismatch.
	_, _ = buf.ReadFrom(req.Body)
	return buf.String()
}