Merge "Update mrstub with nginx"
[nonrtric.git] / policy-agent / src / test / java / org / oransc / policyagent / dmaap / DmaapMessageHandlerTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2019 Nordix Foundation.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.oransc.policyagent.dmaap;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.junit.Assert.assertFalse;
25 import static org.junit.jupiter.api.Assertions.assertTrue;
26 import static org.mockito.ArgumentMatchers.any;
27 import static org.mockito.ArgumentMatchers.anyString;
28 import static org.mockito.Mockito.doReturn;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.spy;
31 import static org.mockito.Mockito.verify;
32 import static org.mockito.Mockito.verifyNoMoreInteractions;
33
34 import ch.qos.logback.classic.Level;
35 import ch.qos.logback.classic.spi.ILoggingEvent;
36 import ch.qos.logback.core.read.ListAppender;
37
38 import com.google.gson.Gson;
39 import com.google.gson.GsonBuilder;
40 import com.google.gson.JsonObject;
41
42 import java.io.IOException;
43 import java.util.Optional;
44
45 import org.junit.jupiter.api.BeforeEach;
46 import org.junit.jupiter.api.Test;
47 import org.mockito.ArgumentCaptor;
48 import org.onap.dmaap.mr.client.MRBatchingPublisher;
49 import org.onap.dmaap.mr.client.response.MRPublisherResponse;
50 import org.oransc.policyagent.clients.AsyncRestClient;
51 import org.oransc.policyagent.dmaap.DmaapRequestMessage.Operation;
52 import org.oransc.policyagent.repository.ImmutablePolicyType;
53 import org.oransc.policyagent.repository.PolicyType;
54 import org.oransc.policyagent.utils.LoggingUtils;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57 import org.springframework.http.HttpStatus;
58 import org.springframework.http.ResponseEntity;
59 import org.springframework.web.reactive.function.client.WebClientResponseException;
60
61 import reactor.core.publisher.Mono;
62 import reactor.test.StepVerifier;
63
64 public class DmaapMessageHandlerTest {
65     private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandlerTest.class);
66     private static final String URL = "url";
67
68     private final MRBatchingPublisher dmaapClient = mock(MRBatchingPublisher.class);
69     private final AsyncRestClient agentClient = mock(AsyncRestClient.class);
70     private DmaapMessageHandler testedObject;
71     private static Gson gson = new GsonBuilder() //
72         .create(); //
73
74     @BeforeEach
75     private void setUp() throws Exception {
76         testedObject = spy(new DmaapMessageHandler(dmaapClient, agentClient));
77     }
78
79     static JsonObject payloadAsJson() {
80         return gson.fromJson(payloadAsString(), JsonObject.class);
81     }
82
83     static String payloadAsString() {
84         PolicyType pt = ImmutablePolicyType.builder().name("name").schema("schema").build();
85         return gson.toJson(pt);
86     }
87
88     DmaapRequestMessage dmaapRequestMessage(Operation operation) {
89         Optional<JsonObject> payload =
90             ((operation == Operation.PUT || operation == Operation.POST) ? Optional.of(payloadAsJson())
91                 : Optional.empty());
92         return ImmutableDmaapRequestMessage.builder() //
93             .apiVersion("apiVersion") //
94             .correlationId("correlationId") //
95             .operation(operation) //
96             .originatorId("originatorId") //
97             .payload(payload) //
98             .requestId("requestId") //
99             .target("target") //
100             .timestamp("timestamp") //
101             .url(URL) //
102             .build();
103     }
104
105     private String dmaapInputMessage(Operation operation) {
106         return gson.toJson(dmaapRequestMessage(operation));
107     }
108
109     private Mono<ResponseEntity<String>> okResponse() {
110         ResponseEntity<String> entity = new ResponseEntity<>("OK", HttpStatus.OK);
111         return Mono.just(entity);
112     }
113
114     @Test
115     public void testMessageParsing() {
116         String message = dmaapInputMessage(Operation.DELETE);
117         logger.info(message);
118         DmaapRequestMessage parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class);
119         assertTrue(parsedMessage != null);
120         assertFalse(parsedMessage.payload().isPresent());
121
122         message = dmaapInputMessage(Operation.PUT);
123         logger.info(message);
124         parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class);
125         assertTrue(parsedMessage != null);
126         assertTrue(parsedMessage.payload().isPresent());
127     }
128
129     @Test
130     public void unparseableMessage_thenWarning() {
131         final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DmaapMessageHandler.class);
132
133         testedObject.handleDmaapMsg("bad message");
134
135         assertThat(logAppender.list.get(0).getLevel()).isEqualTo(Level.WARN);
136         assertThat(logAppender.list.toString().contains("handleDmaapMsg failure ")).isTrue();
137     }
138
139     @Test
140     public void successfulDelete() throws IOException {
141         doReturn(okResponse()).when(agentClient).deleteForEntity(anyString());
142         doReturn(1).when(dmaapClient).send(anyString());
143         doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
144
145         String message = dmaapInputMessage(Operation.DELETE);
146
147         StepVerifier //
148             .create(testedObject.createTask(message)) //
149             .expectSubscription() //
150             .expectNext("OK") //
151             .verifyComplete(); //
152
153         verify(agentClient).deleteForEntity(URL);
154         verifyNoMoreInteractions(agentClient);
155
156         verify(dmaapClient).send(anyString());
157         verify(dmaapClient).sendBatchWithResponse();
158         verifyNoMoreInteractions(dmaapClient);
159     }
160
161     @Test
162     public void successfulGet() throws IOException {
163         doReturn(okResponse()).when(agentClient).getForEntity(anyString());
164         doReturn(1).when(dmaapClient).send(anyString());
165         doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
166
167         StepVerifier //
168             .create(testedObject.createTask(dmaapInputMessage(Operation.GET))) //
169             .expectSubscription() //
170             .expectNext("OK") //
171             .verifyComplete(); //
172
173         verify(agentClient).getForEntity(URL);
174         verifyNoMoreInteractions(agentClient);
175
176         verify(dmaapClient).send(anyString());
177         verify(dmaapClient).sendBatchWithResponse();
178         verifyNoMoreInteractions(dmaapClient);
179     }
180
181     @Test
182     public void successfulPut() throws IOException {
183         doReturn(okResponse()).when(agentClient).putForEntity(anyString(), anyString());
184         doReturn(1).when(dmaapClient).send(anyString());
185         doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
186
187         StepVerifier //
188             .create(testedObject.createTask(dmaapInputMessage(Operation.PUT))) //
189             .expectSubscription() //
190             .expectNext("OK") //
191             .verifyComplete(); //
192
193         verify(agentClient).putForEntity(URL, payloadAsString());
194         verifyNoMoreInteractions(agentClient);
195
196         verify(dmaapClient).send(anyString());
197         verify(dmaapClient).sendBatchWithResponse();
198         verifyNoMoreInteractions(dmaapClient);
199     }
200
201     @Test
202     public void successfulPost() throws IOException {
203         doReturn(okResponse()).when(agentClient).postForEntity(anyString(), anyString());
204         doReturn(1).when(dmaapClient).send(anyString());
205         doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
206
207         StepVerifier //
208             .create(testedObject.createTask(dmaapInputMessage(Operation.POST))) //
209             .expectSubscription() //
210             .expectNext("OK") //
211             .verifyComplete(); //
212
213         verify(agentClient).postForEntity(URL, payloadAsString());
214         verifyNoMoreInteractions(agentClient);
215
216         verify(dmaapClient).send(anyString());
217         verify(dmaapClient).sendBatchWithResponse();
218         verifyNoMoreInteractions(dmaapClient);
219     }
220
221     @Test
222     public void exceptionWhenCallingPolicyAgent_thenNotFoundResponse() throws IOException {
223         WebClientResponseException except = new WebClientResponseException(400, "Refused", null, null, null, null);
224         doReturn(Mono.error(except)).when(agentClient).putForEntity(anyString(), any());
225         doReturn(1).when(dmaapClient).send(anyString());
226         doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
227
228         StepVerifier //
229             .create(testedObject.createTask(dmaapInputMessage(Operation.PUT))) //
230             .expectSubscription() //
231             .verifyComplete(); //
232
233         verify(agentClient).putForEntity(anyString(), anyString());
234         verifyNoMoreInteractions(agentClient);
235
236         ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
237         verify(dmaapClient).send(captor.capture());
238         String actualMessage = captor.getValue();
239         assertThat(actualMessage.contains(HttpStatus.BAD_REQUEST.toString())).isTrue();
240
241         verify(dmaapClient).sendBatchWithResponse();
242         verifyNoMoreInteractions(dmaapClient);
243     }
244
245     @Test
246     public void unsupportedOperationInMessage_thenNotFoundResponseWithNotImplementedOperation() throws Exception {
247         String message = dmaapInputMessage(Operation.PUT).toString();
248         String badOperation = "BAD";
249         message = message.replace(Operation.PUT.toString(), badOperation);
250
251         testedObject.handleDmaapMsg(message);
252
253         ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
254         verify(dmaapClient).send(captor.capture());
255         String actualMessage = captor.getValue();
256         assertThat(actualMessage
257             .contains(HttpStatus.BAD_REQUEST + "\",\"message\":\"Not implemented operation: " + badOperation)).isTrue();
258
259         verify(dmaapClient).sendBatchWithResponse();
260         verifyNoMoreInteractions(dmaapClient);
261     }
262
263     @Test
264     public void putWithoutPayload_thenNotFoundResponseWithWarning() throws Exception {
265         String message = dmaapInputMessage(Operation.PUT).toString();
266         message = message.replace(",\"payload\":{\"name\":\"name\",\"schema\":\"schema\"}", "");
267
268         final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DmaapMessageHandler.class);
269
270         testedObject.handleDmaapMsg(message);
271
272         assertThat(logAppender.list.get(0).getLevel()).isEqualTo(Level.WARN);
273         assertThat(logAppender.list.toString().contains("Expected payload in message from DMAAP: ")).isTrue();
274     }
275 }