Merge "NONRTRIC - Implement DMaaP mediator producer service in Java"
[nonrtric.git] / dmaap-mediator-producer / internal / jobs / jobs_test.go
1 // -
2 //   ========================LICENSE_START=================================
3 //   O-RAN-SC
4 //   %%
5 //   Copyright (C) 2021: Nordix Foundation
6 //   %%
7 //   Licensed under the Apache License, Version 2.0 (the "License");
8 //   you may not use this file except in compliance with the License.
9 //   You may obtain a copy of the License at
10 //
11 //        http://www.apache.org/licenses/LICENSE-2.0
12 //
13 //   Unless required by applicable law or agreed to in writing, software
14 //   distributed under the License is distributed on an "AS IS" BASIS,
15 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 //   See the License for the specific language governing permissions and
17 //   limitations under the License.
18 //   ========================LICENSE_END===================================
19 //
20
21 package jobs
22
23 import (
24         "bytes"
25         "io/ioutil"
26         "net/http"
27         "os"
28         "path/filepath"
29         "sync"
30         "testing"
31         "time"
32
33         "github.com/stretchr/testify/require"
34         "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
35 )
36
// typeDefinition is the JSON document written to the temporary type
// configuration file by the GetTypes test. It declares a single type, "type1",
// together with its dmaapTopicUrl.
const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}]}`
38
39 func TestGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
40         assertions := require.New(t)
41         typesDir, err := os.MkdirTemp("", "configs")
42         if err != nil {
43                 t.Errorf("Unable to create temporary directory for types due to: %v", err)
44         }
45         t.Cleanup(func() {
46                 os.RemoveAll(typesDir)
47                 clearAll()
48         })
49         fname := filepath.Join(typesDir, "type_config.json")
50         configFile = fname
51         if err = os.WriteFile(fname, []byte(typeDefinition), 0666); err != nil {
52                 t.Errorf("Unable to create temporary config file for types due to: %v", err)
53         }
54         types, err := GetTypes()
55         wantedType := TypeData{
56                 TypeId:        "type1",
57                 DMaaPTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
58                 Jobs:          make(map[string]JobInfo),
59         }
60         wantedTypes := []TypeData{wantedType}
61         assertions.EqualValues(wantedTypes, types)
62         assertions.Nil(err)
63
64         supportedTypes := GetSupportedTypes()
65         assertions.EqualValues([]string{"type1"}, supportedTypes)
66 }
67
68 func TestAddJobWhenTypeIsSupported_shouldAddJobToAllJobsMap(t *testing.T) {
69         assertions := require.New(t)
70         wantedJob := JobInfo{
71                 Owner:            "owner",
72                 LastUpdated:      "now",
73                 InfoJobIdentity:  "job1",
74                 TargetUri:        "target",
75                 InfoJobData:      "{}",
76                 InfoTypeIdentity: "type1",
77         }
78         allTypes["type1"] = TypeData{
79                 TypeId: "type1",
80                 Jobs:   map[string]JobInfo{"job1": wantedJob},
81         }
82         t.Cleanup(func() {
83                 clearAll()
84         })
85
86         err := AddJob(wantedJob)
87         assertions.Nil(err)
88         assertions.Equal(1, len(allTypes["type1"].Jobs))
89         assertions.Equal(wantedJob, allTypes["type1"].Jobs["job1"])
90 }
91
92 func TestAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) {
93         assertions := require.New(t)
94         jobInfo := JobInfo{
95                 InfoTypeIdentity: "type1",
96         }
97
98         err := AddJob(jobInfo)
99         assertions.NotNil(err)
100         assertions.Equal("type not supported: type1", err.Error())
101 }
102
103 func TestAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
104         assertions := require.New(t)
105         allTypes["type1"] = TypeData{
106                 TypeId: "type1",
107         }
108         t.Cleanup(func() {
109                 clearAll()
110         })
111         jobInfo := JobInfo{
112                 InfoTypeIdentity: "type1",
113         }
114
115         err := AddJob(jobInfo)
116         assertions.NotNil(err)
117         assertions.Equal("missing required job identity: {    <nil> type1}", err.Error())
118 }
119
120 func TestAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
121         assertions := require.New(t)
122         allTypes["type1"] = TypeData{
123                 TypeId: "type1",
124         }
125         jobInfo := JobInfo{
126                 InfoTypeIdentity: "type1",
127                 InfoJobIdentity:  "job1",
128         }
129
130         err := AddJob(jobInfo)
131         assertions.NotNil(err)
132         assertions.Equal("missing required target URI: {  job1  <nil> type1}", err.Error())
133         clearAll()
134 }
135 func TestPollAndDistributeMessages(t *testing.T) {
136         assertions := require.New(t)
137         jobInfo := JobInfo{
138                 InfoTypeIdentity: "type1",
139                 InfoJobIdentity:  "job1",
140                 TargetUri:        "http://consumerHost/target",
141         }
142         allTypes["type1"] = TypeData{
143                 TypeId:        "type1",
144                 DMaaPTopicURL: "topicUrl",
145                 Jobs:          map[string]JobInfo{"job1": jobInfo},
146         }
147         t.Cleanup(func() {
148                 clearAll()
149         })
150
151         wg := sync.WaitGroup{}
152         wg.Add(2) // Two calls should be made to the server, one to poll and one to distribute
153         messages := `[{"message": {"data": "data"}}]`
154         clientMock := NewTestClient(func(req *http.Request) *http.Response {
155                 if req.URL.String() == "http://mrAddr/topicUrl" {
156                         assertions.Equal(req.Method, "GET")
157                         wg.Done()
158                         return &http.Response{
159                                 StatusCode: 200,
160                                 Body:       ioutil.NopCloser(bytes.NewReader([]byte(messages))),
161                                 Header:     make(http.Header), // Must be set to non-nil value or it panics
162                         }
163                 } else if req.URL.String() == "http://consumerHost/target" {
164                         assertions.Equal(req.Method, "POST")
165                         assertions.Equal(messages, getBodyAsString(req))
166                         assertions.Equal("application/json; charset=utf-8", req.Header.Get("Content-Type"))
167                         wg.Done()
168                         return &http.Response{
169                                 StatusCode: 200,
170                                 Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
171                                 Header:     make(http.Header), // Must be set to non-nil value or it panics
172                         }
173                 }
174                 t.Error("Wrong call to client: ", req)
175                 t.Fail()
176                 return nil
177         })
178
179         restclient.Client = clientMock
180
181         pollAndDistributeMessages("http://mrAddr")
182
183         if waitTimeout(&wg, 100*time.Millisecond) {
184                 t.Error("Not all calls to server were made")
185                 t.Fail()
186         }
187 }
188
// RoundTripFunc adapts a plain function to be used as an http.RoundTripper, so
// that a test can decide which response each request receives.
type RoundTripFunc func(req *http.Request) *http.Response

// RoundTrip hands the request to the wrapped function and returns whatever
// response it produces, always together with a nil error.
func (f RoundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
	response := f(req)
	return response, nil
}
194
195 //NewTestClient returns *http.Client with Transport replaced to avoid making real calls
196 func NewTestClient(fn RoundTripFunc) *http.Client {
197         return &http.Client{
198                 Transport: RoundTripFunc(fn),
199         }
200 }
201
// waitTimeout waits for the waitgroup for the specified max timeout.
// Returns true if waiting timed out, false if the waitgroup completed first.
//
// On timeout the helper goroutine stays blocked in wg.Wait until the waitgroup
// eventually completes.
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	c := make(chan struct{})
	go func() {
		defer close(c)
		wg.Wait()
	}()

	// Use an explicit timer so it can be released as soon as the waitgroup
	// completes, instead of lingering until the timeout expires.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-c:
		return false // completed normally
	case <-timer.C:
		return true // timed out
	}
}
217
// getBodyAsString reads the request body to its end and returns it as a
// string. A nil request or a request without a body yields the empty string.
// The body is consumed by the read and is not closed here.
func getBodyAsString(req *http.Request) string {
	// Requests without a payload may carry a nil Body; reading from it would panic.
	if req == nil || req.Body == nil {
		return ""
	}
	var buf bytes.Buffer
	// Best effort: on a read error whatever was read so far is returned, which
	// lets the caller's comparison report the mismatch.
	_, _ = buf.ReadFrom(req.Body)
	return buf.String()
}