Merge "Improve developer guide"
[nonrtric.git] / policy-agent / src / test / java / org / oransc / policyagent / dmaap / DmaapMessageConsumerTest.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2020 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oransc.policyagent.dmaap;
22
23 import static ch.qos.logback.classic.Level.WARN;
24 import static org.assertj.core.api.Assertions.assertThat;
25 import static org.mockito.ArgumentMatchers.any;
26 import static org.mockito.Mockito.doNothing;
27 import static org.mockito.Mockito.doReturn;
28 import static org.mockito.Mockito.inOrder;
29 import static org.mockito.Mockito.spy;
30 import static org.mockito.Mockito.verify;
31 import static org.mockito.Mockito.verifyNoMoreInteractions;
32 import static org.mockito.Mockito.when;
33
34 import ch.qos.logback.classic.spi.ILoggingEvent;
35 import ch.qos.logback.core.read.ListAppender;
36
37 import java.time.Duration;
38 import java.util.LinkedList;
39
40 import org.junit.jupiter.api.AfterEach;
41 import org.junit.jupiter.api.Test;
42 import org.junit.jupiter.api.extension.ExtendWith;
43 import org.mockito.ArgumentCaptor;
44 import org.mockito.InOrder;
45 import org.mockito.Mock;
46 import org.mockito.junit.jupiter.MockitoExtension;
47 import org.oransc.policyagent.clients.AsyncRestClient;
48 import org.oransc.policyagent.configuration.ApplicationConfig;
49 import org.oransc.policyagent.utils.LoggingUtils;
50 import org.springframework.http.HttpStatus;
51 import org.springframework.http.ResponseEntity;
52 import reactor.core.publisher.Mono;
53
54 @ExtendWith(MockitoExtension.class)
55 class DmaapMessageConsumerTest {
56     @Mock
57     private ApplicationConfig applicationConfigMock;
58     @Mock
59     private AsyncRestClient messageRouterConsumerMock;
60     @Mock
61     private DmaapMessageHandler messageHandlerMock;
62
63     private DmaapMessageConsumer messageConsumerUnderTest;
64
65     @AfterEach
66     void resetLogging() {
67         LoggingUtils.getLogListAppender(DmaapMessageConsumer.class);
68     }
69
70     @Test
71     void dmaapNotConfigured_thenSleepAndRetryUntilConfig() throws Exception {
72         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
73
74         doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
75         doReturn(false, false, false, true).when(messageConsumerUnderTest).isStopped();
76         doReturn(false, true, true).when(messageConsumerUnderTest).isDmaapConfigured();
77         doReturn(new LinkedList<>()).when(messageConsumerUnderTest).fetchAllMessages();
78
79         messageConsumerUnderTest.start().join();
80
81         InOrder orderVerifier = inOrder(messageConsumerUnderTest);
82         orderVerifier.verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES);
83         orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages();
84     }
85
86     @Test
87     void dmaapConfigurationRemoved_thenStopPollingDmaapSleepAndRetry() throws Exception {
88         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
89
90         doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
91         doReturn(false, false, false, false, true).when(messageConsumerUnderTest).isStopped();
92         doReturn(true, true, false).when(messageConsumerUnderTest).isDmaapConfigured();
93         doReturn(new LinkedList<>()).when(messageConsumerUnderTest).fetchAllMessages();
94
95         messageConsumerUnderTest.start().join();
96
97         InOrder orderVerifier = inOrder(messageConsumerUnderTest);
98         orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages();
99         orderVerifier.verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES);
100     }
101
102     @Test
103     void dmaapConfiguredAndNoMessages_thenPollOnce() throws Exception {
104         setUpMrConfig();
105
106         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
107
108         Mono<ResponseEntity<String>> response = Mono.empty();
109
110         doReturn(false, true).when(messageConsumerUnderTest).isStopped();
111         doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
112         doReturn(response).when(messageRouterConsumerMock).getForEntity(any());
113
114         messageConsumerUnderTest.start().join();
115
116         verify(messageRouterConsumerMock).getForEntity(any());
117         verifyNoMoreInteractions(messageRouterConsumerMock);
118     }
119
120     @Test
121     void dmaapConfiguredAndErrorGettingMessages_thenLogWarningAndSleep() throws Exception {
122         setUpMrConfig();
123
124         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
125
126         doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
127         doReturn(false, true).when(messageConsumerUnderTest).isStopped();
128         doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
129
130         Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>("Error", HttpStatus.BAD_REQUEST));
131         when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response);
132
133         final ListAppender<ILoggingEvent> logAppender =
134             LoggingUtils.getLogListAppender(DmaapMessageConsumer.class, WARN);
135
136         messageConsumerUnderTest.start().join();
137
138         assertThat(logAppender.list.get(0).getFormattedMessage())
139             .isEqualTo("Cannot fetch because of Error respons: 400 BAD_REQUEST Error");
140
141         verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES);
142     }
143
144     @Test
145     void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage() throws Exception {
146         // The message from MR is here an array of Json objects
147         setUpMrConfig();
148         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
149
150         String message =
151             "{\"apiVersion\":\"1.0\",\"operation\":\"GET\",\"correlationId\":\"1592341013115594000\",\"originatorId\":\"849e6c6b420\",\"payload\":{},\"requestId\":\"23343221\", \"target\":\"policy-agent\",\"timestamp\":\"2020-06-16 20:56:53.115665\",\"type\":\"request\",\"url\":\"/rics\"}";
152         String messages = "[" + message + "]";
153
154         doReturn(false, true).when(messageConsumerUnderTest).isStopped();
155         doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
156
157         Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>(messages, HttpStatus.OK));
158         when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response);
159
160         doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler();
161
162         messageConsumerUnderTest.start().join();
163
164         ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
165         verify(messageHandlerMock).handleDmaapMsg(captor.capture());
166         String messageAfterJsonParsing = captor.getValue();
167         assertThat(messageAfterJsonParsing.contains("apiVersion")).isTrue();
168
169         verifyNoMoreInteractions(messageHandlerMock);
170     }
171
172     @Test
173     void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage2() throws Exception {
174         // The message from MR is here an array of String (which is the case when the MR
175         // simulator is used)
176         setUpMrConfig();
177         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
178
179         doReturn(false, true).when(messageConsumerUnderTest).isStopped();
180         doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
181
182         Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>("[\"aMessage\"]", HttpStatus.OK));
183         when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response);
184
185         doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler();
186
187         messageConsumerUnderTest.start().join();
188
189         verify(messageHandlerMock).handleDmaapMsg("aMessage");
190         verifyNoMoreInteractions(messageHandlerMock);
191     }
192
193     private void setUpMrConfig() {
194         when(applicationConfigMock.getDmaapConsumerTopicUrl()).thenReturn("url");
195         when(applicationConfigMock.getDmaapProducerTopicUrl()).thenReturn("url");
196     }
197 }