92c4bb0a7b424a6ec4c629d596e7ea915180c404
[nonrtric.git] / policy-agent / src / test / java / org / oransc / policyagent / dmaap / DmaapMessageConsumerTest.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2020 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oransc.policyagent.dmaap;
22
23 import static ch.qos.logback.classic.Level.WARN;
24 import static org.assertj.core.api.Assertions.assertThat;
25 import static org.mockito.ArgumentMatchers.any;
26 import static org.mockito.Mockito.doNothing;
27 import static org.mockito.Mockito.doReturn;
28 import static org.mockito.Mockito.inOrder;
29 import static org.mockito.Mockito.spy;
30 import static org.mockito.Mockito.verify;
31 import static org.mockito.Mockito.verifyNoMoreInteractions;
32 import static org.mockito.Mockito.when;
33
34 import ch.qos.logback.classic.spi.ILoggingEvent;
35 import ch.qos.logback.core.read.ListAppender;
36
37 import java.time.Duration;
38 import java.util.LinkedList;
39
40 import org.junit.jupiter.api.AfterEach;
41 import org.junit.jupiter.api.Test;
42 import org.junit.jupiter.api.extension.ExtendWith;
43 import org.mockito.ArgumentCaptor;
44 import org.mockito.InOrder;
45 import org.mockito.Mock;
46 import org.mockito.junit.jupiter.MockitoExtension;
47 import org.oransc.policyagent.clients.AsyncRestClient;
48 import org.oransc.policyagent.configuration.ApplicationConfig;
49 import org.oransc.policyagent.utils.LoggingUtils;
50 import org.springframework.http.HttpStatus;
51 import org.springframework.http.ResponseEntity;
52 import reactor.core.publisher.Mono;
53
54 @ExtendWith(MockitoExtension.class)
55 class DmaapMessageConsumerTest {
56     @Mock
57     private ApplicationConfig applicationConfigMock;
58     @Mock
59     private AsyncRestClient messageRouterConsumerMock;
60     @Mock
61     private DmaapMessageHandler messageHandlerMock;
62
63     private DmaapMessageConsumer messageConsumerUnderTest;
64
65     @AfterEach
66     void resetLogging() {
67         LoggingUtils.getLogListAppender(DmaapMessageConsumer.class);
68     }
69
70     @Test
71     void dmaapNotConfigured_thenSleepAndRetryUntilConfig() throws Exception {
72         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
73
74         doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
75         doReturn(false, false, false, true).when(messageConsumerUnderTest).isStopped();
76         doReturn(false, true, true).when(messageConsumerUnderTest).isDmaapConfigured();
77         doReturn(new LinkedList<>()).when(messageConsumerUnderTest).fetchAllMessages();
78
79         messageConsumerUnderTest.start().join();
80
81         InOrder orderVerifier = inOrder(messageConsumerUnderTest);
82         orderVerifier.verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES);
83         orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages();
84     }
85
86     @Test
87     void dmaapConfigurationRemoved_thenStopPollingDmaapSleepAndRetry() throws Exception {
88         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
89
90         doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
91         doReturn(false, false, false, false, true).when(messageConsumerUnderTest).isStopped();
92         doReturn(true, true, false).when(messageConsumerUnderTest).isDmaapConfigured();
93         doReturn(new LinkedList<>()).when(messageConsumerUnderTest).fetchAllMessages();
94
95         messageConsumerUnderTest.start().join();
96
97         InOrder orderVerifier = inOrder(messageConsumerUnderTest);
98         orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages();
99         orderVerifier.verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES);
100     }
101
102     @Test
103     void dmaapConfiguredAndNoMessages_thenPollOnce() throws Exception {
104         setUpMrConfig();
105
106         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
107
108         Mono<ResponseEntity<String>> response = Mono.empty();
109
110         doReturn(false, true).when(messageConsumerUnderTest).isStopped();
111         doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
112         doReturn(response).when(messageRouterConsumerMock).getForEntity(any());
113
114         messageConsumerUnderTest.start().join();
115
116         verify(messageRouterConsumerMock).getForEntity(any());
117         verifyNoMoreInteractions(messageRouterConsumerMock);
118     }
119
120     @Test
121     void dmaapConfiguredAndErrorGettingMessages_thenLogWarningAndSleep() throws Exception {
122         setUpMrConfig();
123
124         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
125
126         doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
127         doReturn(false, true).when(messageConsumerUnderTest).isStopped();
128         doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
129
130         Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>("Error", HttpStatus.BAD_REQUEST));
131         when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response);
132
133         final ListAppender<ILoggingEvent> logAppender =
134             LoggingUtils.getLogListAppender(DmaapMessageConsumer.class, WARN);
135
136         messageConsumerUnderTest.start().join();
137
138         assertThat(logAppender.list.get(0).getFormattedMessage())
139             .isEqualTo("Cannot fetch because of Error respons: 400 BAD_REQUEST Error");
140
141         verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES);
142     }
143
144     @Test
145     void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage() throws Exception {
146         // The message from MR is here an array of Json objects
147         setUpMrConfig();
148         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
149
150         String message = "{\"apiVersion\":\"1.0\"," //
151             + "\"operation\":\"GET\"," //
152             + "\"correlationId\":\"1592341013115594000\"," //
153             + "\"originatorId\":\"849e6c6b420\"," //
154             + "\"payload\":{}," //
155             + "\"requestId\":\"23343221\", " //
156             + "\"target\":\"policy-agent\"," //
157             + "\"timestamp\":\"2020-06-16 20:56:53.115665\"," //
158             + "\"type\":\"request\"," //
159             + "\"url\":\"/rics\"}";
160         String messages = "[" + message + "]";
161
162         doReturn(false, true).when(messageConsumerUnderTest).isStopped();
163         doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
164
165         Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>(messages, HttpStatus.OK));
166         when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response);
167
168         doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler();
169
170         messageConsumerUnderTest.start().join();
171
172         ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
173         verify(messageHandlerMock).handleDmaapMsg(captor.capture());
174         String messageAfterJsonParsing = captor.getValue();
175         assertThat(messageAfterJsonParsing).contains("apiVersion");
176
177         verifyNoMoreInteractions(messageHandlerMock);
178     }
179
180     @Test
181     void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage2() throws Exception {
182         // The message from MR is here an array of String (which is the case when the MR
183         // simulator is used)
184         setUpMrConfig();
185         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
186
187         doReturn(false, true).when(messageConsumerUnderTest).isStopped();
188         doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
189
190         Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>("[\"aMessage\"]", HttpStatus.OK));
191         when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response);
192
193         doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler();
194
195         messageConsumerUnderTest.start().join();
196
197         verify(messageHandlerMock).handleDmaapMsg("aMessage");
198         verifyNoMoreInteractions(messageHandlerMock);
199     }
200
201     private void setUpMrConfig() {
202         when(applicationConfigMock.getDmaapConsumerTopicUrl()).thenReturn("url");
203         when(applicationConfigMock.getDmaapProducerTopicUrl()).thenReturn("url");
204     }
205 }