2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019 Nordix Foundation.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.oransc.policyagent.dmaap;
23 import static ch.qos.logback.classic.Level.WARN;
24 import static org.assertj.core.api.Assertions.assertThat;
25 import static org.junit.Assert.assertFalse;
26 import static org.junit.Assert.assertNotNull;
27 import static org.junit.jupiter.api.Assertions.assertTrue;
28 import static org.mockito.ArgumentMatchers.anyString;
29 import static org.mockito.Mockito.doReturn;
30 import static org.mockito.Mockito.mock;
31 import static org.mockito.Mockito.spy;
32 import static org.mockito.Mockito.verify;
33 import static org.mockito.Mockito.verifyNoMoreInteractions;
35 import ch.qos.logback.classic.spi.ILoggingEvent;
36 import ch.qos.logback.core.read.ListAppender;
38 import com.google.gson.Gson;
39 import com.google.gson.GsonBuilder;
40 import com.google.gson.JsonObject;
42 import java.io.IOException;
43 import java.nio.charset.Charset;
44 import java.util.Optional;
46 import org.junit.jupiter.api.BeforeEach;
47 import org.junit.jupiter.api.Test;
48 import org.mockito.ArgumentCaptor;
49 import org.oransc.policyagent.clients.AsyncRestClient;
50 import org.oransc.policyagent.dmaap.DmaapRequestMessage.Operation;
51 import org.oransc.policyagent.repository.ImmutablePolicyType;
52 import org.oransc.policyagent.repository.PolicyType;
53 import org.oransc.policyagent.utils.LoggingUtils;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56 import org.springframework.http.HttpHeaders;
57 import org.springframework.http.HttpStatus;
58 import org.springframework.http.ResponseEntity;
59 import org.springframework.web.reactive.function.client.WebClientResponseException;
61 import reactor.core.publisher.Mono;
62 import reactor.test.StepVerifier;
64 class DmaapMessageHandlerTest {
65 private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandlerTest.class);
66 private static final String URL = "url";
68 private final AsyncRestClient dmaapClient = mock(AsyncRestClient.class);
69 private final AsyncRestClient agentClient = mock(AsyncRestClient.class);
70 private DmaapMessageHandler testedObject;
71 private static Gson gson = new GsonBuilder() //
75 private void setUp() throws Exception {
76 testedObject = spy(new DmaapMessageHandler(dmaapClient, agentClient));
79 static JsonObject payloadAsJson() {
80 return gson.fromJson(payloadAsString(), JsonObject.class);
83 static String payloadAsString() {
84 PolicyType pt = ImmutablePolicyType.builder().name("name").schema("schema").build();
85 return gson.toJson(pt);
88 DmaapRequestMessage dmaapRequestMessage(Operation operation) {
89 Optional<JsonObject> payload =
90 ((operation == Operation.PUT || operation == Operation.POST) ? Optional.of(payloadAsJson())
92 return ImmutableDmaapRequestMessage.builder() //
93 .apiVersion("apiVersion") //
94 .correlationId("correlationId") //
95 .operation(operation) //
96 .originatorId("originatorId") //
98 .requestId("requestId") //
100 .timestamp("timestamp") //
105 private String dmaapInputMessage(Operation operation) {
106 return gson.toJson(dmaapRequestMessage(operation));
109 private Mono<ResponseEntity<String>> okResponse() {
110 ResponseEntity<String> entity = new ResponseEntity<>("OK", HttpStatus.OK);
111 return Mono.just(entity);
114 private Mono<ResponseEntity<String>> notOkResponse() {
115 ResponseEntity<String> entity = new ResponseEntity<>("NOK", HttpStatus.BAD_GATEWAY);
116 return Mono.just(entity);
120 void testMessageParsing() {
121 String message = dmaapInputMessage(Operation.DELETE);
122 logger.info(message);
123 DmaapRequestMessage parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class);
124 assertNotNull(parsedMessage);
125 assertFalse(parsedMessage.payload().isPresent());
127 message = dmaapInputMessage(Operation.PUT);
128 logger.info(message);
129 parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class);
130 assertNotNull(parsedMessage);
131 assertTrue(parsedMessage.payload().isPresent());
135 void unparseableMessage_thenWarning() {
136 final ListAppender<ILoggingEvent> logAppender =
137 LoggingUtils.getLogListAppender(DmaapMessageHandler.class, WARN);
139 String msg = "bad message";
140 testedObject.handleDmaapMsg(msg);
142 assertThat(logAppender.list.get(0).getFormattedMessage()).startsWith(
143 "handleDmaapMsg failure org.oransc.policyagent.exceptions.ServiceException: Received unparsable "
144 + "message from DMAAP: \"" + msg + "\", reason: ");
148 void successfulDelete() throws IOException {
149 doReturn(okResponse()).when(agentClient).deleteForEntity(anyString());
150 doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
152 String message = dmaapInputMessage(Operation.DELETE);
155 .create(testedObject.createTask(message)) //
156 .expectSubscription() //
158 .verifyComplete(); //
160 verify(agentClient).deleteForEntity(URL);
161 verifyNoMoreInteractions(agentClient);
163 verify(dmaapClient).post(anyString(), anyString());
165 verifyNoMoreInteractions(dmaapClient);
169 void successfulGet() throws IOException {
170 doReturn(okResponse()).when(agentClient).getForEntity(anyString());
171 doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
174 .create(testedObject.createTask(dmaapInputMessage(Operation.GET))) //
175 .expectSubscription() //
177 .verifyComplete(); //
179 verify(agentClient).getForEntity(URL);
180 verifyNoMoreInteractions(agentClient);
182 verify(dmaapClient).post(anyString(), anyString());
183 verifyNoMoreInteractions(dmaapClient);
187 void exceptionFromAgentWhenGet_thenPostError() throws IOException {
188 String errorBody = "Unavailable";
189 WebClientResponseException webClientResponseException = new WebClientResponseException(
190 HttpStatus.SERVICE_UNAVAILABLE.value(), "", (HttpHeaders) null, errorBody.getBytes(), (Charset) null);
191 doReturn(Mono.error(webClientResponseException)).when(agentClient).getForEntity(anyString());
192 doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
195 .create(testedObject.createTask(dmaapInputMessage(Operation.GET))) //
196 .expectSubscription() //
197 .verifyComplete(); //
199 ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
200 verify(dmaapClient).post(anyString(), captor.capture());
201 String actualMessage = captor.getValue();
202 assertThat(actualMessage).contains(HttpStatus.SERVICE_UNAVAILABLE.toString()) //
203 .contains(errorBody);
207 void successfulPut() throws IOException {
208 doReturn(okResponse()).when(agentClient).putForEntity(anyString(), anyString());
209 doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
212 .create(testedObject.createTask(dmaapInputMessage(Operation.PUT))) //
213 .expectSubscription() //
215 .verifyComplete(); //
217 verify(agentClient).putForEntity(URL, payloadAsString());
218 verifyNoMoreInteractions(agentClient);
220 verify(dmaapClient).post(anyString(), anyString());
221 verifyNoMoreInteractions(dmaapClient);
225 void successfulPost() throws IOException {
226 doReturn(okResponse()).when(agentClient).postForEntity(anyString(), anyString());
227 doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
230 .create(testedObject.createTask(dmaapInputMessage(Operation.POST))) //
231 .expectSubscription() //
233 .verifyComplete(); //
235 verify(agentClient).postForEntity(URL, payloadAsString());
236 verifyNoMoreInteractions(agentClient);
238 verify(dmaapClient).post(anyString(), anyString());
239 verifyNoMoreInteractions(dmaapClient);
243 void exceptionWhenCallingPolicyAgent_thenNotFoundResponse() throws IOException {
245 doReturn(notOkResponse()).when(agentClient).putForEntity(anyString(), anyString());
246 doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
248 testedObject.createTask(dmaapInputMessage(Operation.PUT)).block();
250 verify(agentClient).putForEntity(anyString(), anyString());
251 verifyNoMoreInteractions(agentClient);
253 ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
254 verify(dmaapClient).post(anyString(), captor.capture());
255 String actualMessage = captor.getValue();
256 assertThat(actualMessage).as("Message \"%s\" sent to DMaaP contains %s", actualMessage, HttpStatus.BAD_GATEWAY)
257 .contains(HttpStatus.BAD_GATEWAY.toString());
259 verifyNoMoreInteractions(dmaapClient);
263 void unsupportedOperationInMessage_thenNotFoundResponseWithNotImplementedOperation() throws Exception {
264 String message = dmaapInputMessage(Operation.PUT).toString();
265 String badOperation = "BAD";
266 message = message.replace(Operation.PUT.toString(), badOperation);
268 testedObject.handleDmaapMsg(message);
270 ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
271 verify(dmaapClient).post(anyString(), captor.capture());
272 String actualMessage = captor.getValue();
273 assertThat(actualMessage).contains("Not implemented operation") //
274 .contains("BAD_REQUEST");
278 void putWithoutPayload_thenNotFoundResponseWithWarning() throws Exception {
279 String message = dmaapInputMessage(Operation.PUT).toString();
280 message = message.replace(",\"payload\":{\"name\":\"name\",\"schema\":\"schema\"}", "");
282 final ListAppender<ILoggingEvent> logAppender =
283 LoggingUtils.getLogListAppender(DmaapMessageHandler.class, WARN);
285 testedObject.handleDmaapMsg(message);
287 assertThat(logAppender.list.get(0).getFormattedMessage())
288 .startsWith("Expected payload in message from DMAAP: ");