2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019 Nordix Foundation.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.oransc.policyagent.dmaap;
23 import static org.junit.Assert.assertFalse;
24 import static org.junit.jupiter.api.Assertions.assertTrue;
25 import static org.mockito.ArgumentMatchers.any;
26 import static org.mockito.ArgumentMatchers.anyString;
27 import static org.mockito.Mockito.doReturn;
28 import static org.mockito.Mockito.mock;
29 import static org.mockito.Mockito.spy;
30 import static org.mockito.Mockito.times;
31 import static org.mockito.Mockito.verify;
32 import static org.mockito.Mockito.verifyNoMoreInteractions;
34 import com.google.gson.Gson;
35 import com.google.gson.GsonBuilder;
36 import com.google.gson.JsonObject;
38 import java.io.IOException;
39 import java.util.Optional;
41 import org.junit.jupiter.api.BeforeEach;
42 import org.junit.jupiter.api.Test;
43 import org.onap.dmaap.mr.client.MRBatchingPublisher;
44 import org.onap.dmaap.mr.client.response.MRPublisherResponse;
45 import org.oransc.policyagent.clients.AsyncRestClient;
46 import org.oransc.policyagent.dmaap.DmaapRequestMessage.Operation;
47 import org.oransc.policyagent.repository.ImmutablePolicyType;
48 import org.oransc.policyagent.repository.PolicyType;
50 import reactor.core.publisher.Mono;
51 import reactor.test.StepVerifier;
/**
 * Unit tests for {@link DmaapMessageHandler}. The handler is exercised against Mockito mocks of the DMaaP
 * publisher ({@link MRBatchingPublisher}) and of the agent REST client ({@link AsyncRestClient}).
 */
