import static ch.qos.logback.classic.Level.WARN;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
-import org.onap.dmaap.mr.client.MRBatchingPublisher;
import org.onap.dmaap.mr.client.MRConsumer;
import org.onap.dmaap.mr.client.response.MRConsumerResponse;
-import org.oransc.policyagent.clients.AsyncRestClient;
import org.oransc.policyagent.configuration.ApplicationConfig;
-import org.oransc.policyagent.tasks.RefreshConfigTask;
import org.oransc.policyagent.utils.LoggingUtils;
import org.springframework.http.HttpStatus;
@ExtendWith(MockitoExtension.class)
-public class DmaapMessageConsumerTest {
+class DmaapMessageConsumerTest {
@Mock
private ApplicationConfig applicationConfigMock;
@Mock
private DmaapMessageConsumer messageConsumerUnderTest;
@AfterEach
- public void resetLogging() {
+ void resetLogging() {
LoggingUtils.getLogListAppender(DmaapMessageConsumer.class);
}
@Test
- public void dmaapNotConfigured_thenSleepAndRetryUntilConfig() throws Exception {
+ void dmaapNotConfigured_thenSleepAndRetryUntilConfig() throws Exception {
messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
messageConsumerUnderTest.start().join();
InOrder orderVerifier = inOrder(messageConsumerUnderTest);
- orderVerifier.verify(messageConsumerUnderTest).sleep(RefreshConfigTask.CONFIG_REFRESH_INTERVAL);
+ orderVerifier.verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES);
orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages();
}
@Test
- public void dmaapConfigurationRemoved_thenStopPollingDmaapSleepAndRetry() throws Exception {
+ void dmaapConfigurationRemoved_thenStopPollingDmaapSleepAndRetry() throws Exception {
messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
InOrder orderVerifier = inOrder(messageConsumerUnderTest);
orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages();
- orderVerifier.verify(messageConsumerUnderTest).sleep(RefreshConfigTask.CONFIG_REFRESH_INTERVAL);
+ orderVerifier.verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES);
}
@Test
- public void dmaapConfiguredAndNoMessages_thenPollOnce() throws Exception {
+ void dmaapConfiguredAndNoMessages_thenPollOnce() throws Exception {
setUpMrConfig();
messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
response.setResponseCode(Integer.toString(HttpStatus.OK.value()));
response.setActualMessages(Collections.emptyList());
- doReturn(false, false, true).when(messageConsumerUnderTest).isStopped();
+ doReturn(false, true).when(messageConsumerUnderTest).isStopped();
doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
.getMessageRouterConsumer(any(Properties.class));
when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response);
}
@Test
- public void dmaapConfiguredAndErrorGettingMessages_thenLogWarningAndSleep() throws Exception {
+ void dmaapConfiguredAndErrorGettingMessages_thenLogWarningAndSleep() throws Exception {
setUpMrConfig();
messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
- doReturn(false, false, true).when(messageConsumerUnderTest).isStopped();
+ doReturn(false, true).when(messageConsumerUnderTest).isStopped();
doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
.getMessageRouterConsumer(any(Properties.class));
}
@Test
- public void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage() throws Exception {
- Properties properties = setUpMrConfig();
-
+ void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage() throws Exception {
+ setUpMrConfig();
messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
- doReturn(false, false, true).when(messageConsumerUnderTest).isStopped();
+ doReturn(false, true).when(messageConsumerUnderTest).isStopped();
doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
.getMessageRouterConsumer(any(Properties.class));
response.setActualMessages(messages);
when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response);
- doReturn(messageHandlerMock).when(messageConsumerUnderTest)
- .createDmaapMessageHandler(any(AsyncRestClient.class), any(MRBatchingPublisher.class));
-
- AsyncRestClient restClientMock = mock(AsyncRestClient.class);
- doReturn(restClientMock).when(messageConsumerUnderTest).createRestClient(anyString());
-
- MRBatchingPublisher publisherMock = mock(MRBatchingPublisher.class);
- doReturn(publisherMock).when(messageConsumerUnderTest).getMessageRouterPublisher(any(Properties.class));
+ doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler();
messageConsumerUnderTest.start().join();
- verify(messageConsumerUnderTest).createRestClient("https://localhost:0");
- verify(messageConsumerUnderTest).getMessageRouterPublisher(properties);
-
verify(messageHandlerMock).handleDmaapMsg(responseMessage);
verifyNoMoreInteractions(messageHandlerMock);
}