2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019 Nordix Foundation.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.oransc.policyagent.dmaap;
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.junit.Assert.assertFalse;
25 import static org.junit.jupiter.api.Assertions.assertTrue;
26 import static org.mockito.ArgumentMatchers.any;
27 import static org.mockito.ArgumentMatchers.anyString;
28 import static org.mockito.Mockito.doReturn;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.spy;
31 import static org.mockito.Mockito.verify;
32 import static org.mockito.Mockito.verifyNoMoreInteractions;
34 import ch.qos.logback.classic.Level;
35 import ch.qos.logback.classic.spi.ILoggingEvent;
36 import ch.qos.logback.core.read.ListAppender;
38 import com.google.gson.Gson;
39 import com.google.gson.GsonBuilder;
40 import com.google.gson.JsonObject;
42 import java.io.IOException;
43 import java.util.Optional;
45 import org.junit.jupiter.api.BeforeEach;
46 import org.junit.jupiter.api.Test;
47 import org.mockito.ArgumentCaptor;
48 import org.onap.dmaap.mr.client.MRBatchingPublisher;
49 import org.onap.dmaap.mr.client.response.MRPublisherResponse;
50 import org.oransc.policyagent.clients.AsyncRestClient;
51 import org.oransc.policyagent.dmaap.DmaapRequestMessage.Operation;
52 import org.oransc.policyagent.repository.ImmutablePolicyType;
53 import org.oransc.policyagent.repository.PolicyType;
54 import org.oransc.policyagent.utils.LoggingUtils;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57 import org.springframework.http.HttpStatus;
58 import org.springframework.http.ResponseEntity;
59 import org.springframework.web.reactive.function.client.WebClientResponseException;
61 import reactor.core.publisher.Mono;
62 import reactor.test.StepVerifier;
64 public class DmaapMessageHandlerTest {
65 private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandlerTest.class);
66 private static final String URL = "url";
68 private final MRBatchingPublisher dmaapClient = mock(MRBatchingPublisher.class);
69 private final AsyncRestClient agentClient = mock(AsyncRestClient.class);
70 private DmaapMessageHandler testedObject;
71 private static Gson gson = new GsonBuilder() //
75 private void setUp() throws Exception {
76 testedObject = spy(new DmaapMessageHandler(dmaapClient, agentClient));
79 static JsonObject payloadAsJson() {
80 return gson.fromJson(payloadAsString(), JsonObject.class);
83 static String payloadAsString() {
84 PolicyType pt = ImmutablePolicyType.builder().name("name").schema("schema").build();
85 return gson.toJson(pt);
88 DmaapRequestMessage dmaapRequestMessage(Operation operation) {
89 Optional<JsonObject> payload =
90 ((operation == Operation.PUT || operation == Operation.POST) ? Optional.of(payloadAsJson())
92 return ImmutableDmaapRequestMessage.builder().apiVersion("apiVersion") //
93 .correlationId("correlationId") //
94 .operation(operation) //
95 .originatorId("originatorId") //
97 .requestId("requestId") //
99 .timestamp("timestamp") //
105 private String dmaapInputMessage(Operation operation) {
106 return gson.toJson(dmaapRequestMessage(operation));
109 private Mono<ResponseEntity<String>> okResponse() {
110 ResponseEntity<String> entity = new ResponseEntity<>("OK", HttpStatus.OK);
111 return Mono.just(entity);
115 public void testMessageParsing() {
116 String message = dmaapInputMessage(Operation.DELETE);
117 logger.info(message);
118 DmaapRequestMessage parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class);
119 assertTrue(parsedMessage != null);
120 assertFalse(parsedMessage.payload().isPresent());
122 message = dmaapInputMessage(Operation.PUT);
123 logger.info(message);
124 parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class);
125 assertTrue(parsedMessage != null);
126 assertTrue(parsedMessage.payload().isPresent());
130 public void unparseableMessage_thenWarning() {
131 final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DmaapMessageHandler.class);
133 testedObject.handleDmaapMsg("bad message");
135 assertThat(logAppender.list.get(0).getLevel()).isEqualTo(Level.WARN);
136 assertThat(logAppender.list.toString().contains("handleDmaapMsg failure ")).isTrue();
140 public void successfulDelete() throws IOException {
141 doReturn(okResponse()).when(agentClient).deleteForEntity(anyString());
142 doReturn(1).when(dmaapClient).send(anyString());
143 doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
145 String message = dmaapInputMessage(Operation.DELETE);
148 .create(testedObject.createTask(message)) //
149 .expectSubscription() //
151 .verifyComplete(); //
153 verify(agentClient).deleteForEntity(URL);
154 verifyNoMoreInteractions(agentClient);
156 verify(dmaapClient).send(anyString());
157 verify(dmaapClient).sendBatchWithResponse();
158 verifyNoMoreInteractions(dmaapClient);
162 public void successfulGet() throws IOException {
163 doReturn(okResponse()).when(agentClient).getForEntity(anyString());
164 doReturn(1).when(dmaapClient).send(anyString());
165 doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
168 .create(testedObject.createTask(dmaapInputMessage(Operation.GET))) //
169 .expectSubscription() //
171 .verifyComplete(); //
173 verify(agentClient).getForEntity(URL);
174 verifyNoMoreInteractions(agentClient);
176 verify(dmaapClient).send(anyString());
177 verify(dmaapClient).sendBatchWithResponse();
178 verifyNoMoreInteractions(dmaapClient);
182 public void successfulPut() throws IOException {
183 doReturn(okResponse()).when(agentClient).putForEntity(anyString(), anyString());
184 doReturn(1).when(dmaapClient).send(anyString());
185 doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
188 .create(testedObject.createTask(dmaapInputMessage(Operation.PUT))) //
189 .expectSubscription() //
191 .verifyComplete(); //
193 verify(agentClient).putForEntity(URL, payloadAsString());
194 verifyNoMoreInteractions(agentClient);
196 verify(dmaapClient).send(anyString());
197 verify(dmaapClient).sendBatchWithResponse();
198 verifyNoMoreInteractions(dmaapClient);
202 public void successfulPost() throws IOException {
203 doReturn(okResponse()).when(agentClient).postForEntity(anyString(), anyString());
204 doReturn(1).when(dmaapClient).send(anyString());
205 doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
208 .create(testedObject.createTask(dmaapInputMessage(Operation.POST))) //
209 .expectSubscription() //
211 .verifyComplete(); //
213 verify(agentClient).postForEntity(URL, payloadAsString());
214 verifyNoMoreInteractions(agentClient);
216 verify(dmaapClient).send(anyString());
217 verify(dmaapClient).sendBatchWithResponse();
218 verifyNoMoreInteractions(dmaapClient);
222 public void exceptionWhenCallingPolicyAgent_thenNotFoundResponse() throws IOException {
223 WebClientResponseException except = new WebClientResponseException(400, "Refused", null, null, null, null);
224 doReturn(Mono.error(except)).when(agentClient).putForEntity(anyString(), any());
225 doReturn(1).when(dmaapClient).send(anyString());
226 doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
229 .create(testedObject.createTask(dmaapInputMessage(Operation.PUT))) //
230 .expectSubscription() //
231 .verifyComplete(); //
233 verify(agentClient).putForEntity(anyString(), anyString());
234 verifyNoMoreInteractions(agentClient);
236 ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
237 verify(dmaapClient).send(captor.capture());
238 String actualMessage = captor.getValue();
239 assertThat(actualMessage.contains(HttpStatus.BAD_REQUEST.toString())).isTrue();
241 verify(dmaapClient).sendBatchWithResponse();
242 verifyNoMoreInteractions(dmaapClient);
246 public void unsupportedOperationInMessage_thenNotFoundResponseWithNotImplementedOperation() throws Exception {
247 String message = dmaapInputMessage(Operation.PUT).toString();
248 String badOperation = "BAD";
249 message = message.replace(Operation.PUT.toString(), badOperation);
251 testedObject.handleDmaapMsg(message);
253 ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
254 verify(dmaapClient).send(captor.capture());
255 String actualMessage = captor.getValue();
256 assertThat(actualMessage
257 .contains(HttpStatus.BAD_REQUEST + "\",\"message\":\"Not implemented operation: " + badOperation)).isTrue();
259 verify(dmaapClient).sendBatchWithResponse();
260 verifyNoMoreInteractions(dmaapClient);
264 public void putWithoutPayload_thenNotFoundResponseWithWarning() throws Exception {
265 String message = dmaapInputMessage(Operation.PUT).toString();
266 message = message.replace(",\"payload\":{\"name\":\"name\",\"schema\":\"schema\"}", "");
268 final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DmaapMessageHandler.class);
270 testedObject.handleDmaapMsg(message);
272 assertThat(logAppender.list.get(0).getLevel()).isEqualTo(Level.WARN);
273 assertThat(logAppender.list.toString().contains("Expected payload in message from DMAAP: ")).isTrue();