2 // ========================LICENSE_START=================================
5 // Copyright (C) 2021: Nordix Foundation
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 // ========================LICENSE_END===================================
31 "github.com/stretchr/testify/require"
32 "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
// typeDefinition is a JSON document declaring a single type, "type1", together
// with its dmaapTopicUrl.
// NOTE(review): no use of this constant is visible in this excerpt — confirm it
// is still referenced before keeping it.
35 const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}]}`
// TestJobsManagerGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes
// loads one type definition into a fresh manager and checks two things: that
// LoadTypesFromConfiguration hands back the definitions it was given, and that
// GetSupportedTypes then returns exactly ["type1"].
37 func TestJobsManagerGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
38 assertions := require.New(t)
// Manager created with nil clients and an empty address.
40 managerUnderTest := NewJobsManagerImpl(nil, "", nil)
// NOTE(review): only the DmaapTopicURL field of the literal is visible here; the
// field carrying the "type1" identity is presumably set on a line not shown.
42 wantedType := config.TypeDefinition{
44 DmaapTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
46 wantedTypes := []config.TypeDefinition{wantedType}
48 types := managerUnderTest.LoadTypesFromConfiguration(wantedTypes)
// The returned slice must equal the input slice.
50 assertions.EqualValues(wantedTypes, types)
// After loading, the manager must report "type1" as its only supported type.
52 supportedTypes := managerUnderTest.GetSupportedTypes()
53 assertions.EqualValues([]string{"type1"}, supportedTypes)
// TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel registers a
// handler for "type1", submits a job of that type through AddJobFromRESTCall
// and asserts that the very same job value arrives on the handler's addJobCh.
56 func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) {
57 assertions := require.New(t)
58 managerUnderTest := NewJobsManagerImpl(nil, "", nil)
// Fields of the job under test; the opening of the literal and its remaining
// fields are not visible in this excerpt.
62 InfoJobIdentity: "job1",
65 InfoTypeIdentity: "type1",
// NOTE(review): the local variable shadows the jobsHandler type name for the
// rest of this function.
67 jobsHandler := jobsHandler{
// Unbuffered channel: a send on it blocks until the receive further down runs.
68 addJobCh: make(chan JobInfo)}
// Make "type1" a known type backed by the handler above.
69 managerUnderTest.allTypes["type1"] = TypeData{
71 jobsHandler: &jobsHandler,
// err is assigned, not declared, here; its declaration is on a line not shown.
// NOTE(review): presumably this call runs in a goroutine, since addJobCh is
// unbuffered — confirm against the full file.
76 err = managerUnderTest.AddJobFromRESTCall(wantedJob)
// Receive the job that was forwarded to the handler and compare it with the input.
80 addedJob := <-jobsHandler.addJobCh
81 assertions.Equal(wantedJob, addedJob)
// TestJobsManagerAddJobWhenTypeIsNotSupported_shouldReturnError submits a job
// of type "type1" to a manager in which no type has been registered by the
// visible lines and expects the error "type not supported: type1".
84 func TestJobsManagerAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) {
85 assertions := require.New(t)
86 managerUnderTest := NewJobsManagerImpl(nil, "", nil)
// Field of the job literal; the literal's opening line is not visible here.
88 InfoTypeIdentity: "type1",
91 err := managerUnderTest.AddJobFromRESTCall(jobInfo)
// require aborts the test on a nil error, so err.Error() below is safe to call.
92 assertions.NotNil(err)
93 assertions.Equal("type not supported: type1", err.Error())
// TestJobsManagerAddJobWhenJobIdMissing_shouldReturnError registers "type1"
// and submits a job that names the type but, in the visible lines, no job
// identity. It expects an error whose text reports the missing job identity.
96 func TestJobsManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
97 assertions := require.New(t)
98 managerUnderTest := NewJobsManagerImpl(nil, "", nil)
// Register "type1"; the fields of this TypeData literal are not visible here.
99 managerUnderTest.allTypes["type1"] = TypeData{
// Field of the job literal; the literal's opening line is not visible here.
104 InfoTypeIdentity: "type1",
106 err := managerUnderTest.AddJobFromRESTCall(jobInfo)
107 assertions.NotNil(err)
// The expected text embeds a printed form of the job value, so it is sensitive
// to changes in JobInfo's fields.
108 assertions.Equal("missing required job identity: { <nil> type1}", err.Error())
// TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError registers
// "type1" and submits a job with a type and a job identity but, in the visible
// lines, no target URI. It expects an error reporting the missing target URI.
111 func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
112 assertions := require.New(t)
113 managerUnderTest := NewJobsManagerImpl(nil, "", nil)
// Register "type1"; the fields of this TypeData literal are not visible here.
114 managerUnderTest.allTypes["type1"] = TypeData{
// Fields of the job literal; the literal's opening line is not visible here.
119 InfoTypeIdentity: "type1",
120 InfoJobIdentity: "job1",
122 err := managerUnderTest.AddJobFromRESTCall(jobInfo)
123 assertions.NotNil(err)
// The expected text embeds a printed form of the job value, so it is sensitive
// to changes in JobInfo's fields.
124 assertions.Equal("missing required target URI: { job1 <nil> type1}", err.Error())
// TestJobsManagerDeleteJob_shouldSendDeleteToChannel registers a handler for
// "type1", asks the manager to delete "job2" and asserts that this job id is
// received on the handler's deleteJobCh.
127 func TestJobsManagerDeleteJob_shouldSendDeleteToChannel(t *testing.T) {
128 assertions := require.New(t)
129 managerUnderTest := NewJobsManagerImpl(nil, "", nil)
// NOTE(review): the local variable shadows the jobsHandler type name for the
// rest of this function.
130 jobsHandler := jobsHandler{
// Unbuffered channel: a send on it blocks until the receive below runs.
131 deleteJobCh: make(chan string)}
132 managerUnderTest.allTypes["type1"] = TypeData{
134 jobsHandler: &jobsHandler,
// Run the delete concurrently so the test goroutine is free to receive from the
// unbuffered channel on the next line.
137 go managerUnderTest.DeleteJobFromRESTCall("job2")
139 assertions.Equal("job2", <-jobsHandler.deleteJobCh)
// TestAddJobToJobsManager_shouldStartPollAndDistributeMessages wires a manager
// to two stubbed HTTP clients — one answering the poll against
// "http://mrAddr/topicUrl", one receiving the POST to
// "http://consumerHost/target" — starts the jobs for all types, adds a job and
// then waits up to two seconds on a WaitGroup for the distribution to happen.
142 func TestAddJobToJobsManager_shouldStartPollAndDistributeMessages(t *testing.T) {
143 assertions := require.New(t)
// Payload the consumer is expected to receive.
146 messages := `[{"message": {"data": "data"}}]`
// Stub for the polling side: only GET http://mrAddr/topicUrl is accepted.
147 pollClientMock := NewTestClient(func(req *http.Request) *http.Response {
148 if req.URL.String() == "http://mrAddr/topicUrl" {
// NOTE(review): testify's Equal takes (expected, actual); the arguments are
// swapped here, which only mislabels the failure message.
// NOTE(review): assertions is a require.Assertions, which stops the test on
// failure; if this callback runs outside the test goroutine that is not
// supported by the testing package — confirm which goroutine calls the client.
149 assertions.Equal(req.Method, "GET")
// body is defined on lines not visible in this excerpt.
155 return &http.Response{
157 Body: ioutil.NopCloser(bytes.NewReader([]byte(body))),
158 Header: make(http.Header), // Must be set to non-nil value or it panics
// Any other URL is reported as a test error.
161 t.Error("Wrong call to client: ", req)
// Used to block the test until the distribution call has been observed; the
// matching Done call is presumably inside the distribute stub on a line not shown.
166 wg := sync.WaitGroup{}
// Stub for the distributing side: only POST http://consumerHost/target is
// accepted, and it must carry the messages as application/json.
167 distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
168 if req.URL.String() == "http://consumerHost/target" {
// NOTE(review): same swapped (expected, actual) order as in the poll stub.
169 assertions.Equal(req.Method, "POST")
170 assertions.Equal(messages, getBodyAsString(req, t))
171 assertions.Equal("application/json", req.Header.Get("Content-Type"))
173 return &http.Response{
175 Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)),
176 Header: make(http.Header), // Must be set to non-nil value or it panics
179 t.Error("Wrong call to client: ", req)
// Handler for "type1" on "/topicUrl", using both stubbed clients.
183 jobsHandler := newJobsHandler("type1", "/topicUrl", pollClientMock, distributeClientMock)
// Manager pointed at "http://mrAddr"; with the "/topicUrl" topic this matches
// the URL the poll stub accepts.
185 jobsManager := NewJobsManagerImpl(pollClientMock, "http://mrAddr", distributeClientMock)
186 jobsManager.allTypes["type1"] = TypeData{
187 DMaaPTopicURL: "/topicUrl",
189 jobsHandler: jobsHandler,
192 jobsManager.StartJobsForAllTypes()
// Fields of the job to add; its target is the URL the distribute stub accepts.
// The literal's opening line is not visible here.
195 InfoTypeIdentity: "type1",
196 InfoJobIdentity: "job1",
197 TargetUri: "http://consumerHost/target",
200 wg.Add(1) // Wait till the distribution has happened
201 err := jobsManager.AddJobFromRESTCall(jobInfo)
// waitTimeout returns true on timeout, i.e. when the expected call never came.
204 if waitTimeout(&wg, 2*time.Second) {
205 t.Error("Not all calls to server were made")
// TestJobsHandlerDeleteJob_shouldDeleteJobFromJobsMap puts one running job
// into a handler's jobs map under "job1", sends "job1" on deleteJobCh and then
// polls, up to 100 times, for the map to become empty.
210 func TestJobsHandlerDeleteJob_shouldDeleteJobFromJobsMap(t *testing.T) {
// Job with an empty JobInfo and a nil second argument, started in its own goroutine.
211 jobToDelete := newJob(JobInfo{}, nil)
212 go jobToDelete.start()
213 jobsHandler := newJobsHandler("type1", "/topicUrl", nil, nil)
214 jobsHandler.jobs["job1"] = jobToDelete
// Start the handler's channel monitor so the delete request below is served.
216 go jobsHandler.monitorManagementChannels()
218 jobsHandler.deleteJobCh <- "job1"
// Poll for the deletion; deleted is declared and set on lines not visible here.
// NOTE(review): the jobs map is read here while the monitor goroutine started
// above presumably mutates it — a possible data race under -race; confirm.
221 for i := 0; i < 100; i++ {
222 if len(jobsHandler.jobs) == 0 {
226 time.Sleep(time.Microsecond) // Need to drop control to let the job's goroutine do the job
228 require.New(t).True(deleted, "Job not deleted")
// TestJobsHandlerEmptyJobMessageBufferWhenItIsFull fills a job's
// messagesChannel via fillMessagesBuffer, distributes one more message through
// the handler and asserts that the channel then holds zero messages.
231 func TestJobsHandlerEmptyJobMessageBufferWhenItIsFull(t *testing.T) {
232 job := newJob(JobInfo{
233 InfoJobIdentity: "job",
236 jobsHandler := newJobsHandler("type1", "/topicUrl", nil, nil)
// NOTE(review): the job's identity is "job" but it is stored under key "job1";
// confirm the mismatch is intentional.
237 jobsHandler.jobs["job1"] = job
239 fillMessagesBuffer(job.messagesChannel)
// With the buffer already full, distributing another message is expected to
// leave the channel empty rather than block.
241 jobsHandler.distributeMessages([]byte("sent msg"))
243 require.New(t).Len(job.messagesChannel, 0)
// fillMessagesBuffer iterates once per slot of mc's capacity.
// NOTE(review): the loop body is not visible in this excerpt; presumably it
// sends one message per iteration so that mc ends up full — confirm.
246 func fillMessagesBuffer(mc chan []byte) {
247 for i := 0; i < cap(mc); i++ {
// RoundTripFunc adapts a plain function to the http.RoundTripper interface so
// that a test can answer HTTP requests with canned responses instead of
// opening a network connection.
type RoundTripFunc func(req *http.Request) *http.Response

// RoundTrip implements http.RoundTripper by handing req to f and returning
// whatever response f produces. The error result is always nil; a test that
// wants to signal a failure does so through the response it returns.
func (f RoundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
	return f(req), nil
}

// NewTestClient returns *http.Client with Transport replaced to avoid making real calls.
// Every request sent through the returned client is passed to fn, and fn's
// response is what the caller receives.
func NewTestClient(fn RoundTripFunc) *http.Client {
	return &http.Client{
		Transport: fn,
	}
}
// waitTimeout waits for the waitgroup for the specified max timeout.
// Returns true if waiting timed out.
//
// The wait runs in a helper goroutine that closes a channel once wg.Wait
// returns. If the timeout wins, that goroutine stays blocked until the
// WaitGroup counter eventually reaches zero, so callers that time out should
// still release the group (or let the test process exit).
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	c := make(chan struct{})
	go func() {
		defer close(c)
		wg.Wait()
	}()

	// An explicit timer lets us release it as soon as the wait completes.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-c:
		return false // completed normally
	case <-timer.C:
		return true // timed out
	}
}
281 func getBodyAsString(req *http.Request, t *testing.T) string {
282 buf := new(bytes.Buffer)
283 if _, err := buf.ReadFrom(req.Body); err != nil {