2 // ========================LICENSE_START=================================
5 // Copyright (C) 2021: Nordix Foundation
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 // ========================LICENSE_END===================================
33 "github.com/stretchr/testify/require"
// typeDefinition is the JSON content of a type configuration declaring a single
// type, "type1", together with its DMaaP topic URL. The GetTypes test writes it
// to a temporary config file.
36 const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}]}`
// TestGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes checks
// that a handler pointed at a config file containing typeDefinition returns the
// single expected TypeDefinition from GetTypes, and that GetSupportedTypes
// afterwards reports exactly "type1".
38 func TestGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
39 assertions := require.New(t)
// Use a throwaway directory so the test owns the config file it writes.
40 typesDir, err := os.MkdirTemp("", "configs")
// NOTE(review): t.Errorf records a failure but does not stop the test; the
// guarding condition for this call is not visible in this view.
42 t.Errorf("Unable to create temporary directory for types due to: %v", err)
44 fname := filepath.Join(typesDir, "type_config.json")
// The handler is constructed with the config path before the file is written below.
45 handlerUnderTest := NewJobHandlerImpl(fname, nil, nil)
// Cleanup: remove the temporary directory and reset the handler's state.
// NOTE(review): the statement wrapping these two calls (presumably a
// t.Cleanup/defer hook) is not visible here — confirm they run at test end.
47 os.RemoveAll(typesDir)
48 handlerUnderTest.clearAll()
50 if err = os.WriteFile(fname, []byte(typeDefinition), 0666); err != nil {
51 t.Errorf("Unable to create temporary config file for types due to: %v", err)
53 types, err := handlerUnderTest.GetTypes()
// Expected result: the one type declared in typeDefinition.
54 wantedType := TypeDefinition{
56 DmaapTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
58 wantedTypes := []TypeDefinition{wantedType}
59 assertions.EqualValues(wantedTypes, types)
// GetSupportedTypes is queried only after GetTypes has been called.
62 supportedTypes := handlerUnderTest.GetSupportedTypes()
63 assertions.EqualValues([]string{"type1"}, supportedTypes)
// TestAddJobWhenTypeIsSupported_shouldAddJobToAllJobsMap registers "type1" on
// the handler, calls AddJob with a job of that type, and asserts that the
// type's Jobs map holds exactly that job under "job1".
66 func TestAddJobWhenTypeIsSupported_shouldAddJobToAllJobsMap(t *testing.T) {
67 assertions := require.New(t)
68 handlerUnderTest := NewJobHandlerImpl("", nil, nil)
72 InfoJobIdentity: "job1",
75 InfoTypeIdentity: "type1",
// NOTE(review): the Jobs map is seeded with "job1" already mapped to wantedJob,
// so the two assertions at the end would also pass if AddJob stored nothing.
// Consider seeding an empty map so the test proves the job was added.
77 handlerUnderTest.allTypes["type1"] = TypeData{
79 Jobs: map[string]JobInfo{"job1": wantedJob},
// Resets handler state; the enclosing cleanup wrapper is not visible in this view.
82 handlerUnderTest.clearAll()
85 err := handlerUnderTest.AddJob(wantedJob)
87 assertions.Equal(1, len(handlerUnderTest.allTypes["type1"].Jobs))
88 assertions.Equal(wantedJob, handlerUnderTest.allTypes["type1"].Jobs["job1"])
// TestAddJobWhenTypeIsNotSupported_shouldReturnError calls AddJob with a job
// whose type is "type1" on a handler where no type is registered in the
// visible lines, and expects a non-nil error with the exact text
// "type not supported: type1".
91 func TestAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) {
92 assertions := require.New(t)
93 handlerUnderTest := NewJobHandlerImpl("", nil, nil)
95 InfoTypeIdentity: "type1",
98 err := handlerUnderTest.AddJob(jobInfo)
// require-style assertions stop the test on failure, so err.Error() below is
// only reached when err is non-nil.
99 assertions.NotNil(err)
100 assertions.Equal("type not supported: type1", err.Error())
// TestAddJobWhenJobIdMissing_shouldReturnError registers "type1" and then calls
// AddJob with a job for which only InfoTypeIdentity is visibly set, expecting
// an error that reports the missing job identity.
103 func TestAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
104 assertions := require.New(t)
105 handlerUnderTest := NewJobHandlerImpl("", nil, nil)
// Register the type so that the failure is about the job, not the type.
106 handlerUnderTest.allTypes["type1"] = TypeData{
// Resets handler state; the enclosing cleanup wrapper is not visible in this view.
110 handlerUnderTest.clearAll()
114 InfoTypeIdentity: "type1",
116 err := handlerUnderTest.AddJob(jobInfo)
117 assertions.NotNil(err)
// Pins the exact error text, including how the job value is rendered in it.
118 assertions.Equal("missing required job identity: { <nil> type1}", err.Error())
// TestAddJobWhenTargetUriMissing_shouldReturnError registers "type1" and then
// calls AddJob with a job that has a type and a job identity but, in the
// visible lines, no target URI, expecting an error that reports the missing
// target URI.
121 func TestAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
122 assertions := require.New(t)
123 handlerUnderTest := NewJobHandlerImpl("", nil, nil)
// Register the type so that the failure is about the job, not the type.
124 handlerUnderTest.allTypes["type1"] = TypeData{
// Resets handler state; the enclosing cleanup wrapper is not visible in this view.
128 handlerUnderTest.clearAll()
132 InfoTypeIdentity: "type1",
133 InfoJobIdentity: "job1",
135 err := handlerUnderTest.AddJob(jobInfo)
136 assertions.NotNil(err)
// Pins the exact error text, including how the job value is rendered in it.
137 assertions.Equal("missing required target URI: { job1 <nil> type1}", err.Error())
// TestDeleteJob seeds "type1" with two jobs, deletes "job2" through DeleteJob,
// and asserts that only "job1" remains in the type's Jobs map.
140 func TestDeleteJob(t *testing.T) {
141 assertions := require.New(t)
142 handlerUnderTest := NewJobHandlerImpl("", nil, nil)
143 jobToKeep := JobInfo{
144 InfoJobIdentity: "job1",
145 InfoTypeIdentity: "type1",
147 jobToDelete := JobInfo{
148 InfoJobIdentity: "job2",
149 InfoTypeIdentity: "type1",
// Both jobs belong to the same type, so the deletion must be selective.
151 handlerUnderTest.allTypes["type1"] = TypeData{
153 Jobs: map[string]JobInfo{"job1": jobToKeep, "job2": jobToDelete},
// Resets handler state; the enclosing cleanup wrapper is not visible in this view.
156 handlerUnderTest.clearAll()
// DeleteJob is given only the job id, not the type id.
159 handlerUnderTest.DeleteJob("job2")
160 assertions.Equal(1, len(handlerUnderTest.allTypes["type1"].Jobs))
161 assertions.Equal(jobToKeep, handlerUnderTest.allTypes["type1"].Jobs["job1"])
// TestPollAndDistributeMessages wires the handler to two stubbed HTTP clients,
// one for polling and one for distribution, and checks that a single call to
// pollAndDistributeMessages produces a GET to "http://mrAddr/topicUrl" and a
// POST of the polled payload to the job's target URI. A WaitGroup counts the
// two expected calls; the test fails if both have not happened within 100ms.
164 func TestPollAndDistributeMessages(t *testing.T) {
165 assertions := require.New(t)
167 wg := sync.WaitGroup{}
// Payload returned by the poll stub and expected verbatim in the POST body.
168 messages := `[{"message": {"data": "data"}}]`
// Poll stub: answers the expected poll URL with the payload above.
169 pollClientMock := NewTestClient(func(req *http.Request) *http.Response {
170 if req.URL.String() == "http://mrAddr/topicUrl" {
// NOTE(review): arguments are in (actual, expected) order here; testify's
// Equal takes (expected, actual), as the other assertions in this file do.
171 assertions.Equal(req.Method, "GET")
172 wg.Done() // Signal that the poll call has been made
173 return &http.Response{
175 Body: ioutil.NopCloser(bytes.NewReader([]byte(messages))),
176 Header: make(http.Header), // Must be set to non-nil value or it panics
// Any other URL reaching the poll client is reported as a test error.
179 t.Error("Wrong call to client: ", req)
// Distribution stub: verifies the forwarded request and answers with "OK".
183 distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
184 if req.URL.String() == "http://consumerHost/target" {
// NOTE(review): same (actual, expected) argument order as in the poll stub.
185 assertions.Equal(req.Method, "POST")
186 assertions.Equal(messages, getBodyAsString(req))
187 assertions.Equal("application/json; charset=utf-8", req.Header.Get("Content-Type"))
188 wg.Done() // Signal that the distribution call has been made
189 return &http.Response{
191 Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)),
192 Header: make(http.Header), // Must be set to non-nil value or it panics
// Any other URL reaching the distribution client is reported as a test error.
195 t.Error("Wrong call to client: ", req)
199 handlerUnderTest := NewJobHandlerImpl("", pollClientMock, distributeClientMock)
201 InfoTypeIdentity: "type1",
202 InfoJobIdentity: "job1",
203 TargetUri: "http://consumerHost/target",
// The topic URL "topicUrl" combined with the "http://mrAddr" argument below
// matches the URL the poll stub expects.
205 handlerUnderTest.allTypes["type1"] = TypeData{
207 DMaaPTopicURL: "topicUrl",
208 Jobs: map[string]JobInfo{"job1": jobInfo},
// Resets handler state; the enclosing cleanup wrapper is not visible in this view.
211 handlerUnderTest.clearAll()
214 wg.Add(2) // Two calls should be made to the server, one to poll and one to distribute
215 handlerUnderTest.pollAndDistributeMessages("http://mrAddr")
// waitTimeout returns true on timeout, i.e. when fewer than two calls were seen.
217 if waitTimeout(&wg, 100*time.Millisecond) {
218 t.Error("Not all calls to server were made")
// RoundTripFunc is a function type that maps an HTTP request to a response.
// NewTestClient installs it as an http.Client Transport so tests can stub out
// network traffic.
223 type RoundTripFunc func(req *http.Request) *http.Response
// RoundTrip has the signature of http.RoundTripper's method, which is what
// allows a RoundTripFunc to be used as a Transport.
// NOTE(review): the method body is not visible in this view — presumably it
// invokes f with req and returns a nil error; confirm.
225 func (f RoundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
229 // NewTestClient returns an *http.Client whose Transport is the supplied RoundTripFunc, so that no real network calls are made.
230 func NewTestClient(fn RoundTripFunc) *http.Client {
// Requests sent through the client are handled by fn instead of the network.
232 Transport: RoundTripFunc(fn),
236 // waitTimeout waits for the waitgroup for the specified max timeout.
237 // Returns true if waiting timed out.
// A false result means the completion signal on c arrived before the timeout.
238 func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
// c carries the completion signal.
// NOTE(review): the code that waits on wg and signals c is not visible in
// this view — presumably a goroutine that closes c after wg.Wait(); confirm.
239 c := make(chan struct{})
246 return false // completed normally
247 case <-time.After(timeout):
248 return true // timed out
252 func getBodyAsString(req *http.Request) string {
253 buf := new(bytes.Buffer)
254 buf.ReadFrom(req.Body)