X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=policy-agent%2Fsrc%2Ftest%2Fjava%2Forg%2Foransc%2Fpolicyagent%2Fdmaap%2FDmaapMessageConsumerTest.java;h=a78fde035df1377b7f994d98a5ea301ca11d8860;hb=4e7db50d7fb3fd2c7101520f00f0f0b4baf9bddc;hp=b9696ade88160d38298e4c342b842c1178b7aca6;hpb=083393d0affc7dca6a5cea89f4f9759801a91591;p=nonrtric.git diff --git a/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java b/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java index b9696ade..a78fde03 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java @@ -35,43 +35,40 @@ import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.read.ListAppender; import java.time.Duration; -import java.util.Arrays; -import java.util.Collections; import java.util.LinkedList; -import java.util.List; -import java.util.Properties; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import org.onap.dmaap.mr.client.MRConsumer; -import org.onap.dmaap.mr.client.response.MRConsumerResponse; +import org.oransc.policyagent.clients.AsyncRestClient; import org.oransc.policyagent.configuration.ApplicationConfig; -import org.oransc.policyagent.tasks.RefreshConfigTask; import org.oransc.policyagent.utils.LoggingUtils; import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import reactor.core.publisher.Mono; @ExtendWith(MockitoExtension.class) -public class DmaapMessageConsumerTest { +class DmaapMessageConsumerTest { @Mock private ApplicationConfig applicationConfigMock; @Mock - private MRConsumer messageRouterConsumerMock; + private AsyncRestClient messageRouterConsumerMock; @Mock private DmaapMessageHandler messageHandlerMock; private DmaapMessageConsumer messageConsumerUnderTest; @AfterEach - public void resetLogging() { + void resetLogging() { LoggingUtils.getLogListAppender(DmaapMessageConsumer.class); } @Test - public void dmaapNotConfigured_thenSleepAndRetryUntilConfig() throws Exception { + void dmaapNotConfigured_thenSleepAndRetryUntilConfig() throws Exception { messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class)); @@ -82,12 +79,12 @@ public class DmaapMessageConsumerTest { messageConsumerUnderTest.start().join(); InOrder orderVerifier = inOrder(messageConsumerUnderTest); - orderVerifier.verify(messageConsumerUnderTest).sleep(RefreshConfigTask.CONFIG_REFRESH_INTERVAL); + orderVerifier.verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES); orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages(); } @Test - public void dmaapConfigurationRemoved_thenStopPollingDmaapSleepAndRetry() throws Exception { + void dmaapConfigurationRemoved_thenStopPollingDmaapSleepAndRetry() throws Exception { messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class)); @@ -99,89 +96,102 @@ public class DmaapMessageConsumerTest { InOrder orderVerifier = inOrder(messageConsumerUnderTest); orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages(); - orderVerifier.verify(messageConsumerUnderTest).sleep(RefreshConfigTask.CONFIG_REFRESH_INTERVAL); + orderVerifier.verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES); } @Test - public void dmaapConfiguredAndNoMessages_thenPollOnce() throws Exception { + void dmaapConfiguredAndNoMessages_thenPollOnce() throws Exception { setUpMrConfig(); messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); - MRConsumerResponse response = new MRConsumerResponse(); - response.setResponseCode(Integer.toString(HttpStatus.OK.value())); - response.setActualMessages(Collections.emptyList()); + Mono> response = Mono.empty(); - doReturn(false, false, true).when(messageConsumerUnderTest).isStopped(); - doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest) - .getMessageRouterConsumer(any(Properties.class)); - when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response); + doReturn(false, true).when(messageConsumerUnderTest).isStopped(); + doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer(); + doReturn(response).when(messageRouterConsumerMock).getForEntity(any()); messageConsumerUnderTest.start().join(); - verify(messageRouterConsumerMock).fetchWithReturnConsumerResponse(); + verify(messageRouterConsumerMock).getForEntity(any()); verifyNoMoreInteractions(messageRouterConsumerMock); } @Test - public void dmaapConfiguredAndErrorGettingMessages_thenLogWarningAndSleep() throws Exception { + void dmaapConfiguredAndErrorGettingMessages_thenLogWarningAndSleep() throws Exception { setUpMrConfig(); messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class)); - doReturn(false, false, true).when(messageConsumerUnderTest).isStopped(); - doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest) - .getMessageRouterConsumer(any(Properties.class)); + doReturn(false, true).when(messageConsumerUnderTest).isStopped(); + doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer(); - MRConsumerResponse response = new MRConsumerResponse(); - int responseCode = HttpStatus.BAD_REQUEST.value(); - response.setResponseCode(Integer.toString(responseCode)); - String responseMessage = "Error"; - response.setResponseMessage(responseMessage); - when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response); + Mono> response = Mono.just(new ResponseEntity<>("Error", HttpStatus.BAD_REQUEST)); + when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response); final ListAppender logAppender = LoggingUtils.getLogListAppender(DmaapMessageConsumer.class, WARN); messageConsumerUnderTest.start().join(); - assertThat(logAppender.list.toString() - .contains("Cannot fetch because of Error respons " + responseCode + " " + responseMessage + " from DMaaP.")) - .isTrue(); + assertThat(logAppender.list.get(0).getFormattedMessage()) + .isEqualTo("Cannot fetch because of Error respons: 400 BAD_REQUEST Error"); verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES); } @Test - public void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage() throws Exception { + void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage() throws Exception { + // The message from MR is here an array of Json objects setUpMrConfig(); messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); - doReturn(false, false, true).when(messageConsumerUnderTest).isStopped(); - doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest) - .getMessageRouterConsumer(any(Properties.class)); + String message = + "{\"apiVersion\":\"1.0\",\"operation\":\"GET\",\"correlationId\":\"1592341013115594000\",\"originatorId\":\"849e6c6b420\",\"payload\":{},\"requestId\":\"23343221\", \"target\":\"policy-agent\",\"timestamp\":\"2020-06-16 20:56:53.115665\",\"type\":\"request\",\"url\":\"/rics\"}"; + String messages = "[" + message + "]"; - MRConsumerResponse response = new MRConsumerResponse(); - response.setResponseCode(Integer.toString(HttpStatus.OK.value())); - String responseMessage = "message"; - List messages = Arrays.asList(responseMessage); - response.setActualMessages(messages); - when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response); + doReturn(false, true).when(messageConsumerUnderTest).isStopped(); + doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer(); + + Mono> response = Mono.just(new ResponseEntity<>(messages, HttpStatus.OK)); + when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response); + + doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler(); + + messageConsumerUnderTest.start().join(); + + ArgumentCaptor captor = ArgumentCaptor.forClass(String.class); + verify(messageHandlerMock).handleDmaapMsg(captor.capture()); + String messageAfterJsonParsing = captor.getValue(); + assertThat(messageAfterJsonParsing.contains("apiVersion")).isTrue(); + + verifyNoMoreInteractions(messageHandlerMock); + } + + @Test + void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage2() throws Exception { + // The message from MR is here an array of String (which is the case when the MR + // simulator is used) + setUpMrConfig(); + messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); + + doReturn(false, true).when(messageConsumerUnderTest).isStopped(); + doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer(); + + Mono> response = Mono.just(new ResponseEntity<>("[\"aMessage\"]", HttpStatus.OK)); + when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response); doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler(); messageConsumerUnderTest.start().join(); - verify(messageHandlerMock).handleDmaapMsg(responseMessage); + verify(messageHandlerMock).handleDmaapMsg("aMessage"); verifyNoMoreInteractions(messageHandlerMock); } - private Properties setUpMrConfig() { - Properties properties = new Properties(); - properties.put("key", "value"); - when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties); - when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties); - return properties; + private void setUpMrConfig() { + when(applicationConfigMock.getDmaapConsumerTopicUrl()).thenReturn("url"); + when(applicationConfigMock.getDmaapProducerTopicUrl()).thenReturn("url"); } }