X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=policy-agent%2Fsrc%2Ftest%2Fjava%2Forg%2Foransc%2Fpolicyagent%2Fdmaap%2FDmaapMessageConsumerTest.java;h=6e7865689d11304c59afdc0462e31ea50a7f33a3;hb=d9df6244e4ba89b71cba364f156ca529c3703faa;hp=153c4ecc4dbd911170947087be46c401ea752e66;hpb=5408c157fc8aca52731fcc2cc035ed9dbfcff219;p=nonrtric.git diff --git a/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java b/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java index 153c4ecc..6e786568 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java @@ -20,43 +20,41 @@ package org.oransc.policyagent.dmaap; +import static ch.qos.logback.classic.Level.WARN; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import ch.qos.logback.classic.Level; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.read.ListAppender; import java.time.Duration; import java.util.Arrays; import java.util.Collections; +import java.util.LinkedList; import java.util.List; import java.util.Properties; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import org.onap.dmaap.mr.client.MRBatchingPublisher; import org.onap.dmaap.mr.client.MRConsumer; import org.onap.dmaap.mr.client.response.MRConsumerResponse; -import org.oransc.policyagent.clients.AsyncRestClient; import org.oransc.policyagent.configuration.ApplicationConfig; import org.oransc.policyagent.utils.LoggingUtils; import org.springframework.http.HttpStatus; @ExtendWith(MockitoExtension.class) -public class DmaapMessageConsumerTest { - final Duration TIME_BETWEEN_DMAAP_POLLS = Duration.ofSeconds(10); - +class DmaapMessageConsumerTest { @Mock private ApplicationConfig applicationConfigMock; @Mock @@ -66,114 +64,122 @@ public class DmaapMessageConsumerTest { private DmaapMessageConsumer messageConsumerUnderTest; + @AfterEach + void resetLogging() { + LoggingUtils.getLogListAppender(DmaapMessageConsumer.class); + } + @Test - public void dmaapNotConfigured_thenDoNothing() { + void dmaapNotConfigured_thenSleepAndRetryUntilConfig() throws Exception { messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); - doReturn(true).when(messageConsumerUnderTest).sleep(any(Duration.class)); + doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class)); + doReturn(false, false, false, true).when(messageConsumerUnderTest).isStopped(); + doReturn(false, true, true).when(messageConsumerUnderTest).isDmaapConfigured(); + doReturn(new LinkedList<>()).when(messageConsumerUnderTest).fetchAllMessages(); - messageConsumerUnderTest.run(); + messageConsumerUnderTest.start().join(); - verify(messageConsumerUnderTest).sleep(TIME_BETWEEN_DMAAP_POLLS); - verify(applicationConfigMock).getDmaapConsumerConfig(); - verify(applicationConfigMock).getDmaapPublisherConfig(); - verifyNoMoreInteractions(applicationConfigMock); + InOrder orderVerifier = inOrder(messageConsumerUnderTest); + orderVerifier.verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES); + orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages(); } @Test - public void dmaapConfiguredAndNoMessages_thenPollOnce() throws Exception { + void dmaapConfigurationRemoved_thenStopPollingDmaapSleepAndRetry() throws Exception { messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); - doReturn(true, false).when(messageConsumerUnderTest).sleep(any(Duration.class)); + doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class)); + doReturn(false, false, false, false, true).when(messageConsumerUnderTest).isStopped(); + doReturn(true, true, false).when(messageConsumerUnderTest).isDmaapConfigured(); + doReturn(new LinkedList<>()).when(messageConsumerUnderTest).fetchAllMessages(); - Properties properties = new Properties(); - properties.put("key", "value"); - when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties); - when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties); + messageConsumerUnderTest.start().join(); + + InOrder orderVerifier = inOrder(messageConsumerUnderTest); + orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages(); + orderVerifier.verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES); + } + + @Test + void dmaapConfiguredAndNoMessages_thenPollOnce() throws Exception { + setUpMrConfig(); + + messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); MRConsumerResponse response = new MRConsumerResponse(); response.setResponseCode(Integer.toString(HttpStatus.OK.value())); response.setActualMessages(Collections.emptyList()); + doReturn(false, true).when(messageConsumerUnderTest).isStopped(); doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest) .getMessageRouterConsumer(any(Properties.class)); - doReturn(response).when(messageRouterConsumerMock).fetchWithReturnConsumerResponse(); - - messageConsumerUnderTest.run(); - - verify(messageConsumerUnderTest, times(2)).sleep(TIME_BETWEEN_DMAAP_POLLS); + when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response); - verify(applicationConfigMock, times(2)).getDmaapConsumerConfig(); - verify(applicationConfigMock).getDmaapPublisherConfig(); - verifyNoMoreInteractions(applicationConfigMock); + messageConsumerUnderTest.start().join(); verify(messageRouterConsumerMock).fetchWithReturnConsumerResponse(); verifyNoMoreInteractions(messageRouterConsumerMock); } @Test - public void dmaapConfiguredAndErrorGettingMessages_thenLogWarning() throws Exception { - messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); - - doReturn(true, false).when(messageConsumerUnderTest).sleep(any(Duration.class)); + void dmaapConfiguredAndErrorGettingMessages_thenLogWarningAndSleep() throws Exception { + setUpMrConfig(); - Properties properties = new Properties(); - properties.put("key", "value"); - when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties); - when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties); + messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); - MRConsumerResponse response = new MRConsumerResponse(); - response.setResponseCode(Integer.toString(HttpStatus.BAD_REQUEST.value())); - response.setResponseMessage("Error"); + doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class)); + doReturn(false, true).when(messageConsumerUnderTest).isStopped(); doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest) .getMessageRouterConsumer(any(Properties.class)); - doReturn(response).when(messageRouterConsumerMock).fetchWithReturnConsumerResponse(); - final ListAppender logAppender = LoggingUtils.getLogListAppender(DmaapMessageConsumer.class); + MRConsumerResponse response = new MRConsumerResponse(); + int responseCode = HttpStatus.BAD_REQUEST.value(); + response.setResponseCode(Integer.toString(responseCode)); + String responseMessage = "Error"; + response.setResponseMessage(responseMessage); + when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response); + + final ListAppender logAppender = + LoggingUtils.getLogListAppender(DmaapMessageConsumer.class, WARN); + + messageConsumerUnderTest.start().join(); - messageConsumerUnderTest.run(); + assertThat(logAppender.list.get(0).getFormattedMessage()).isEqualTo( + "Cannot fetch because of Error respons " + responseCode + " " + responseMessage + " from DMaaP."); - assertThat(logAppender.list.get(0).getLevel()).isEqualTo(Level.WARN); - assertThat( - logAppender.list.toString().contains(": cannot fetch because of Error respons 400 Error from DMaaP.")) - .isTrue(); + verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES); } @Test - public void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage() throws Exception { + void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage() throws Exception { + setUpMrConfig(); messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); - doReturn(true, false).when(messageConsumerUnderTest).sleep(any(Duration.class)); - - Properties properties = new Properties(); - properties.put("key", "value"); - when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties); - when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties); + doReturn(false, true).when(messageConsumerUnderTest).isStopped(); + doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest) + .getMessageRouterConsumer(any(Properties.class)); MRConsumerResponse response = new MRConsumerResponse(); response.setResponseCode(Integer.toString(HttpStatus.OK.value())); - List messages = Arrays.asList("message"); + String responseMessage = "message"; + List messages = Arrays.asList(responseMessage); response.setActualMessages(messages); + when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response); - doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest) - .getMessageRouterConsumer(any(Properties.class)); - doReturn(response).when(messageRouterConsumerMock).fetchWithReturnConsumerResponse(); - - doReturn(messageHandlerMock).when(messageConsumerUnderTest) - .createDmaapMessageHandler(any(AsyncRestClient.class), any(MRBatchingPublisher.class)); - - AsyncRestClient restClientMock = mock(AsyncRestClient.class); - doReturn(restClientMock).when(messageConsumerUnderTest).createRestClient(anyString()); + doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler(); - MRBatchingPublisher publisherMock = mock(MRBatchingPublisher.class); - doReturn(publisherMock).when(messageConsumerUnderTest).getMessageRouterPublisher(any(Properties.class)); + messageConsumerUnderTest.start().join(); - messageConsumerUnderTest.run(); - - verify(messageConsumerUnderTest).createRestClient("http://localhost:0"); - verify(messageConsumerUnderTest).getMessageRouterPublisher(properties); - - verify(messageHandlerMock).handleDmaapMsg("message"); + verify(messageHandlerMock).handleDmaapMsg(responseMessage); verifyNoMoreInteractions(messageHandlerMock); } + + private Properties setUpMrConfig() { + Properties properties = new Properties(); + properties.put("key", "value"); + when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties); + when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties); + return properties; + } }