d034d4d02305713f9d6e1745ba9177dc68eeb7cc
[nonrtric.git] / policy-agent / src / test / java / org / oransc / policyagent / dmaap / DmaapMessageHandlerTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2019 Nordix Foundation.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.oransc.policyagent.dmaap;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.junit.Assert.assertFalse;
25 import static org.junit.jupiter.api.Assertions.assertTrue;
26 import static org.mockito.ArgumentMatchers.any;
27 import static org.mockito.ArgumentMatchers.anyString;
28 import static org.mockito.Mockito.doReturn;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.spy;
31 import static org.mockito.Mockito.verify;
32 import static org.mockito.Mockito.verifyNoMoreInteractions;
33
34 import ch.qos.logback.classic.Level;
35 import ch.qos.logback.classic.spi.ILoggingEvent;
36 import ch.qos.logback.core.read.ListAppender;
37
38 import com.google.gson.Gson;
39 import com.google.gson.GsonBuilder;
40 import com.google.gson.JsonObject;
41
42 import java.io.IOException;
43 import java.util.Optional;
44
45 import org.junit.jupiter.api.BeforeEach;
46 import org.junit.jupiter.api.Test;
47 import org.mockito.ArgumentCaptor;
48 import org.onap.dmaap.mr.client.MRBatchingPublisher;
49 import org.onap.dmaap.mr.client.response.MRPublisherResponse;
50 import org.oransc.policyagent.clients.AsyncRestClient;
51 import org.oransc.policyagent.dmaap.DmaapRequestMessage.Operation;
52 import org.oransc.policyagent.repository.ImmutablePolicyType;
53 import org.oransc.policyagent.repository.PolicyType;
54 import org.oransc.policyagent.utils.LoggingUtils;
55 import org.springframework.http.HttpStatus;
56 import org.springframework.http.ResponseEntity;
57 import org.springframework.web.reactive.function.client.WebClientResponseException;
58
59 import reactor.core.publisher.Mono;
60 import reactor.test.StepVerifier;
61
62 public class DmaapMessageHandlerTest {
63
64     private static final String URL = "url";
65
66     private final MRBatchingPublisher dmaapClient = mock(MRBatchingPublisher.class);
67     private final AsyncRestClient agentClient = mock(AsyncRestClient.class);
68     private DmaapMessageHandler testedObject;
69     private static Gson gson = new GsonBuilder() //
70         .create(); //
71
72     @BeforeEach
73     private void setUp() throws Exception {
74         testedObject = spy(new DmaapMessageHandler(dmaapClient, agentClient));
75     }
76
77     static JsonObject payloadAsJson() {
78         return gson.fromJson(payloadAsString(), JsonObject.class);
79     }
80
81     static String payloadAsString() {
82         PolicyType pt = ImmutablePolicyType.builder().name("name").schema("schema").build();
83         return gson.toJson(pt);
84     }
85
86     DmaapRequestMessage dmaapRequestMessage(Operation operation) {
87         Optional<JsonObject> payload =
88             ((operation == Operation.PUT || operation == Operation.POST) ? Optional.of(payloadAsJson())
89                 : Optional.empty());
90         return ImmutableDmaapRequestMessage.builder().apiVersion("apiVersion") //
91             .correlationId("correlationId") //
92             .operation(operation) //
93             .originatorId("originatorId") //
94             .payload(payload) //
95             .requestId("requestId") //
96             .target("target") //
97             .timestamp("timestamp") //
98             .type("type") //
99             .url(URL) //
100             .build();
101     }
102
103     private String dmaapInputMessage(Operation operation) {
104         return gson.toJson(dmaapRequestMessage(operation));
105     }
106
107     private Mono<ResponseEntity<String>> okResponse() {
108         ResponseEntity<String> entity = new ResponseEntity<>("OK", HttpStatus.OK);
109         return Mono.just(entity);
110     }
111
112     @Test
113     public void testMessageParsing() {
114         String message = dmaapInputMessage(Operation.DELETE);
115         System.out.println(message);
116         DmaapRequestMessage parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class);
117         assertTrue(parsedMessage != null);
118         assertFalse(parsedMessage.payload().isPresent());
119
120         message = dmaapInputMessage(Operation.PUT);
121         System.out.println(message);
122         parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class);
123         assertTrue(parsedMessage != null);
124         assertTrue(parsedMessage.payload().isPresent());
125     }
126
127     @Test
128     public void unparseableMessage_thenWarning() {
129         final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DmaapMessageHandler.class);
130
131         testedObject.handleDmaapMsg("bad message");
132
133         assertThat(logAppender.list.get(0).getLevel()).isEqualTo(Level.WARN);
134         assertThat(logAppender.list.toString().contains("handleDmaapMsg failure ")).isTrue();
135     }
136
137     @Test
138     public void successfulDelete() throws IOException {
139         doReturn(okResponse()).when(agentClient).deleteForEntity(anyString());
140         doReturn(1).when(dmaapClient).send(anyString());
141         doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
142
143         String message = dmaapInputMessage(Operation.DELETE);
144
145         StepVerifier //
146             .create(testedObject.createTask(message)) //
147             .expectSubscription() //
148             .expectNext("OK") //
149             .verifyComplete(); //
150
151         verify(agentClient).deleteForEntity(URL);
152         verifyNoMoreInteractions(agentClient);
153
154         verify(dmaapClient).send(anyString());
155         verify(dmaapClient).sendBatchWithResponse();
156         verifyNoMoreInteractions(dmaapClient);
157     }
158
159     @Test
160     public void successfulGet() throws IOException {
161         doReturn(okResponse()).when(agentClient).getForEntity(anyString());
162         doReturn(1).when(dmaapClient).send(anyString());
163         doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
164
165         StepVerifier //
166             .create(testedObject.createTask(dmaapInputMessage(Operation.GET))) //
167             .expectSubscription() //
168             .expectNext("OK") //
169             .verifyComplete(); //
170
171         verify(agentClient).getForEntity(URL);
172         verifyNoMoreInteractions(agentClient);
173
174         verify(dmaapClient).send(anyString());
175         verify(dmaapClient).sendBatchWithResponse();
176         verifyNoMoreInteractions(dmaapClient);
177     }
178
179     @Test
180     public void successfulPut() throws IOException {
181         doReturn(okResponse()).when(agentClient).putForEntity(anyString(), anyString());
182         doReturn(1).when(dmaapClient).send(anyString());
183         doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
184
185         StepVerifier //
186             .create(testedObject.createTask(dmaapInputMessage(Operation.PUT))) //
187             .expectSubscription() //
188             .expectNext("OK") //
189             .verifyComplete(); //
190
191         verify(agentClient).putForEntity(URL, payloadAsString());
192         verifyNoMoreInteractions(agentClient);
193
194         verify(dmaapClient).send(anyString());
195         verify(dmaapClient).sendBatchWithResponse();
196         verifyNoMoreInteractions(dmaapClient);
197     }
198
199     @Test
200     public void successfulPost() throws IOException {
201         doReturn(okResponse()).when(agentClient).postForEntity(anyString(), anyString());
202         doReturn(1).when(dmaapClient).send(anyString());
203         doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
204
205         StepVerifier //
206             .create(testedObject.createTask(dmaapInputMessage(Operation.POST))) //
207             .expectSubscription() //
208             .expectNext("OK") //
209             .verifyComplete(); //
210
211         verify(agentClient).postForEntity(URL, payloadAsString());
212         verifyNoMoreInteractions(agentClient);
213
214         verify(dmaapClient).send(anyString());
215         verify(dmaapClient).sendBatchWithResponse();
216         verifyNoMoreInteractions(dmaapClient);
217     }
218
219     @Test
220     public void exceptionWhenCallingPolicyAgent_thenNotFoundResponse() throws IOException {
221         WebClientResponseException except = new WebClientResponseException(400, "Refused", null, null, null, null);
222         doReturn(Mono.error(except)).when(agentClient).putForEntity(anyString(), any());
223         doReturn(1).when(dmaapClient).send(anyString());
224         doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
225
226         StepVerifier //
227             .create(testedObject.createTask(dmaapInputMessage(Operation.PUT))) //
228             .expectSubscription() //
229             .verifyComplete(); //
230
231         verify(agentClient).putForEntity(anyString(), anyString());
232         verifyNoMoreInteractions(agentClient);
233
234         ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
235         verify(dmaapClient).send(captor.capture());
236         String actualMessage = captor.getValue();
237         assertThat(actualMessage.contains(HttpStatus.BAD_REQUEST.toString())).isTrue();
238
239         verify(dmaapClient).sendBatchWithResponse();
240         verifyNoMoreInteractions(dmaapClient);
241     }
242
243     @Test
244     public void unsupportedOperationInMessage_thenNotFoundResponseWithNotImplementedOperation() throws Exception {
245         String message = dmaapInputMessage(Operation.PUT).toString();
246         String badOperation = "BAD";
247         message = message.replace(Operation.PUT.toString(), badOperation);
248
249         testedObject.handleDmaapMsg(message);
250
251         ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
252         verify(dmaapClient).send(captor.capture());
253         String actualMessage = captor.getValue();
254         assertThat(actualMessage
255             .contains(HttpStatus.BAD_REQUEST + "\",\"message\":\"Not implemented operation: " + badOperation)).isTrue();
256
257         verify(dmaapClient).sendBatchWithResponse();
258         verifyNoMoreInteractions(dmaapClient);
259     }
260
261     @Test
262     public void putWithoutPayload_thenNotFoundResponseWithWarning() throws Exception {
263         String message = dmaapInputMessage(Operation.PUT).toString();
264         message = message.replace(",\"payload\":{\"name\":\"name\",\"schema\":\"schema\"}", "");
265
266         final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DmaapMessageHandler.class);
267
268         testedObject.handleDmaapMsg(message);
269
270         assertThat(logAppender.list.get(0).getLevel()).isEqualTo(Level.WARN);
271         assertThat(logAppender.list.toString().contains("Expected payload in message from DMAAP: ")).isTrue();
272     }
273 }