2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019 Nordix Foundation.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.oransc.policyagent.dmaap;
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.junit.Assert.assertFalse;
25 import static org.junit.jupiter.api.Assertions.assertTrue;
26 import static org.mockito.ArgumentMatchers.any;
27 import static org.mockito.ArgumentMatchers.anyString;
28 import static org.mockito.Mockito.doReturn;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.spy;
31 import static org.mockito.Mockito.verify;
32 import static org.mockito.Mockito.verifyNoMoreInteractions;
34 import ch.qos.logback.classic.Level;
35 import ch.qos.logback.classic.spi.ILoggingEvent;
36 import ch.qos.logback.core.read.ListAppender;
38 import com.google.gson.Gson;
39 import com.google.gson.GsonBuilder;
40 import com.google.gson.JsonObject;
42 import java.io.IOException;
43 import java.util.Optional;
45 import org.junit.jupiter.api.BeforeEach;
46 import org.junit.jupiter.api.Test;
47 import org.mockito.ArgumentCaptor;
48 import org.onap.dmaap.mr.client.MRBatchingPublisher;
49 import org.onap.dmaap.mr.client.response.MRPublisherResponse;
50 import org.oransc.policyagent.clients.AsyncRestClient;
51 import org.oransc.policyagent.dmaap.DmaapRequestMessage.Operation;
52 import org.oransc.policyagent.repository.ImmutablePolicyType;
53 import org.oransc.policyagent.repository.PolicyType;
54 import org.oransc.policyagent.utils.LoggingUtils;
55 import org.springframework.http.HttpStatus;
56 import org.springframework.http.ResponseEntity;
57 import org.springframework.web.reactive.function.client.WebClientResponseException;
59 import reactor.core.publisher.Mono;
60 import reactor.test.StepVerifier;
62 public class DmaapMessageHandlerTest {
// Value the tests below expect as the URL argument of the verified agent-client calls.
64 private static final String URL = "url";
// Mockito mock of MRBatchingPublisher; handed to the handler as its DMaaP client in setUp().
66 private final MRBatchingPublisher dmaapClient = mock(MRBatchingPublisher.class);
// Mockito mock of AsyncRestClient; handed to the handler as its agent client in setUp().
67 private final AsyncRestClient agentClient = mock(AsyncRestClient.class);
// Handler under test; assigned in setUp().
68 private DmaapMessageHandler testedObject;
// Gson instance shared by the JSON helper methods of this class.
// NOTE(review): the builder chain continues on lines that are not visible here - confirm
// how the instance is configured. The field is never reassigned in the visible code, so it
// could presumably be declared final.
69 private static Gson gson = new GsonBuilder() //
/**
 * Creates a new DmaapMessageHandler wired to the two mocked clients, wraps it in a Mockito
 * spy and stores it in {@code testedObject}.
 *
 * NOTE(review): no lifecycle annotation is visible on this method although BeforeEach is
 * imported - presumably it sits on a line missing from this view. JUnit Jupiter documents
 * that lifecycle methods must not be private; confirm the visibility is intended.
 */
73 private void setUp() throws Exception {
74 testedObject = spy(new DmaapMessageHandler(dmaapClient, agentClient));
/**
 * Returns the test payload as a Gson {@code JsonObject}.
 *
 * @return the string produced by {@code payloadAsString()} parsed with the shared gson instance
 */
77 static JsonObject payloadAsJson() {
78 return gson.fromJson(payloadAsString(), JsonObject.class);
/**
 * Returns the test payload as a JSON string.
 *
 * @return the gson serialization of a PolicyType built with name "name" and schema "schema"
 */
81 static String payloadAsString() {
// Fixed, minimal policy type; the same values are matched literally in putWithoutPayload.
82 PolicyType pt = ImmutablePolicyType.builder().name("name").schema("schema").build();
83 return gson.toJson(pt);
/**
 * Builds a DmaapRequestMessage for the given operation, filled with fixed placeholder values.
 *
 * @param operation the operation to put into the message
 * @return the message produced by the ImmutableDmaapRequestMessage builder
 *
 * NOTE(review): the else-branch of the payload expression and the end of the builder chain
 * are on lines that are not visible here - confirm the remaining field values.
 */
86 DmaapRequestMessage dmaapRequestMessage(Operation operation) {
// A payload is supplied for PUT and POST; the value used for other operations is not visible here.
87 Optional<JsonObject> payload =
88 ((operation == Operation.PUT || operation == Operation.POST) ? Optional.of(payloadAsJson())
// Each string field is set to a literal equal to its own name.
90 return ImmutableDmaapRequestMessage.builder().apiVersion("apiVersion") //
91 .correlationId("correlationId") //
92 .operation(operation) //
93 .originatorId("originatorId") //
95 .requestId("requestId") //
97 .timestamp("timestamp") //
/**
 * Serializes the request message for the given operation to JSON.
 *
 * @param operation the operation to put into the message
 * @return the gson serialization of {@code dmaapRequestMessage(operation)}
 */
103 private String dmaapInputMessage(Operation operation) {
104 return gson.toJson(dmaapRequestMessage(operation));
/**
 * Creates the canned successful reply used to stub the agent client.
 *
 * @return a Mono emitting a ResponseEntity with body "OK" and status HttpStatus.OK
 */
107 private Mono<ResponseEntity<String>> okResponse() {
108 ResponseEntity<String> entity = new ResponseEntity<>("OK", HttpStatus.OK);
109 return Mono.just(entity);
/**
 * Round-trips generated request messages through gson: a DELETE message must parse back
 * without a payload, a PUT message must parse back with one.
 *
 * NOTE(review): assertFalse is imported from JUnit 4 (org.junit.Assert) while assertTrue
 * comes from JUnit Jupiter; the two println calls look like leftover debug output; and
 * assertTrue(x != null) could presumably be an assertNotNull - confirm and tidy up.
 */
113 public void testMessageParsing() {
// DELETE: serialize, parse back, expect no payload.
114 String message = dmaapInputMessage(Operation.DELETE);
115 System.out.println(message);
116 DmaapRequestMessage parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class);
117 assertTrue(parsedMessage != null);
118 assertFalse(parsedMessage.payload().isPresent());
// PUT: serialize, parse back, expect a payload.
120 message = dmaapInputMessage(Operation.PUT);
121 System.out.println(message);
122 parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class);
123 assertTrue(parsedMessage != null);
124 assertTrue(parsedMessage.payload().isPresent());
/**
 * Passes a string that is not a request message to handleDmaapMsg and checks the log output:
 * the first captured event must be at WARN level and the captured events must mention
 * "handleDmaapMsg failure ".
 *
 * NOTE(review): wrapping String.contains(...) in isTrue() hides the actual log content when
 * the assertion fails; AssertJ's contains(...) on the string would presumably report it.
 */
128 public void unparseableMessage_thenWarning() {
// Obtain a list appender for DmaapMessageHandler's logging before triggering the call.
// NOTE(review): presumably the helper also attaches the appender to the logger - confirm.
129 final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DmaapMessageHandler.class);
131 testedObject.handleDmaapMsg("bad message");
133 assertThat(logAppender.list.get(0).getLevel()).isEqualTo(Level.WARN);
134 assertThat(logAppender.list.toString().contains("handleDmaapMsg failure ")).isTrue();
/**
 * Runs the task created for a DELETE message against stubbed clients and verifies that the
 * agent client received exactly one deleteForEntity(URL) call and that the DMaaP client
 * received exactly one send and one sendBatchWithResponse call.
 *
 * @throws IOException declared because the stubbed/verified DMaaP client methods declare it
 *         (presumably - confirm against MRBatchingPublisher)
 */
138 public void successfulDelete() throws IOException {
// Stubs: agent call answers with the canned OK reply, send returns 1, batch send returns a new response object.
139 doReturn(okResponse()).when(agentClient).deleteForEntity(anyString());
140 doReturn(1).when(dmaapClient).send(anyString());
141 doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
143 String message = dmaapInputMessage(Operation.DELETE);
// NOTE(review): the receiver of .create(...) and one step of this chain are on lines that are
// not visible here (StepVerifier is imported, so presumably that) - confirm.
146 .create(testedObject.createTask(message)) //
147 .expectSubscription() //
149 .verifyComplete(); //
// The agent client must have been called once, with the expected URL, and nothing else.
151 verify(agentClient).deleteForEntity(URL);
152 verifyNoMoreInteractions(agentClient);
// One message sent and one batch flush on the DMaaP client, and nothing else.
154 verify(dmaapClient).send(anyString());
155 verify(dmaapClient).sendBatchWithResponse();
156 verifyNoMoreInteractions(dmaapClient);
/**
 * Runs the task created for a GET message against stubbed clients and verifies that the
 * agent client received exactly one getForEntity(URL) call and that the DMaaP client
 * received exactly one send and one sendBatchWithResponse call.
 *
 * @throws IOException declared because the stubbed/verified DMaaP client methods declare it
 *         (presumably - confirm against MRBatchingPublisher)
 */
160 public void successfulGet() throws IOException {
// Stubs: agent call answers with the canned OK reply, send returns 1, batch send returns a new response object.
161 doReturn(okResponse()).when(agentClient).getForEntity(anyString());
162 doReturn(1).when(dmaapClient).send(anyString());
163 doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
// NOTE(review): the receiver of .create(...) and one step of this chain are on lines that are
// not visible here (StepVerifier is imported, so presumably that) - confirm.
166 .create(testedObject.createTask(dmaapInputMessage(Operation.GET))) //
167 .expectSubscription() //
169 .verifyComplete(); //
// The agent client must have been called once, with the expected URL, and nothing else.
171 verify(agentClient).getForEntity(URL);
172 verifyNoMoreInteractions(agentClient);
// One message sent and one batch flush on the DMaaP client, and nothing else.
174 verify(dmaapClient).send(anyString());
175 verify(dmaapClient).sendBatchWithResponse();
176 verifyNoMoreInteractions(dmaapClient);
/**
 * Runs the task created for a PUT message against stubbed clients and verifies that the
 * agent client received exactly one putForEntity call with URL and the serialized test
 * payload, and that the DMaaP client received exactly one send and one
 * sendBatchWithResponse call.
 *
 * @throws IOException declared because the stubbed/verified DMaaP client methods declare it
 *         (presumably - confirm against MRBatchingPublisher)
 */
180 public void successfulPut() throws IOException {
// Stubs: agent call answers with the canned OK reply, send returns 1, batch send returns a new response object.
181 doReturn(okResponse()).when(agentClient).putForEntity(anyString(), anyString());
182 doReturn(1).when(dmaapClient).send(anyString());
183 doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
// NOTE(review): the receiver of .create(...) and one step of this chain are on lines that are
// not visible here (StepVerifier is imported, so presumably that) - confirm.
186 .create(testedObject.createTask(dmaapInputMessage(Operation.PUT))) //
187 .expectSubscription() //
189 .verifyComplete(); //
// The body forwarded to the agent must equal the payload string produced by payloadAsString().
191 verify(agentClient).putForEntity(URL, payloadAsString());
192 verifyNoMoreInteractions(agentClient);
// One message sent and one batch flush on the DMaaP client, and nothing else.
194 verify(dmaapClient).send(anyString());
195 verify(dmaapClient).sendBatchWithResponse();
196 verifyNoMoreInteractions(dmaapClient);
/**
 * Runs the task created for a POST message against stubbed clients and verifies that the
 * agent client received exactly one postForEntity call with URL and the serialized test
 * payload, and that the DMaaP client received exactly one send and one
 * sendBatchWithResponse call.
 *
 * @throws IOException declared because the stubbed/verified DMaaP client methods declare it
 *         (presumably - confirm against MRBatchingPublisher)
 */
200 public void successfulPost() throws IOException {
// Stubs: agent call answers with the canned OK reply, send returns 1, batch send returns a new response object.
201 doReturn(okResponse()).when(agentClient).postForEntity(anyString(), anyString());
202 doReturn(1).when(dmaapClient).send(anyString());
203 doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
// NOTE(review): the receiver of .create(...) and one step of this chain are on lines that are
// not visible here (StepVerifier is imported, so presumably that) - confirm.
206 .create(testedObject.createTask(dmaapInputMessage(Operation.POST))) //
207 .expectSubscription() //
209 .verifyComplete(); //
// The body forwarded to the agent must equal the payload string produced by payloadAsString().
211 verify(agentClient).postForEntity(URL, payloadAsString());
212 verifyNoMoreInteractions(agentClient);
// One message sent and one batch flush on the DMaaP client, and nothing else.
214 verify(dmaapClient).send(anyString());
215 verify(dmaapClient).sendBatchWithResponse();
216 verifyNoMoreInteractions(dmaapClient);
/**
 * Makes the agent client's putForEntity fail with a WebClientResponseException (status 400,
 * text "Refused") and checks that the task still completes and that the single message sent
 * to the DMaaP client contains HttpStatus.BAD_REQUEST.toString().
 *
 * NOTE(review): the method name says "NotFoundResponse" but the assertion checks for
 * BAD_REQUEST - the name looks stale; confirm which is intended.
 *
 * @throws IOException declared because the stubbed/verified DMaaP client methods declare it
 *         (presumably - confirm against MRBatchingPublisher)
 */
220 public void exceptionWhenCallingPolicyAgent_thenNotFoundResponse() throws IOException {
221 WebClientResponseException except = new WebClientResponseException(400, "Refused", null, null, null, null);
// The agent call yields an error Mono instead of a response.
222 doReturn(Mono.error(except)).when(agentClient).putForEntity(anyString(), any());
223 doReturn(1).when(dmaapClient).send(anyString());
224 doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
// NOTE(review): the receiver of .create(...) is on a line that is not visible here
// (StepVerifier is imported, so presumably that) - confirm.
227 .create(testedObject.createTask(dmaapInputMessage(Operation.PUT))) //
228 .expectSubscription() //
229 .verifyComplete(); //
231 verify(agentClient).putForEntity(anyString(), anyString());
232 verifyNoMoreInteractions(agentClient);
// Capture what was sent to DMaaP so its content can be inspected.
234 ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
235 verify(dmaapClient).send(captor.capture());
236 String actualMessage = captor.getValue();
237 assertThat(actualMessage.contains(HttpStatus.BAD_REQUEST.toString())).isTrue();
239 verify(dmaapClient).sendBatchWithResponse();
240 verifyNoMoreInteractions(dmaapClient);
/**
 * Feeds handleDmaapMsg a PUT message whose operation text has been replaced by "BAD" and
 * inspects the single message sent to the DMaaP client for HttpStatus.NOT_FOUND followed by
 * the "Not implemented operation:" message text.
 *
 * NOTE(review): the end of the assertThat(...) statement is on a line that is not visible
 * here - confirm how the contains(...) result is asserted.
 */
244 public void unsupportedOperationInMessage_thenNotFoundResponseWithNotImplementedOperation() throws Exception {
// dmaapInputMessage already returns a String, so the toString() call is redundant.
245 String message = dmaapInputMessage(Operation.PUT).toString();
246 String badOperation = "BAD";
// Plain text replacement: every occurrence of "PUT" in the JSON becomes "BAD".
247 message = message.replace(Operation.PUT.toString(), badOperation);
249 testedObject.handleDmaapMsg(message);
// Capture what was sent to DMaaP so its content can be inspected.
251 ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
252 verify(dmaapClient).send(captor.capture());
253 String actualMessage = captor.getValue();
254 assertThat(actualMessage.contains(HttpStatus.NOT_FOUND + "\",\"message\":\"Not implemented operation:"))
257 verify(dmaapClient).sendBatchWithResponse();
258 verifyNoMoreInteractions(dmaapClient);
/**
 * Feeds handleDmaapMsg a PUT message from which the payload member has been cut out and
 * checks the log output: the first captured event must be at WARN level and the captured
 * events must mention "Expected payload in message from DMAAP: ".
 *
 * NOTE(review): the name also promises a "NotFoundResponse", but the lines visible here only
 * check logging - further assertions may follow outside this view; confirm.
 */
262 public void putWithoutPayload_thenNotFoundResponseWithWarning() throws Exception {
// dmaapInputMessage already returns a String, so the toString() call is redundant.
263 String message = dmaapInputMessage(Operation.PUT).toString();
// Removes the literal payload fragment; this relies on the exact JSON text of the test payload.
264 message = message.replace(",\"payload\":{\"name\":\"name\",\"schema\":\"schema\"}", "");
// Obtain a list appender for DmaapMessageHandler's logging before triggering the call.
// NOTE(review): presumably the helper also attaches the appender to the logger - confirm.
266 final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DmaapMessageHandler.class);
268 testedObject.handleDmaapMsg(message);
270 assertThat(logAppender.list.get(0).getLevel()).isEqualTo(Level.WARN);
271 assertThat(logAppender.list.toString().contains("Expected payload in message from DMAAP: ")).isTrue();