Merge "Test env updates"
[nonrtric.git] / dmaap-mediator-producer / internal / jobs / jobs_test.go
1 // -
2 //   ========================LICENSE_START=================================
3 //   O-RAN-SC
4 //   %%
5 //   Copyright (C) 2021: Nordix Foundation
6 //   %%
7 //   Licensed under the Apache License, Version 2.0 (the "License");
8 //   you may not use this file except in compliance with the License.
9 //   You may obtain a copy of the License at
10 //
11 //        http://www.apache.org/licenses/LICENSE-2.0
12 //
13 //   Unless required by applicable law or agreed to in writing, software
14 //   distributed under the License is distributed on an "AS IS" BASIS,
15 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 //   See the License for the specific language governing permissions and
17 //   limitations under the License.
18 //   ========================LICENSE_END===================================
19 //
20
21 package jobs
22
23 import (
24         "bytes"
25         "io/ioutil"
26         "net/http"
27         "os"
28         "path/filepath"
29         "sync"
30         "testing"
31         "time"
32
33         "github.com/stretchr/testify/require"
34         "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
35 )
36
// typeDefinition is the JSON type configuration that the GetTypes test writes
// to its temporary config file. It declares a single type with id "type1" and
// the DMaaP topic URL that the test expects to get back.
const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}]}`
38
39 func TestGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
40         assertions := require.New(t)
41         typesDir, err := os.MkdirTemp("", "configs")
42         if err != nil {
43                 t.Errorf("Unable to create temporary directory for types due to: %v", err)
44         }
45         t.Cleanup(func() {
46                 os.RemoveAll(typesDir)
47                 clearAll()
48         })
49         fname := filepath.Join(typesDir, "type_config.json")
50         configFile = fname
51         if err = os.WriteFile(fname, []byte(typeDefinition), 0666); err != nil {
52                 t.Errorf("Unable to create temporary config file for types due to: %v", err)
53         }
54         types, err := GetTypes()
55         wantedType := TypeData{
56                 TypeId:        "type1",
57                 DMaaPTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
58                 Jobs:          make(map[string]JobInfo),
59         }
60         wantedTypes := []TypeData{wantedType}
61         assertions.EqualValues(wantedTypes, types)
62         assertions.Nil(err)
63
64         supportedTypes := GetSupportedTypes()
65         assertions.EqualValues([]string{"type1"}, supportedTypes)
66 }
67
68 func TestAddJobWhenTypeIsSupported_shouldAddJobToAllJobsMap(t *testing.T) {
69         assertions := require.New(t)
70         wantedJob := JobInfo{
71                 Owner:            "owner",
72                 LastUpdated:      "now",
73                 InfoJobIdentity:  "job1",
74                 TargetUri:        "target",
75                 InfoJobData:      "{}",
76                 InfoTypeIdentity: "type1",
77         }
78         allTypes["type1"] = TypeData{
79                 TypeId: "type1",
80                 Jobs:   map[string]JobInfo{"job1": wantedJob},
81         }
82         t.Cleanup(func() {
83                 clearAll()
84         })
85
86         err := AddJob(wantedJob)
87         assertions.Nil(err)
88         assertions.Equal(1, len(allTypes["type1"].Jobs))
89         assertions.Equal(wantedJob, allTypes["type1"].Jobs["job1"])
90 }
91
92 func TestAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) {
93         assertions := require.New(t)
94         jobInfo := JobInfo{
95                 InfoTypeIdentity: "type1",
96         }
97
98         err := AddJob(jobInfo)
99         assertions.NotNil(err)
100         assertions.Equal("type not supported: type1", err.Error())
101 }
102
103 func TestAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
104         assertions := require.New(t)
105         allTypes["type1"] = TypeData{
106                 TypeId: "type1",
107         }
108         t.Cleanup(func() {
109                 clearAll()
110         })
111         jobInfo := JobInfo{
112                 InfoTypeIdentity: "type1",
113         }
114
115         err := AddJob(jobInfo)
116         assertions.NotNil(err)
117         assertions.Equal("missing required job identity: {    <nil> type1}", err.Error())
118 }
119
120 func TestAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
121         assertions := require.New(t)
122         allTypes["type1"] = TypeData{
123                 TypeId: "type1",
124         }
125         jobInfo := JobInfo{
126                 InfoTypeIdentity: "type1",
127                 InfoJobIdentity:  "job1",
128         }
129
130         err := AddJob(jobInfo)
131         assertions.NotNil(err)
132         assertions.Equal("missing required target URI: {  job1  <nil> type1}", err.Error())
133         clearAll()
134 }
135
136 func TestDeleteJob(t *testing.T) {
137         assertions := require.New(t)
138         jobToKeep := JobInfo{
139                 InfoJobIdentity:  "job1",
140                 InfoTypeIdentity: "type1",
141         }
142         jobToDelete := JobInfo{
143                 InfoJobIdentity:  "job2",
144                 InfoTypeIdentity: "type1",
145         }
146         allTypes["type1"] = TypeData{
147                 TypeId: "type1",
148                 Jobs:   map[string]JobInfo{"job1": jobToKeep, "job2": jobToDelete},
149         }
150         t.Cleanup(func() {
151                 clearAll()
152         })
153
154         DeleteJob("job2")
155         assertions.Equal(1, len(allTypes["type1"].Jobs))
156         assertions.Equal(jobToKeep, allTypes["type1"].Jobs["job1"])
157 }
158
159 func TestPollAndDistributeMessages(t *testing.T) {
160         assertions := require.New(t)
161         jobInfo := JobInfo{
162                 InfoTypeIdentity: "type1",
163                 InfoJobIdentity:  "job1",
164                 TargetUri:        "http://consumerHost/target",
165         }
166         allTypes["type1"] = TypeData{
167                 TypeId:        "type1",
168                 DMaaPTopicURL: "topicUrl",
169                 Jobs:          map[string]JobInfo{"job1": jobInfo},
170         }
171         t.Cleanup(func() {
172                 clearAll()
173         })
174
175         wg := sync.WaitGroup{}
176         wg.Add(2) // Two calls should be made to the server, one to poll and one to distribute
177         messages := `[{"message": {"data": "data"}}]`
178         clientMock := NewTestClient(func(req *http.Request) *http.Response {
179                 if req.URL.String() == "http://mrAddr/topicUrl" {
180                         assertions.Equal(req.Method, "GET")
181                         wg.Done()
182                         return &http.Response{
183                                 StatusCode: 200,
184                                 Body:       ioutil.NopCloser(bytes.NewReader([]byte(messages))),
185                                 Header:     make(http.Header), // Must be set to non-nil value or it panics
186                         }
187                 } else if req.URL.String() == "http://consumerHost/target" {
188                         assertions.Equal(req.Method, "POST")
189                         assertions.Equal(messages, getBodyAsString(req))
190                         assertions.Equal("application/json; charset=utf-8", req.Header.Get("Content-Type"))
191                         wg.Done()
192                         return &http.Response{
193                                 StatusCode: 200,
194                                 Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
195                                 Header:     make(http.Header), // Must be set to non-nil value or it panics
196                         }
197                 }
198                 t.Error("Wrong call to client: ", req)
199                 t.Fail()
200                 return nil
201         })
202
203         restclient.Client = clientMock
204
205         pollAndDistributeMessages("http://mrAddr")
206
207         if waitTimeout(&wg, 100*time.Millisecond) {
208                 t.Error("Not all calls to server were made")
209                 t.Fail()
210         }
211 }
212
// RoundTripFunc is a function type that produces the response for a request.
// Together with its RoundTrip method it can be used as an http.Client
// Transport, letting a test decide what every outgoing request returns.
type RoundTripFunc func(req *http.Request) *http.Response

// RoundTrip passes req to the wrapped function and returns whatever response
// it produces, always with a nil error.
func (f RoundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
	response := f(req)
	return response, nil
}
218
219 //NewTestClient returns *http.Client with Transport replaced to avoid making real calls
220 func NewTestClient(fn RoundTripFunc) *http.Client {
221         return &http.Client{
222                 Transport: RoundTripFunc(fn),
223         }
224 }
225
// waitTimeout blocks until wg is done or until timeout has elapsed, whichever
// happens first. It returns false when wg finished in time and true when the
// timeout expired first.
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	finished := make(chan struct{})
	go func() {
		// Closing the channel signals the select below that Wait returned.
		wg.Wait()
		close(finished)
	}()

	expired := time.After(timeout)
	select {
	case <-expired:
		return true // timed out
	case <-finished:
		return false // completed normally
	}
}
241
// getBodyAsString reads the body of req to the end and returns it as a
// string. It returns "" when req is nil or has no body; client requests
// created without a body have a nil Body, and reading from it would panic.
func getBodyAsString(req *http.Request) string {
	if req == nil || req.Body == nil {
		return ""
	}
	buf := new(bytes.Buffer)
	// Best effort: the signature has no way to report a read error, so whatever
	// was read before the error is returned and the caller's comparison fails
	// on the truncated content.
	_, _ = buf.ReadFrom(req.Body)
	return buf.String()
}