2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019 Nordix Foundation.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.oransc.policyagent.dmaap;
23 import static ch.qos.logback.classic.Level.WARN;
24 import static org.assertj.core.api.Assertions.assertThat;
25 import static org.junit.Assert.assertFalse;
26 import static org.junit.Assert.assertNotNull;
27 import static org.junit.jupiter.api.Assertions.assertTrue;
28 import static org.mockito.ArgumentMatchers.anyString;
29 import static org.mockito.Mockito.doReturn;
30 import static org.mockito.Mockito.mock;
31 import static org.mockito.Mockito.spy;
32 import static org.mockito.Mockito.verify;
33 import static org.mockito.Mockito.verifyNoMoreInteractions;
35 import ch.qos.logback.classic.spi.ILoggingEvent;
36 import ch.qos.logback.core.read.ListAppender;
38 import com.google.gson.Gson;
39 import com.google.gson.GsonBuilder;
40 import com.google.gson.JsonObject;
42 import java.io.IOException;
43 import java.util.Optional;
45 import org.junit.jupiter.api.BeforeEach;
46 import org.junit.jupiter.api.Test;
47 import org.mockito.ArgumentCaptor;
48 import org.oransc.policyagent.clients.AsyncRestClient;
49 import org.oransc.policyagent.dmaap.DmaapRequestMessage.Operation;
50 import org.oransc.policyagent.repository.ImmutablePolicyType;
51 import org.oransc.policyagent.repository.PolicyType;
52 import org.oransc.policyagent.utils.LoggingUtils;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55 import org.springframework.http.HttpStatus;
56 import org.springframework.http.ResponseEntity;
58 import reactor.core.publisher.Mono;
59 import reactor.test.StepVerifier;
// Tests for DmaapMessageHandler. Both REST clients are Mockito mocks, so no network traffic occurs.
61 class DmaapMessageHandlerTest {
62 private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandlerTest.class);
// URL the tests expect the agent client to be called with.
// NOTE(review): presumably also the URL put into the generated request message - that builder line is not visible here.
63 private static final String URL = "url";
// Mocked clients handed to the DmaapMessageHandler constructor in setUp().
65 private final AsyncRestClient dmaapClient = mock(AsyncRestClient.class);
66 private final AsyncRestClient agentClient = mock(AsyncRestClient.class);
// Object under test; assigned in setUp().
67 private DmaapMessageHandler testedObject;
// Gson instance shared by the helpers and tests below.
// NOTE(review): the rest of this builder chain is on lines not visible in this excerpt.
68 private static Gson gson = new GsonBuilder() //
72 private void setUp() throws Exception {
73 testedObject = spy(new DmaapMessageHandler(dmaapClient, agentClient));
76 static JsonObject payloadAsJson() {
77 return gson.fromJson(payloadAsString(), JsonObject.class);
80 static String payloadAsString() {
81 PolicyType pt = ImmutablePolicyType.builder().name("name").schema("schema").build();
82 return gson.toJson(pt);
// Builds a DmaapRequestMessage for the given operation using fixed placeholder field values.
// NOTE(review): some lines of this method (the else branch of the ternary and parts of the
// builder chain, including its terminating call) are not visible in this excerpt.
85 DmaapRequestMessage dmaapRequestMessage(Operation operation) {
// PUT and POST get the canned JSON payload; the alternative branch is not visible here
// (presumably an empty Optional - confirm).
86 Optional<JsonObject> payload =
87 ((operation == Operation.PUT || operation == Operation.POST) ? Optional.of(payloadAsJson())
89 return ImmutableDmaapRequestMessage.builder() //
90 .apiVersion("apiVersion") //
91 .correlationId("correlationId") //
92 .operation(operation) //
93 .originatorId("originatorId") //
95 .requestId("requestId") //
97 .timestamp("timestamp") //
102 private String dmaapInputMessage(Operation operation) {
103 return gson.toJson(dmaapRequestMessage(operation));
106 private Mono<ResponseEntity<String>> okResponse() {
107 ResponseEntity<String> entity = new ResponseEntity<>("OK", HttpStatus.OK);
108 return Mono.just(entity);
111 private Mono<ResponseEntity<String>> notOkResponse() {
112 ResponseEntity<String> entity = new ResponseEntity<>("NOK", HttpStatus.BAD_GATEWAY);
113 return Mono.just(entity);
117 void testMessageParsing() {
118 String message = dmaapInputMessage(Operation.DELETE);
119 logger.info(message);
120 DmaapRequestMessage parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class);
121 assertNotNull(parsedMessage);
122 assertFalse(parsedMessage.payload().isPresent());
124 message = dmaapInputMessage(Operation.PUT);
125 logger.info(message);
126 parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class);
127 assertNotNull(parsedMessage);
128 assertTrue(parsedMessage.payload().isPresent());
132 void unparseableMessage_thenWarning() {
133 final ListAppender<ILoggingEvent> logAppender =
134 LoggingUtils.getLogListAppender(DmaapMessageHandler.class, WARN);
136 String msg = "bad message";
137 testedObject.handleDmaapMsg(msg);
139 assertThat(logAppender.list.get(0).getFormattedMessage()).startsWith(
140 "handleDmaapMsg failure org.oransc.policyagent.exceptions.ServiceException: Received unparsable "
141 + "message from DMAAP: \"" + msg + "\", reason: ");
// DELETE happy path: the agent client is stubbed to answer 200 "OK" and the DMaaP client
// to accept any post.
145 void successfulDelete() throws IOException {
146 doReturn(okResponse()).when(agentClient).deleteForEntity(anyString());
147 doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
149 String message = dmaapInputMessage(Operation.DELETE);
// NOTE(review): the receiver of this call chain is on a line not visible in this excerpt
// (presumably StepVerifier); an expectation between the two visible steps may also be hidden.
152 .create(testedObject.createTask(message)) //
153 .expectSubscription() //
155 .verifyComplete(); //
// The agent client must have been asked to delete URL and nothing else.
157 verify(agentClient).deleteForEntity(URL);
158 verifyNoMoreInteractions(agentClient);
// The DMaaP client must have received a post and nothing else.
160 verify(dmaapClient).post(anyString(), anyString());
162 verifyNoMoreInteractions(dmaapClient);
// GET happy path: the agent client is stubbed to answer 200 "OK" and the DMaaP client
// to accept any post.
166 void successfulGet() throws IOException {
167 doReturn(okResponse()).when(agentClient).getForEntity(anyString());
168 doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
// NOTE(review): the receiver of this call chain is on a line not visible in this excerpt
// (presumably StepVerifier); an expectation between the two visible steps may also be hidden.
171 .create(testedObject.createTask(dmaapInputMessage(Operation.GET))) //
172 .expectSubscription() //
174 .verifyComplete(); //
// The agent client must have been asked to get URL and nothing else.
176 verify(agentClient).getForEntity(URL);
177 verifyNoMoreInteractions(agentClient);
// The DMaaP client must have received a post and nothing else.
179 verify(dmaapClient).post(anyString(), anyString());
180 verifyNoMoreInteractions(dmaapClient);
// PUT happy path: the agent client is stubbed to answer 200 "OK" and the DMaaP client
// to accept any post.
184 void successfulPut() throws IOException {
185 doReturn(okResponse()).when(agentClient).putForEntity(anyString(), anyString());
186 doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
// NOTE(review): the receiver of this call chain is on a line not visible in this excerpt
// (presumably StepVerifier); an expectation between the two visible steps may also be hidden.
189 .create(testedObject.createTask(dmaapInputMessage(Operation.PUT))) //
190 .expectSubscription() //
192 .verifyComplete(); //
// The agent client must have been asked to put the canned payload to URL and nothing else.
194 verify(agentClient).putForEntity(URL, payloadAsString());
195 verifyNoMoreInteractions(agentClient);
// The DMaaP client must have received a post and nothing else.
197 verify(dmaapClient).post(anyString(), anyString());
198 verifyNoMoreInteractions(dmaapClient);
// POST happy path: the agent client is stubbed to answer 200 "OK" and the DMaaP client
// to accept any post.
202 void successfulPost() throws IOException {
203 doReturn(okResponse()).when(agentClient).postForEntity(anyString(), anyString());
204 doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
// NOTE(review): the receiver of this call chain is on a line not visible in this excerpt
// (presumably StepVerifier); an expectation between the two visible steps may also be hidden.
207 .create(testedObject.createTask(dmaapInputMessage(Operation.POST))) //
208 .expectSubscription() //
210 .verifyComplete(); //
// The agent client must have been asked to post the canned payload to URL and nothing else.
212 verify(agentClient).postForEntity(URL, payloadAsString());
213 verifyNoMoreInteractions(agentClient);
// The DMaaP client must have received a post and nothing else.
215 verify(dmaapClient).post(anyString(), anyString());
216 verifyNoMoreInteractions(dmaapClient);
220 void exceptionWhenCallingPolicyAgent_thenNotFoundResponse() throws IOException {
222 doReturn(notOkResponse()).when(agentClient).putForEntity(anyString(), anyString());
223 doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
225 testedObject.createTask(dmaapInputMessage(Operation.PUT)).block();
227 verify(agentClient).putForEntity(anyString(), anyString());
228 verifyNoMoreInteractions(agentClient);
230 ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
231 verify(dmaapClient).post(anyString(), captor.capture());
232 String actualMessage = captor.getValue();
233 assertThat(actualMessage).contains(HttpStatus.BAD_GATEWAY.toString())
234 .as("Message \"%s\" sent to DMaaP contains %s", actualMessage, HttpStatus.BAD_GATEWAY);
236 verifyNoMoreInteractions(dmaapClient);
240 void unsupportedOperationInMessage_thenNotFoundResponseWithNotImplementedOperation() throws Exception {
241 String message = dmaapInputMessage(Operation.PUT).toString();
242 String badOperation = "BAD";
243 message = message.replace(Operation.PUT.toString(), badOperation);
245 testedObject.handleDmaapMsg(message);
247 ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
248 verify(dmaapClient).post(anyString(), captor.capture());
249 String actualMessage = captor.getValue();
250 assertThat(actualMessage).contains("Not implemented operation");
251 assertThat(actualMessage).contains("BAD_REQUEST");
255 void putWithoutPayload_thenNotFoundResponseWithWarning() throws Exception {
256 String message = dmaapInputMessage(Operation.PUT).toString();
257 message = message.replace(",\"payload\":{\"name\":\"name\",\"schema\":\"schema\"}", "");
259 final ListAppender<ILoggingEvent> logAppender =
260 LoggingUtils.getLogListAppender(DmaapMessageHandler.class, WARN);
262 testedObject.handleDmaapMsg(message);
264 assertThat(logAppender.list.get(0).getFormattedMessage())
265 .startsWith("Expected payload in message from DMAAP: ");