f0efddcfaba646255eec3779bee64e26bdbcc655
[nonrtric.git] / policy-agent / src / test / java / org / oransc / policyagent / dmaap / DmaapMessageHandlerTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2019 Nordix Foundation.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.oransc.policyagent.dmaap;
22
23 import static ch.qos.logback.classic.Level.WARN;
24 import static org.assertj.core.api.Assertions.assertThat;
25 import static org.junit.Assert.assertFalse;
26 import static org.junit.Assert.assertNotNull;
27 import static org.junit.jupiter.api.Assertions.assertTrue;
28 import static org.mockito.ArgumentMatchers.anyString;
29 import static org.mockito.Mockito.doReturn;
30 import static org.mockito.Mockito.mock;
31 import static org.mockito.Mockito.spy;
32 import static org.mockito.Mockito.verify;
33 import static org.mockito.Mockito.verifyNoMoreInteractions;
34
35 import ch.qos.logback.classic.spi.ILoggingEvent;
36 import ch.qos.logback.core.read.ListAppender;
37
38 import com.google.gson.Gson;
39 import com.google.gson.GsonBuilder;
40 import com.google.gson.JsonObject;
41
42 import java.io.IOException;
43 import java.nio.charset.Charset;
44 import java.util.Optional;
45
46 import org.junit.jupiter.api.BeforeEach;
47 import org.junit.jupiter.api.Test;
48 import org.mockito.ArgumentCaptor;
49 import org.oransc.policyagent.clients.AsyncRestClient;
50 import org.oransc.policyagent.dmaap.DmaapRequestMessage.Operation;
51 import org.oransc.policyagent.repository.ImmutablePolicyType;
52 import org.oransc.policyagent.repository.PolicyType;
53 import org.oransc.policyagent.utils.LoggingUtils;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56 import org.springframework.http.HttpHeaders;
57 import org.springframework.http.HttpStatus;
58 import org.springframework.http.ResponseEntity;
59 import org.springframework.web.reactive.function.client.WebClientResponseException;
60
61 import reactor.core.publisher.Mono;
62 import reactor.test.StepVerifier;
63
64 class DmaapMessageHandlerTest {
65     private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandlerTest.class);
66     private static final String URL = "url";
67
68     private final AsyncRestClient dmaapClient = mock(AsyncRestClient.class);
69     private final AsyncRestClient agentClient = mock(AsyncRestClient.class);
70     private DmaapMessageHandler testedObject;
71     private static Gson gson = new GsonBuilder() //
72         .create(); //
73
74     @BeforeEach
75     private void setUp() throws Exception {
76         testedObject = spy(new DmaapMessageHandler(dmaapClient, agentClient));
77     }
78
79     static JsonObject payloadAsJson() {
80         return gson.fromJson(payloadAsString(), JsonObject.class);
81     }
82
83     static String payloadAsString() {
84         PolicyType pt = ImmutablePolicyType.builder().name("name").schema("schema").build();
85         return gson.toJson(pt);
86     }
87
88     DmaapRequestMessage dmaapRequestMessage(Operation operation) {
89         Optional<JsonObject> payload =
90             ((operation == Operation.PUT || operation == Operation.POST) ? Optional.of(payloadAsJson())
91                 : Optional.empty());
92         return ImmutableDmaapRequestMessage.builder() //
93             .apiVersion("apiVersion") //
94             .correlationId("correlationId") //
95             .operation(operation) //
96             .originatorId("originatorId") //
97             .payload(payload) //
98             .requestId("requestId") //
99             .target("target") //
100             .timestamp("timestamp") //
101             .url(URL) //
102             .build();
103     }
104
105     private String dmaapInputMessage(Operation operation) {
106         return gson.toJson(dmaapRequestMessage(operation));
107     }
108
109     private Mono<ResponseEntity<String>> okResponse() {
110         ResponseEntity<String> entity = new ResponseEntity<>("OK", HttpStatus.OK);
111         return Mono.just(entity);
112     }
113
114     private Mono<ResponseEntity<String>> notOkResponse() {
115         ResponseEntity<String> entity = new ResponseEntity<>("NOK", HttpStatus.BAD_GATEWAY);
116         return Mono.just(entity);
117     }
118
119     @Test
120     void testMessageParsing() {
121         String message = dmaapInputMessage(Operation.DELETE);
122         logger.info(message);
123         DmaapRequestMessage parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class);
124         assertNotNull(parsedMessage);
125         assertFalse(parsedMessage.payload().isPresent());
126
127         message = dmaapInputMessage(Operation.PUT);
128         logger.info(message);
129         parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class);
130         assertNotNull(parsedMessage);
131         assertTrue(parsedMessage.payload().isPresent());
132     }
133
134     @Test
135     void unparseableMessage_thenWarning() {
136         final ListAppender<ILoggingEvent> logAppender =
137             LoggingUtils.getLogListAppender(DmaapMessageHandler.class, WARN);
138
139         String msg = "bad message";
140         testedObject.handleDmaapMsg(msg);
141
142         assertThat(logAppender.list.get(0).getFormattedMessage()).startsWith(
143             "handleDmaapMsg failure org.oransc.policyagent.exceptions.ServiceException: Received unparsable "
144                 + "message from DMAAP: \"" + msg + "\", reason: ");
145     }
146
147     @Test
148     void successfulDelete() throws IOException {
149         doReturn(okResponse()).when(agentClient).deleteForEntity(anyString());
150         doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
151
152         String message = dmaapInputMessage(Operation.DELETE);
153
154         StepVerifier //
155             .create(testedObject.createTask(message)) //
156             .expectSubscription() //
157             .expectNext("OK") //
158             .verifyComplete(); //
159
160         verify(agentClient).deleteForEntity(URL);
161         verifyNoMoreInteractions(agentClient);
162
163         verify(dmaapClient).post(anyString(), anyString());
164
165         verifyNoMoreInteractions(dmaapClient);
166     }
167
168     @Test
169     void successfulGet() throws IOException {
170         doReturn(okResponse()).when(agentClient).getForEntity(anyString());
171         doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
172
173         StepVerifier //
174             .create(testedObject.createTask(dmaapInputMessage(Operation.GET))) //
175             .expectSubscription() //
176             .expectNext("OK") //
177             .verifyComplete(); //
178
179         verify(agentClient).getForEntity(URL);
180         verifyNoMoreInteractions(agentClient);
181
182         verify(dmaapClient).post(anyString(), anyString());
183         verifyNoMoreInteractions(dmaapClient);
184     }
185
186     @Test
187     void exceptionFromAgentWhenGet_thenPostError() throws IOException {
188         String errorBody = "Unavailable";
189         WebClientResponseException webClientResponseException = new WebClientResponseException(
190             HttpStatus.SERVICE_UNAVAILABLE.value(), "", (HttpHeaders) null, errorBody.getBytes(), (Charset) null);
191         doReturn(Mono.error(webClientResponseException)).when(agentClient).getForEntity(anyString());
192         doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
193
194         StepVerifier //
195             .create(testedObject.createTask(dmaapInputMessage(Operation.GET))) //
196             .expectSubscription() //
197             .verifyComplete(); //
198
199         ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
200         verify(dmaapClient).post(anyString(), captor.capture());
201         String actualMessage = captor.getValue();
202         assertThat(actualMessage).contains(HttpStatus.SERVICE_UNAVAILABLE.toString()) //
203             .contains(errorBody);
204     }
205
206     @Test
207     void successfulPut() throws IOException {
208         doReturn(okResponse()).when(agentClient).putForEntity(anyString(), anyString());
209         doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
210
211         StepVerifier //
212             .create(testedObject.createTask(dmaapInputMessage(Operation.PUT))) //
213             .expectSubscription() //
214             .expectNext("OK") //
215             .verifyComplete(); //
216
217         verify(agentClient).putForEntity(URL, payloadAsString());
218         verifyNoMoreInteractions(agentClient);
219
220         verify(dmaapClient).post(anyString(), anyString());
221         verifyNoMoreInteractions(dmaapClient);
222     }
223
224     @Test
225     void successfulPost() throws IOException {
226         doReturn(okResponse()).when(agentClient).postForEntity(anyString(), anyString());
227         doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
228
229         StepVerifier //
230             .create(testedObject.createTask(dmaapInputMessage(Operation.POST))) //
231             .expectSubscription() //
232             .expectNext("OK") //
233             .verifyComplete(); //
234
235         verify(agentClient).postForEntity(URL, payloadAsString());
236         verifyNoMoreInteractions(agentClient);
237
238         verify(dmaapClient).post(anyString(), anyString());
239         verifyNoMoreInteractions(dmaapClient);
240     }
241
242     @Test
243     void exceptionWhenCallingPolicyAgent_thenNotFoundResponse() throws IOException {
244
245         doReturn(notOkResponse()).when(agentClient).putForEntity(anyString(), anyString());
246         doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
247
248         testedObject.createTask(dmaapInputMessage(Operation.PUT)).block();
249
250         verify(agentClient).putForEntity(anyString(), anyString());
251         verifyNoMoreInteractions(agentClient);
252
253         ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
254         verify(dmaapClient).post(anyString(), captor.capture());
255         String actualMessage = captor.getValue();
256         assertThat(actualMessage).as("Message \"%s\" sent to DMaaP contains %s", actualMessage, HttpStatus.BAD_GATEWAY)
257             .contains(HttpStatus.BAD_GATEWAY.toString());
258
259         verifyNoMoreInteractions(dmaapClient);
260     }
261
262     @Test
263     void unsupportedOperationInMessage_thenNotFoundResponseWithNotImplementedOperation() throws Exception {
264         String message = dmaapInputMessage(Operation.PUT).toString();
265         String badOperation = "BAD";
266         message = message.replace(Operation.PUT.toString(), badOperation);
267
268         testedObject.handleDmaapMsg(message);
269
270         ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
271         verify(dmaapClient).post(anyString(), captor.capture());
272         String actualMessage = captor.getValue();
273         assertThat(actualMessage).contains("Not implemented operation") //
274             .contains("BAD_REQUEST");
275     }
276
277     @Test
278     void putWithoutPayload_thenNotFoundResponseWithWarning() throws Exception {
279         String message = dmaapInputMessage(Operation.PUT).toString();
280         message = message.replace(",\"payload\":{\"name\":\"name\",\"schema\":\"schema\"}", "");
281
282         final ListAppender<ILoggingEvent> logAppender =
283             LoggingUtils.getLogListAppender(DmaapMessageHandler.class, WARN);
284
285         testedObject.handleDmaapMsg(message);
286
287         assertThat(logAppender.list.get(0).getFormattedMessage())
288             .startsWith("Expected payload in message from DMAAP: ");
289     }
290 }