53 public class DmaapMessageHandlerTest {
// URL value the tests expect to see in the calls made on the mocked agent client.
55 private static final String URL = "url";
// Mocked collaborators that are handed to the handler under test in setUp().
57 private final MRBatchingPublisher dmaapClient = mock(MRBatchingPublisher.class);
58 private final AsyncRestClient agentClient = mock(AsyncRestClient.class);
// Handler under test; assigned in setUp().
59 private DmaapMessageHandler testedObject;
// Shared Gson instance used to serialize and parse the test messages and payloads.
// NOTE(review): the remainder of this builder chain is not visible in this view - confirm its configuration.
60 private static Gson gson = new GsonBuilder() //
/**
 * Creates the object under test: a Mockito spy wrapping a new {@link DmaapMessageHandler} that is built from
 * the mocked DMaaP client and the mocked agent client.
 *
 * NOTE(review): the annotation on this method is not visible in this view. If it is a JUnit Jupiter
 * {@code @BeforeEach} lifecycle method, confirm that {@code private} visibility is accepted by the JUnit
 * version in use; the JUnit documentation discourages private lifecycle methods.
 */
64 private void setUp() throws Exception {
65 testedObject = spy(new DmaapMessageHandler(dmaapClient, agentClient));
/**
 * Returns the test payload as a Gson {@link JsonObject}, obtained by parsing the string produced by
 * {@link #payloadAsString()}.
 *
 * @return the parsed test payload
 */
68 static JsonObject payloadAsJson() {
69 return gson.fromJson(payloadAsString(), JsonObject.class);
/**
 * Returns the test payload as a JSON string: a {@link PolicyType} with name "name" and schema "schema",
 * serialized with the shared Gson instance.
 *
 * @return the serialized test payload
 */
72 static String payloadAsString() {
73 PolicyType pt = ImmutablePolicyType.builder().name("name").schema("schema").build();
74 return gson.toJson(pt);
/**
 * Builds a {@link DmaapRequestMessage} for the given operation, filled with fixed placeholder values.
 *
 * @param operation the operation to put in the message; PUT and POST messages get the test payload
 * @return the request message built through {@code ImmutableDmaapRequestMessage.builder()}
 */
77 DmaapRequestMessage dmaapRequestMessage(Operation operation) {
// Only PUT and POST carry the JSON payload.
// NOTE(review): the else branch of this conditional is not visible in this view (presumably an empty
// Optional, which testMessageParsing() relies on for DELETE) - confirm.
78 Optional<JsonObject> payload =
79 ((operation == Operation.PUT || operation == Operation.POST) ? Optional.of(payloadAsJson())
// NOTE(review): parts of this builder chain, including its terminating call, are not visible in this view.
81 return ImmutableDmaapRequestMessage.builder().apiVersion("apiVersion") //
82 .correlationId("correlationId") //
83 .operation(operation) //
84 .originatorId("originatorId") //
86 .requestId("requestId") //
88 .timestamp("timestamp") //
/**
 * Returns the JSON string form of the request message built by {@code dmaapRequestMessage(operation)},
 * serialized with the shared Gson instance.
 *
 * @param operation the operation of the message to create
 * @return the message serialized as JSON
 */
94 private String dmaapInputMessage(Operation operation) {
95 return gson.toJson(dmaapRequestMessage(operation));
/**
 * Checks that a serialized request message can be parsed back with Gson into an
 * {@code ImmutableDmaapRequestMessage}: a DELETE message must come back without a payload and a PUT message
 * must come back with one.
 *
 * NOTE(review): {@code assertTrue(x != null)} could be expressed as {@code assertNotNull(x)}, and the
 * {@code System.out.println} calls look like debug output - consider removing them.
 */
99 public void testMessageParsing() {
// DELETE: the parsed message must exist and must not have a payload.
100 String message = dmaapInputMessage(Operation.DELETE);
101 System.out.println(message);
102 DmaapRequestMessage parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class);
103 assertTrue(parsedMessage != null);
104 assertFalse(parsedMessage.payload().isPresent());
// PUT: the parsed message must exist and must have a payload.
106 message = dmaapInputMessage(Operation.PUT);
107 System.out.println(message);
108 parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class);
109 assertTrue(parsedMessage != null);
110 assertTrue(parsedMessage.payload().isPresent());
/**
 * Runs the task created for a DELETE message while the agent client's delete call is stubbed to succeed.
 * Afterwards verifies that the agent client received exactly one {@code delete(URL)} call and nothing else,
 * and that the DMaaP client received exactly one {@code send} and one {@code sendBatchWithResponse} call
 * and nothing else.
 *
 * @throws IOException declared on the signature; presumably required by the stubbed DMaaP client calls - confirm
 */
114 public void successfulDelete() throws IOException {
// Stubs: the agent call yields "OK"; the DMaaP client returns 1 for send and an empty response object.
115 doReturn(Mono.just("OK")).when(agentClient).delete(anyString());
116 doReturn(1).when(dmaapClient).send(anyString());
117 doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
119 String message = dmaapInputMessage(Operation.DELETE);
// NOTE(review): the receiver of this call chain (presumably StepVerifier) and possibly a step between
// expectSubscription() and verifyComplete() are not visible in this view - confirm.
122 .create(testedObject.createTask(message)) //
123 .expectSubscription() //
125 .verifyComplete(); //
// The agent client must have been called once with the expected URL and nothing else.
127 verify(agentClient, times(1)).delete(URL);
128 verifyNoMoreInteractions(agentClient);
// Exactly one message must have been sent through the DMaaP client, followed by one batch send.
130 verify(dmaapClient, times(1)).send(anyString());
131 verify(dmaapClient, times(1)).sendBatchWithResponse();
132 verifyNoMoreInteractions(dmaapClient);
/**
 * Runs the task created for a GET message while the agent client's get call is stubbed to succeed.
 * Afterwards verifies that the agent client received exactly one {@code get(URL)} call and nothing else,
 * and that the DMaaP client received exactly one {@code send} and one {@code sendBatchWithResponse} call
 * and nothing else.
 *
 * @throws IOException declared on the signature; presumably required by the stubbed DMaaP client calls - confirm
 */
136 public void successfulGet() throws IOException {
// Stubs: the agent call yields "OK"; the DMaaP client returns 1 for send and an empty response object.
137 doReturn(Mono.just("OK")).when(agentClient).get(anyString());
138 doReturn(1).when(dmaapClient).send(anyString());
139 doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
// NOTE(review): the receiver of this call chain (presumably StepVerifier) and possibly a step between
// expectSubscription() and verifyComplete() are not visible in this view - confirm.
142 .create(testedObject.createTask(dmaapInputMessage(Operation.GET))) //
143 .expectSubscription() //
145 .verifyComplete(); //
// The agent client must have been called once with the expected URL and nothing else.
147 verify(agentClient, times(1)).get(URL);
148 verifyNoMoreInteractions(agentClient);
// Exactly one message must have been sent through the DMaaP client, followed by one batch send.
150 verify(dmaapClient, times(1)).send(anyString());
151 verify(dmaapClient, times(1)).sendBatchWithResponse();
152 verifyNoMoreInteractions(dmaapClient);
/**
 * Runs the task created for a PUT message while the agent client's put call is stubbed to succeed.
 * Afterwards verifies that the agent client received exactly one {@code put} call with the expected URL and
 * the serialized test payload and nothing else, and that the DMaaP client received exactly one {@code send}
 * and one {@code sendBatchWithResponse} call and nothing else.
 *
 * @throws IOException declared on the signature; presumably required by the stubbed DMaaP client calls - confirm
 */
156 public void successfulPut() throws IOException {
// Stubs: the agent call yields "OK"; the DMaaP client returns 1 for send and an empty response object.
157 doReturn(Mono.just("OK")).when(agentClient).put(anyString(), anyString());
158 doReturn(1).when(dmaapClient).send(anyString());
159 doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
// NOTE(review): the receiver of this call chain (presumably StepVerifier) and possibly a step between
// expectSubscription() and verifyComplete() are not visible in this view - confirm.
162 .create(testedObject.createTask(dmaapInputMessage(Operation.PUT))) //
163 .expectSubscription() //
165 .verifyComplete(); //
// The payload passed to the agent client must equal the string form of the test payload.
167 verify(agentClient, times(1)).put(URL, payloadAsString());
168 verifyNoMoreInteractions(agentClient);
// Exactly one message must have been sent through the DMaaP client, followed by one batch send.
170 verify(dmaapClient, times(1)).send(anyString());
171 verify(dmaapClient, times(1)).sendBatchWithResponse();
172 verifyNoMoreInteractions(dmaapClient);
/**
 * Runs the task created for a POST message while the agent client's post call is stubbed to succeed.
 * Afterwards verifies that the agent client received exactly one {@code post} call with the expected URL and
 * the serialized test payload and nothing else, and that the DMaaP client received exactly one {@code send}
 * and one {@code sendBatchWithResponse} call and nothing else.
 *
 * @throws IOException declared on the signature; presumably required by the stubbed DMaaP client calls - confirm
 */
176 public void successfulPost() throws IOException {
// Stubs: the agent call yields "OK"; the DMaaP client returns 1 for send and an empty response object.
177 doReturn(Mono.just("OK")).when(agentClient).post(anyString(), anyString());
178 doReturn(1).when(dmaapClient).send(anyString());
179 doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
// NOTE(review): the receiver of this call chain (presumably StepVerifier) and possibly a step between
// expectSubscription() and verifyComplete() are not visible in this view - confirm.
182 .create(testedObject.createTask(dmaapInputMessage(Operation.POST))) //
183 .expectSubscription() //
185 .verifyComplete(); //
// The payload passed to the agent client must equal the string form of the test payload.
187 verify(agentClient, times(1)).post(URL, payloadAsString());
188 verifyNoMoreInteractions(agentClient);
// Exactly one message must have been sent through the DMaaP client, followed by one batch send.
190 verify(dmaapClient, times(1)).send(anyString());
191 verify(dmaapClient, times(1)).sendBatchWithResponse();
192 verifyNoMoreInteractions(dmaapClient);
/**
 * Runs the task created for a PUT message while the agent client's put call is stubbed to fail with an
 * exception ("Refused"). The test expects the task to complete, the agent client to have received exactly
 * one {@code put} call and nothing else, and the DMaaP client to have received exactly one {@code send} and
 * one {@code sendBatchWithResponse} call and nothing else.
 *
 * @throws IOException declared on the signature; presumably required by the stubbed DMaaP client calls - confirm
 */
196 public void errorCase() throws IOException {
// Stubs: the agent call fails; the DMaaP client returns 1 for send and an empty response object.
197 doReturn(Mono.error(new Exception("Refused"))).when(agentClient).put(anyString(), any());
198 doReturn(1).when(dmaapClient).send(anyString());
199 doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
// NOTE(review): the receiver of this call chain (presumably StepVerifier) is not visible in this view.
201 .create(testedObject.createTask(dmaapInputMessage(Operation.PUT))) //
202 .expectSubscription() //
203 .verifyComplete(); //
// The failing agent call must have been attempted exactly once.
205 verify(agentClient, times(1)).put(anyString(), anyString());
206 verifyNoMoreInteractions(agentClient);
// A message must still have been sent through the DMaaP client exactly once, followed by one batch send.
209 verify(dmaapClient, times(1)).send(anyString());
210 verify(dmaapClient, times(1)).sendBatchWithResponse();
211 verifyNoMoreInteractions(dmaapClient);