Improve concurrency of message sending
[nonrtric.git] / dmaap-mediator-producer / internal / jobs / jobs_test.go
1 // -
2 //   ========================LICENSE_START=================================
3 //   O-RAN-SC
4 //   %%
5 //   Copyright (C) 2021: Nordix Foundation
6 //   %%
7 //   Licensed under the Apache License, Version 2.0 (the "License");
8 //   you may not use this file except in compliance with the License.
9 //   You may obtain a copy of the License at
10 //
11 //        http://www.apache.org/licenses/LICENSE-2.0
12 //
13 //   Unless required by applicable law or agreed to in writing, software
14 //   distributed under the License is distributed on an "AS IS" BASIS,
15 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 //   See the License for the specific language governing permissions and
17 //   limitations under the License.
18 //   ========================LICENSE_END===================================
19 //
20
21 package jobs
22
23 import (
24         "bytes"
25         "io/ioutil"
26         "net/http"
27         "os"
28         "path/filepath"
29         "sync"
30         "testing"
31         "time"
32
33         "github.com/stretchr/testify/require"
34         "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
35 )
36
37 const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}]}`
38
39 func TestJobsManagerGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
40         assertions := require.New(t)
41         typesDir, err := os.MkdirTemp("", "configs")
42         if err != nil {
43                 t.Errorf("Unable to create temporary directory for types due to: %v", err)
44         }
45         fname := filepath.Join(typesDir, "type_config.json")
46         managerUnderTest := NewJobsManagerImpl(fname, nil, "", nil)
47         t.Cleanup(func() {
48                 os.RemoveAll(typesDir)
49         })
50         if err = os.WriteFile(fname, []byte(typeDefinition), 0666); err != nil {
51                 t.Errorf("Unable to create temporary config file for types due to: %v", err)
52         }
53         types, err := managerUnderTest.LoadTypesFromConfiguration()
54         wantedType := config.TypeDefinition{
55                 Id:            "type1",
56                 DmaapTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
57         }
58         wantedTypes := []config.TypeDefinition{wantedType}
59         assertions.EqualValues(wantedTypes, types)
60         assertions.Nil(err)
61
62         supportedTypes := managerUnderTest.GetSupportedTypes()
63         assertions.EqualValues([]string{"type1"}, supportedTypes)
64 }
65
66 func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) {
67         assertions := require.New(t)
68         managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
69         wantedJob := JobInfo{
70                 Owner:            "owner",
71                 LastUpdated:      "now",
72                 InfoJobIdentity:  "job1",
73                 TargetUri:        "target",
74                 InfoJobData:      "{}",
75                 InfoTypeIdentity: "type1",
76         }
77         jobsHandler := jobsHandler{
78                 addJobCh: make(chan JobInfo)}
79         managerUnderTest.allTypes["type1"] = TypeData{
80                 TypeId:      "type1",
81                 jobsHandler: &jobsHandler,
82         }
83
84         var err error
85         go func() {
86                 err = managerUnderTest.AddJob(wantedJob)
87         }()
88
89         assertions.Nil(err)
90         addedJob := <-jobsHandler.addJobCh
91         assertions.Equal(wantedJob, addedJob)
92 }
93
94 func TestJobsManagerAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) {
95         assertions := require.New(t)
96         managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
97         jobInfo := JobInfo{
98                 InfoTypeIdentity: "type1",
99         }
100
101         err := managerUnderTest.AddJob(jobInfo)
102         assertions.NotNil(err)
103         assertions.Equal("type not supported: type1", err.Error())
104 }
105
106 func TestJobsManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
107         assertions := require.New(t)
108         managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
109         managerUnderTest.allTypes["type1"] = TypeData{
110                 TypeId: "type1",
111         }
112
113         jobInfo := JobInfo{
114                 InfoTypeIdentity: "type1",
115         }
116         err := managerUnderTest.AddJob(jobInfo)
117         assertions.NotNil(err)
118         assertions.Equal("missing required job identity: {    <nil> type1}", err.Error())
119 }
120
121 func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
122         assertions := require.New(t)
123         managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
124         managerUnderTest.allTypes["type1"] = TypeData{
125                 TypeId: "type1",
126         }
127
128         jobInfo := JobInfo{
129                 InfoTypeIdentity: "type1",
130                 InfoJobIdentity:  "job1",
131         }
132         err := managerUnderTest.AddJob(jobInfo)
133         assertions.NotNil(err)
134         assertions.Equal("missing required target URI: {  job1  <nil> type1}", err.Error())
135 }
136
137 func TestJobsManagerDeleteJob_shouldSendDeleteToChannel(t *testing.T) {
138         assertions := require.New(t)
139         managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
140         jobsHandler := jobsHandler{
141                 deleteJobCh: make(chan string)}
142         managerUnderTest.allTypes["type1"] = TypeData{
143                 TypeId:      "type1",
144                 jobsHandler: &jobsHandler,
145         }
146
147         go managerUnderTest.DeleteJob("job2")
148
149         assertions.Equal("job2", <-jobsHandler.deleteJobCh)
150 }
151
152 func TestAddJobToJobsManager_shouldStartPollAndDistributeMessages(t *testing.T) {
153         assertions := require.New(t)
154
155         called := false
156         messages := `[{"message": {"data": "data"}}]`
157         pollClientMock := NewTestClient(func(req *http.Request) *http.Response {
158                 if req.URL.String() == "http://mrAddr/topicUrl" {
159                         assertions.Equal(req.Method, "GET")
160                         body := "[]"
161                         if !called {
162                                 called = true
163                                 body = messages
164                         }
165                         return &http.Response{
166                                 StatusCode: 200,
167                                 Body:       ioutil.NopCloser(bytes.NewReader([]byte(body))),
168                                 Header:     make(http.Header), // Must be set to non-nil value or it panics
169                         }
170                 }
171                 t.Error("Wrong call to client: ", req)
172                 t.Fail()
173                 return nil
174         })
175
176         wg := sync.WaitGroup{}
177         distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
178                 if req.URL.String() == "http://consumerHost/target" {
179                         assertions.Equal(req.Method, "POST")
180                         assertions.Equal(messages, getBodyAsString(req))
181                         assertions.Equal("application/json", req.Header.Get("Content-Type"))
182                         wg.Done()
183                         return &http.Response{
184                                 StatusCode: 200,
185                                 Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
186                                 Header:     make(http.Header), // Must be set to non-nil value or it panics
187                         }
188                 }
189                 t.Error("Wrong call to client: ", req)
190                 t.Fail()
191                 return nil
192         })
193         jobsHandler := newJobsHandler("type1", "/topicUrl", pollClientMock, distributeClientMock)
194
195         jobsManager := NewJobsManagerImpl("", pollClientMock, "http://mrAddr", distributeClientMock)
196         jobsManager.allTypes["type1"] = TypeData{
197                 DMaaPTopicURL: "/topicUrl",
198                 TypeId:        "type1",
199                 jobsHandler:   jobsHandler,
200         }
201
202         jobsManager.StartJobs()
203
204         jobInfo := JobInfo{
205                 InfoTypeIdentity: "type1",
206                 InfoJobIdentity:  "job1",
207                 TargetUri:        "http://consumerHost/target",
208         }
209
210         wg.Add(1) // Wait till the distribution has happened
211         jobsManager.AddJob(jobInfo)
212
213         if waitTimeout(&wg, 2*time.Second) {
214                 t.Error("Not all calls to server were made")
215                 t.Fail()
216         }
217 }
218
219 func TestJobsHandlerDeleteJob_shouldDeleteJobFromJobsMap(t *testing.T) {
220         jobToDelete := newJob(JobInfo{}, nil)
221         go jobToDelete.start()
222         jobsHandler := newJobsHandler("type1", "/topicUrl", nil, nil)
223         jobsHandler.jobs["job1"] = jobToDelete
224
225         go jobsHandler.monitorManagementChannels()
226
227         jobsHandler.deleteJobCh <- "job1"
228
229         deleted := false
230         for i := 0; i < 100; i++ {
231                 if len(jobsHandler.jobs) == 0 {
232                         deleted = true
233                         break
234                 }
235                 time.Sleep(time.Microsecond) // Need to drop control to let the job's goroutine do the job
236         }
237         require.New(t).True(deleted, "Job not deleted")
238 }
239
240 func TestJobsHandlerEmptyJobMessageBufferWhenItIsFull(t *testing.T) {
241         job := newJob(JobInfo{
242                 InfoJobIdentity: "job",
243         }, nil)
244
245         jobsHandler := newJobsHandler("type1", "/topicUrl", nil, nil)
246         jobsHandler.jobs["job1"] = job
247
248         fillMessagesBuffer(job.messagesChannel)
249
250         jobsHandler.distributeMessages([]byte("sent msg"))
251
252         require.New(t).Len(job.messagesChannel, 0)
253 }
254
255 func fillMessagesBuffer(mc chan []byte) {
256         for i := 0; i < cap(mc); i++ {
257                 mc <- []byte("msg")
258         }
259 }
260
261 type RoundTripFunc func(req *http.Request) *http.Response
262
263 func (f RoundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
264         return f(req), nil
265 }
266
267 //NewTestClient returns *http.Client with Transport replaced to avoid making real calls
268 func NewTestClient(fn RoundTripFunc) *http.Client {
269         return &http.Client{
270                 Transport: RoundTripFunc(fn),
271         }
272 }
273
274 // waitTimeout waits for the waitgroup for the specified max timeout.
275 // Returns true if waiting timed out.
276 func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
277         c := make(chan struct{})
278         go func() {
279                 defer close(c)
280                 wg.Wait()
281         }()
282         select {
283         case <-c:
284                 return false // completed normally
285         case <-time.After(timeout):
286                 return true // timed out
287         }
288 }
289
290 func getBodyAsString(req *http.Request) string {
291         buf := new(bytes.Buffer)
292         buf.ReadFrom(req.Body)
293         return buf.String()
294 }