Bugfix, creating only one DmaapProducer
[nonrtric.git] / policy-agent / src / test / java / org / oransc / policyagent / dmaap / DmaapMessageConsumerTest.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2020 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oransc.policyagent.dmaap;
22
23 import static ch.qos.logback.classic.Level.WARN;
24 import static org.assertj.core.api.Assertions.assertThat;
25 import static org.mockito.ArgumentMatchers.any;
26 import static org.mockito.Mockito.doNothing;
27 import static org.mockito.Mockito.doReturn;
28 import static org.mockito.Mockito.inOrder;
29 import static org.mockito.Mockito.spy;
30 import static org.mockito.Mockito.verify;
31 import static org.mockito.Mockito.verifyNoMoreInteractions;
32 import static org.mockito.Mockito.when;
33
34 import ch.qos.logback.classic.spi.ILoggingEvent;
35 import ch.qos.logback.core.read.ListAppender;
36
37 import java.time.Duration;
38 import java.util.Arrays;
39 import java.util.Collections;
40 import java.util.LinkedList;
41 import java.util.List;
42 import java.util.Properties;
43
44 import org.junit.jupiter.api.AfterEach;
45 import org.junit.jupiter.api.Test;
46 import org.junit.jupiter.api.extension.ExtendWith;
47 import org.mockito.InOrder;
48 import org.mockito.Mock;
49 import org.mockito.junit.jupiter.MockitoExtension;
50 import org.onap.dmaap.mr.client.MRConsumer;
51 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
52 import org.oransc.policyagent.configuration.ApplicationConfig;
53 import org.oransc.policyagent.tasks.RefreshConfigTask;
54 import org.oransc.policyagent.utils.LoggingUtils;
55 import org.springframework.http.HttpStatus;
56
57 @ExtendWith(MockitoExtension.class)
58 public class DmaapMessageConsumerTest {
59     @Mock
60     private ApplicationConfig applicationConfigMock;
61     @Mock
62     private MRConsumer messageRouterConsumerMock;
63     @Mock
64     private DmaapMessageHandler messageHandlerMock;
65
66     private DmaapMessageConsumer messageConsumerUnderTest;
67
68     @AfterEach
69     public void resetLogging() {
70         LoggingUtils.getLogListAppender(DmaapMessageConsumer.class);
71     }
72
73     @Test
74     public void dmaapNotConfigured_thenSleepAndRetryUntilConfig() throws Exception {
75         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
76
77         doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
78         doReturn(false, false, false, true).when(messageConsumerUnderTest).isStopped();
79         doReturn(false, true, true).when(messageConsumerUnderTest).isDmaapConfigured();
80         doReturn(new LinkedList<>()).when(messageConsumerUnderTest).fetchAllMessages();
81
82         messageConsumerUnderTest.start().join();
83
84         InOrder orderVerifier = inOrder(messageConsumerUnderTest);
85         orderVerifier.verify(messageConsumerUnderTest).sleep(RefreshConfigTask.CONFIG_REFRESH_INTERVAL);
86         orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages();
87     }
88
89     @Test
90     public void dmaapConfigurationRemoved_thenStopPollingDmaapSleepAndRetry() throws Exception {
91         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
92
93         doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
94         doReturn(false, false, false, false, true).when(messageConsumerUnderTest).isStopped();
95         doReturn(true, true, false).when(messageConsumerUnderTest).isDmaapConfigured();
96         doReturn(new LinkedList<>()).when(messageConsumerUnderTest).fetchAllMessages();
97
98         messageConsumerUnderTest.start().join();
99
100         InOrder orderVerifier = inOrder(messageConsumerUnderTest);
101         orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages();
102         orderVerifier.verify(messageConsumerUnderTest).sleep(RefreshConfigTask.CONFIG_REFRESH_INTERVAL);
103     }
104
105     @Test
106     public void dmaapConfiguredAndNoMessages_thenPollOnce() throws Exception {
107         setUpMrConfig();
108
109         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
110
111         MRConsumerResponse response = new MRConsumerResponse();
112         response.setResponseCode(Integer.toString(HttpStatus.OK.value()));
113         response.setActualMessages(Collections.emptyList());
114
115         doReturn(false, false, true).when(messageConsumerUnderTest).isStopped();
116         doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
117             .getMessageRouterConsumer(any(Properties.class));
118         when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response);
119
120         messageConsumerUnderTest.start().join();
121
122         verify(messageRouterConsumerMock).fetchWithReturnConsumerResponse();
123         verifyNoMoreInteractions(messageRouterConsumerMock);
124     }
125
126     @Test
127     public void dmaapConfiguredAndErrorGettingMessages_thenLogWarningAndSleep() throws Exception {
128         setUpMrConfig();
129
130         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
131
132         doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
133         doReturn(false, false, true).when(messageConsumerUnderTest).isStopped();
134         doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
135             .getMessageRouterConsumer(any(Properties.class));
136
137         MRConsumerResponse response = new MRConsumerResponse();
138         int responseCode = HttpStatus.BAD_REQUEST.value();
139         response.setResponseCode(Integer.toString(responseCode));
140         String responseMessage = "Error";
141         response.setResponseMessage(responseMessage);
142         when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response);
143
144         final ListAppender<ILoggingEvent> logAppender =
145             LoggingUtils.getLogListAppender(DmaapMessageConsumer.class, WARN);
146
147         messageConsumerUnderTest.start().join();
148
149         assertThat(logAppender.list.toString()
150             .contains("Cannot fetch because of Error respons " + responseCode + " " + responseMessage + " from DMaaP."))
151                 .isTrue();
152
153         verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES);
154     }
155
156     @Test
157     public void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage() throws Exception {
158         setUpMrConfig();
159         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
160
161         doReturn(false, false, true).when(messageConsumerUnderTest).isStopped();
162         doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
163             .getMessageRouterConsumer(any(Properties.class));
164
165         MRConsumerResponse response = new MRConsumerResponse();
166         response.setResponseCode(Integer.toString(HttpStatus.OK.value()));
167         String responseMessage = "message";
168         List<String> messages = Arrays.asList(responseMessage);
169         response.setActualMessages(messages);
170         when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response);
171
172         doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler();
173
174         messageConsumerUnderTest.start().join();
175
176         verify(messageHandlerMock).handleDmaapMsg(responseMessage);
177         verifyNoMoreInteractions(messageHandlerMock);
178     }
179
180     private Properties setUpMrConfig() {
181         Properties properties = new Properties();
182         properties.put("key", "value");
183         when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties);
184         when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties);
185         return properties;
186     }
187 }