X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=policy-agent%2Fsrc%2Ftest%2Fjava%2Forg%2Foransc%2Fpolicyagent%2Fdmaap%2FDmaapMessageConsumerTest.java;h=92c4bb0a7b424a6ec4c629d596e7ea915180c404;hb=6a39814272307d0207222c9229b0d765ac062bf0;hp=b9696ade88160d38298e4c342b842c1178b7aca6;hpb=083393d0affc7dca6a5cea89f4f9759801a91591;p=nonrtric.git diff --git a/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java b/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java index b9696ade..92c4bb0a 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java @@ -35,43 +35,40 @@ import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.read.ListAppender; import java.time.Duration; -import java.util.Arrays; -import java.util.Collections; import java.util.LinkedList; -import java.util.List; -import java.util.Properties; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import org.onap.dmaap.mr.client.MRConsumer; -import org.onap.dmaap.mr.client.response.MRConsumerResponse; +import org.oransc.policyagent.clients.AsyncRestClient; import org.oransc.policyagent.configuration.ApplicationConfig; -import org.oransc.policyagent.tasks.RefreshConfigTask; import org.oransc.policyagent.utils.LoggingUtils; import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import reactor.core.publisher.Mono; @ExtendWith(MockitoExtension.class) -public class DmaapMessageConsumerTest { +class DmaapMessageConsumerTest { @Mock private ApplicationConfig applicationConfigMock; @Mock - private MRConsumer messageRouterConsumerMock; + private AsyncRestClient messageRouterConsumerMock; @Mock private DmaapMessageHandler messageHandlerMock; private DmaapMessageConsumer messageConsumerUnderTest; @AfterEach - public void resetLogging() { + void resetLogging() { LoggingUtils.getLogListAppender(DmaapMessageConsumer.class); } @Test - public void dmaapNotConfigured_thenSleepAndRetryUntilConfig() throws Exception { + void dmaapNotConfigured_thenSleepAndRetryUntilConfig() throws Exception { messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class)); @@ -82,12 +79,12 @@ public class DmaapMessageConsumerTest { messageConsumerUnderTest.start().join(); InOrder orderVerifier = inOrder(messageConsumerUnderTest); - orderVerifier.verify(messageConsumerUnderTest).sleep(RefreshConfigTask.CONFIG_REFRESH_INTERVAL); + orderVerifier.verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES); orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages(); } @Test - public void dmaapConfigurationRemoved_thenStopPollingDmaapSleepAndRetry() throws Exception { + void dmaapConfigurationRemoved_thenStopPollingDmaapSleepAndRetry() throws Exception { messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class)); @@ -99,89 +96,110 @@ public class DmaapMessageConsumerTest { InOrder orderVerifier = inOrder(messageConsumerUnderTest); orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages(); - orderVerifier.verify(messageConsumerUnderTest).sleep(RefreshConfigTask.CONFIG_REFRESH_INTERVAL); + orderVerifier.verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES); } @Test - public void dmaapConfiguredAndNoMessages_thenPollOnce() throws Exception { + void dmaapConfiguredAndNoMessages_thenPollOnce() throws Exception { setUpMrConfig(); messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); - MRConsumerResponse response = new MRConsumerResponse(); - response.setResponseCode(Integer.toString(HttpStatus.OK.value())); - response.setActualMessages(Collections.emptyList()); + Mono> response = Mono.empty(); - doReturn(false, false, true).when(messageConsumerUnderTest).isStopped(); - doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest) - .getMessageRouterConsumer(any(Properties.class)); - when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response); + doReturn(false, true).when(messageConsumerUnderTest).isStopped(); + doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer(); + doReturn(response).when(messageRouterConsumerMock).getForEntity(any()); messageConsumerUnderTest.start().join(); - verify(messageRouterConsumerMock).fetchWithReturnConsumerResponse(); + verify(messageRouterConsumerMock).getForEntity(any()); verifyNoMoreInteractions(messageRouterConsumerMock); } @Test - public void dmaapConfiguredAndErrorGettingMessages_thenLogWarningAndSleep() throws Exception { + void dmaapConfiguredAndErrorGettingMessages_thenLogWarningAndSleep() throws Exception { setUpMrConfig(); messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class)); - doReturn(false, false, true).when(messageConsumerUnderTest).isStopped(); - doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest) - .getMessageRouterConsumer(any(Properties.class)); + doReturn(false, true).when(messageConsumerUnderTest).isStopped(); + doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer(); - MRConsumerResponse response = new MRConsumerResponse(); - int responseCode = HttpStatus.BAD_REQUEST.value(); - response.setResponseCode(Integer.toString(responseCode)); - String responseMessage = "Error"; - response.setResponseMessage(responseMessage); - when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response); + Mono> response = Mono.just(new ResponseEntity<>("Error", HttpStatus.BAD_REQUEST)); + when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response); final ListAppender logAppender = LoggingUtils.getLogListAppender(DmaapMessageConsumer.class, WARN); messageConsumerUnderTest.start().join(); - assertThat(logAppender.list.toString() - .contains("Cannot fetch because of Error respons " + responseCode + " " + responseMessage + " from DMaaP.")) - .isTrue(); + assertThat(logAppender.list.get(0).getFormattedMessage()) + .isEqualTo("Cannot fetch because of Error respons: 400 BAD_REQUEST Error"); verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES); } @Test - public void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage() throws Exception { + void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage() throws Exception { + // The message from MR is here an array of Json objects setUpMrConfig(); messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); - doReturn(false, false, true).when(messageConsumerUnderTest).isStopped(); - doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest) - .getMessageRouterConsumer(any(Properties.class)); + String message = "{\"apiVersion\":\"1.0\"," // + + "\"operation\":\"GET\"," // + + "\"correlationId\":\"1592341013115594000\"," // + + "\"originatorId\":\"849e6c6b420\"," // + + "\"payload\":{}," // + + "\"requestId\":\"23343221\", " // + + "\"target\":\"policy-agent\"," // + + "\"timestamp\":\"2020-06-16 20:56:53.115665\"," // + + "\"type\":\"request\"," // + + "\"url\":\"/rics\"}"; + String messages = "[" + message + "]"; - MRConsumerResponse response = new MRConsumerResponse(); - response.setResponseCode(Integer.toString(HttpStatus.OK.value())); - String responseMessage = "message"; - List messages = Arrays.asList(responseMessage); - response.setActualMessages(messages); - when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response); + doReturn(false, true).when(messageConsumerUnderTest).isStopped(); + doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer(); + + Mono> response = Mono.just(new ResponseEntity<>(messages, HttpStatus.OK)); + when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response); + + doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler(); + + messageConsumerUnderTest.start().join(); + + ArgumentCaptor captor = ArgumentCaptor.forClass(String.class); + verify(messageHandlerMock).handleDmaapMsg(captor.capture()); + String messageAfterJsonParsing = captor.getValue(); + assertThat(messageAfterJsonParsing).contains("apiVersion"); + + verifyNoMoreInteractions(messageHandlerMock); + } + + @Test + void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage2() throws Exception { + // The message from MR is here an array of String (which is the case when the MR + // simulator is used) + setUpMrConfig(); + messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); + + doReturn(false, true).when(messageConsumerUnderTest).isStopped(); + doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer(); + + Mono> response = Mono.just(new ResponseEntity<>("[\"aMessage\"]", HttpStatus.OK)); + when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response); doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler(); messageConsumerUnderTest.start().join(); - verify(messageHandlerMock).handleDmaapMsg(responseMessage); + verify(messageHandlerMock).handleDmaapMsg("aMessage"); verifyNoMoreInteractions(messageHandlerMock); } - private Properties setUpMrConfig() { - Properties properties = new Properties(); - properties.put("key", "value"); - when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties); - when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties); - return properties; + private void setUpMrConfig() { + when(applicationConfigMock.getDmaapConsumerTopicUrl()).thenReturn("url"); + when(applicationConfigMock.getDmaapProducerTopicUrl()).thenReturn("url"); } }