Merge "Remove sleep from DmaapMessageConsumer"
[nonrtric.git] / policy-agent / src / test / java / org / oransc / policyagent / dmaap / DmaapMessageConsumerTest.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2020 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oransc.policyagent.dmaap;
22
23 import static ch.qos.logback.classic.Level.WARN;
24 import static org.assertj.core.api.Assertions.assertThat;
25 import static org.mockito.ArgumentMatchers.any;
26 import static org.mockito.ArgumentMatchers.anyString;
27 import static org.mockito.Mockito.doNothing;
28 import static org.mockito.Mockito.doReturn;
29 import static org.mockito.Mockito.inOrder;
30 import static org.mockito.Mockito.mock;
31 import static org.mockito.Mockito.spy;
32 import static org.mockito.Mockito.verify;
33 import static org.mockito.Mockito.verifyNoMoreInteractions;
34 import static org.mockito.Mockito.when;
35
36 import ch.qos.logback.classic.spi.ILoggingEvent;
37 import ch.qos.logback.core.read.ListAppender;
38
39 import java.time.Duration;
40 import java.util.Arrays;
41 import java.util.Collections;
42 import java.util.LinkedList;
43 import java.util.List;
44 import java.util.Properties;
45
46 import org.junit.jupiter.api.AfterEach;
47 import org.junit.jupiter.api.Test;
48 import org.junit.jupiter.api.extension.ExtendWith;
49 import org.mockito.InOrder;
50 import org.mockito.Mock;
51 import org.mockito.junit.jupiter.MockitoExtension;
52 import org.onap.dmaap.mr.client.MRBatchingPublisher;
53 import org.onap.dmaap.mr.client.MRConsumer;
54 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
55 import org.oransc.policyagent.clients.AsyncRestClient;
56 import org.oransc.policyagent.configuration.ApplicationConfig;
57 import org.oransc.policyagent.tasks.RefreshConfigTask;
58 import org.oransc.policyagent.utils.LoggingUtils;
59 import org.springframework.http.HttpStatus;
60
61 @ExtendWith(MockitoExtension.class)
62 public class DmaapMessageConsumerTest {
63     @Mock
64     private ApplicationConfig applicationConfigMock;
65     @Mock
66     private MRConsumer messageRouterConsumerMock;
67     @Mock
68     private DmaapMessageHandler messageHandlerMock;
69
70     private DmaapMessageConsumer messageConsumerUnderTest;
71
72     @AfterEach
73     public void resetLogging() {
74         LoggingUtils.getLogListAppender(DmaapMessageConsumer.class);
75     }
76
77     @Test
78     public void dmaapNotConfigured_thenSleepAndRetryUntilConfig() throws Exception {
79         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
80
81         doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
82         doReturn(false, false, false, true).when(messageConsumerUnderTest).isStopped();
83         doReturn(false, true, true).when(messageConsumerUnderTest).isDmaapConfigured();
84         doReturn(new LinkedList<>()).when(messageConsumerUnderTest).fetchAllMessages();
85
86         messageConsumerUnderTest.start().join();
87
88         InOrder orderVerifier = inOrder(messageConsumerUnderTest);
89         orderVerifier.verify(messageConsumerUnderTest).sleep(RefreshConfigTask.CONFIG_REFRESH_INTERVAL);
90         orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages();
91     }
92
93     @Test
94     public void dmaapConfigurationRemoved_thenStopPollingDmaapSleepAndRetry() throws Exception {
95         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
96
97         doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
98         doReturn(false, false, false, false, true).when(messageConsumerUnderTest).isStopped();
99         doReturn(true, true, false).when(messageConsumerUnderTest).isDmaapConfigured();
100         doReturn(new LinkedList<>()).when(messageConsumerUnderTest).fetchAllMessages();
101
102         messageConsumerUnderTest.start().join();
103
104         InOrder orderVerifier = inOrder(messageConsumerUnderTest);
105         orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages();
106         orderVerifier.verify(messageConsumerUnderTest).sleep(RefreshConfigTask.CONFIG_REFRESH_INTERVAL);
107     }
108
109     @Test
110     public void dmaapConfiguredAndNoMessages_thenPollOnce() throws Exception {
111         setUpMrConfig();
112
113         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
114
115         MRConsumerResponse response = new MRConsumerResponse();
116         response.setResponseCode(Integer.toString(HttpStatus.OK.value()));
117         response.setActualMessages(Collections.emptyList());
118
119         doReturn(false, false, true).when(messageConsumerUnderTest).isStopped();
120         doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
121             .getMessageRouterConsumer(any(Properties.class));
122         when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response);
123
124         messageConsumerUnderTest.start().join();
125
126         verify(messageRouterConsumerMock).fetchWithReturnConsumerResponse();
127         verifyNoMoreInteractions(messageRouterConsumerMock);
128     }
129
130     @Test
131     public void dmaapConfiguredAndErrorGettingMessages_thenLogWarningAndSleep() throws Exception {
132         setUpMrConfig();
133
134         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
135
136         doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
137         doReturn(false, false, true).when(messageConsumerUnderTest).isStopped();
138         doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
139             .getMessageRouterConsumer(any(Properties.class));
140
141         MRConsumerResponse response = new MRConsumerResponse();
142         int responseCode = HttpStatus.BAD_REQUEST.value();
143         response.setResponseCode(Integer.toString(responseCode));
144         String responseMessage = "Error";
145         response.setResponseMessage(responseMessage);
146         when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response);
147
148         final ListAppender<ILoggingEvent> logAppender =
149             LoggingUtils.getLogListAppender(DmaapMessageConsumer.class, WARN);
150
151         messageConsumerUnderTest.start().join();
152
153         assertThat(logAppender.list.toString()
154             .contains("Cannot fetch because of Error respons " + responseCode + " " + responseMessage + " from DMaaP."))
155                 .isTrue();
156
157         verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES);
158     }
159
160     @Test
161     public void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage() throws Exception {
162         Properties properties = setUpMrConfig();
163
164         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
165
166         doReturn(false, false, true).when(messageConsumerUnderTest).isStopped();
167         doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
168             .getMessageRouterConsumer(any(Properties.class));
169
170         MRConsumerResponse response = new MRConsumerResponse();
171         response.setResponseCode(Integer.toString(HttpStatus.OK.value()));
172         String responseMessage = "message";
173         List<String> messages = Arrays.asList(responseMessage);
174         response.setActualMessages(messages);
175         when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response);
176
177         doReturn(messageHandlerMock).when(messageConsumerUnderTest)
178             .createDmaapMessageHandler(any(AsyncRestClient.class), any(MRBatchingPublisher.class));
179
180         AsyncRestClient restClientMock = mock(AsyncRestClient.class);
181         doReturn(restClientMock).when(messageConsumerUnderTest).createRestClient(anyString());
182
183         MRBatchingPublisher publisherMock = mock(MRBatchingPublisher.class);
184         doReturn(publisherMock).when(messageConsumerUnderTest).getMessageRouterPublisher(any(Properties.class));
185
186         messageConsumerUnderTest.start().join();
187
188         verify(messageConsumerUnderTest).createRestClient("https://localhost:0");
189         verify(messageConsumerUnderTest).getMessageRouterPublisher(properties);
190
191         verify(messageHandlerMock).handleDmaapMsg(responseMessage);
192         verifyNoMoreInteractions(messageHandlerMock);
193     }
194
195     private Properties setUpMrConfig() {
196         Properties properties = new Properties();
197         properties.put("key", "value");
198         when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties);
199         when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties);
200         return properties;
201     }
202 }