X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=policy-agent%2Fsrc%2Ftest%2Fjava%2Forg%2Foransc%2Fpolicyagent%2Fdmaap%2FDmaapMessageConsumerTest.java;h=a78fde035df1377b7f994d98a5ea301ca11d8860;hb=4e7db50d7fb3fd2c7101520f00f0f0b4baf9bddc;hp=153c4ecc4dbd911170947087be46c401ea752e66;hpb=5408c157fc8aca52731fcc2cc035ed9dbfcff219;p=nonrtric.git diff --git a/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java b/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java index 153c4ecc..a78fde03 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java @@ -20,160 +20,178 @@ package org.oransc.policyagent.dmaap; +import static ch.qos.logback.classic.Level.WARN; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import ch.qos.logback.classic.Level; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.read.ListAppender; import java.time.Duration; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Properties; +import java.util.LinkedList; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import org.onap.dmaap.mr.client.MRBatchingPublisher; -import org.onap.dmaap.mr.client.MRConsumer; -import org.onap.dmaap.mr.client.response.MRConsumerResponse; import org.oransc.policyagent.clients.AsyncRestClient; import org.oransc.policyagent.configuration.ApplicationConfig; import org.oransc.policyagent.utils.LoggingUtils; import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import reactor.core.publisher.Mono; @ExtendWith(MockitoExtension.class) -public class DmaapMessageConsumerTest { - final Duration TIME_BETWEEN_DMAAP_POLLS = Duration.ofSeconds(10); - +class DmaapMessageConsumerTest { @Mock private ApplicationConfig applicationConfigMock; @Mock - private MRConsumer messageRouterConsumerMock; + private AsyncRestClient messageRouterConsumerMock; @Mock private DmaapMessageHandler messageHandlerMock; private DmaapMessageConsumer messageConsumerUnderTest; + @AfterEach + void resetLogging() { + LoggingUtils.getLogListAppender(DmaapMessageConsumer.class); + } + @Test - public void dmaapNotConfigured_thenDoNothing() { + void dmaapNotConfigured_thenSleepAndRetryUntilConfig() throws Exception { messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); - doReturn(true).when(messageConsumerUnderTest).sleep(any(Duration.class)); + doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class)); + doReturn(false, false, false, true).when(messageConsumerUnderTest).isStopped(); + doReturn(false, true, true).when(messageConsumerUnderTest).isDmaapConfigured(); + doReturn(new LinkedList<>()).when(messageConsumerUnderTest).fetchAllMessages(); - messageConsumerUnderTest.run(); + messageConsumerUnderTest.start().join(); - verify(messageConsumerUnderTest).sleep(TIME_BETWEEN_DMAAP_POLLS); - verify(applicationConfigMock).getDmaapConsumerConfig(); - verify(applicationConfigMock).getDmaapPublisherConfig(); - verifyNoMoreInteractions(applicationConfigMock); + InOrder orderVerifier = inOrder(messageConsumerUnderTest); + orderVerifier.verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES); + orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages(); } @Test - public void dmaapConfiguredAndNoMessages_thenPollOnce() throws Exception { + void dmaapConfigurationRemoved_thenStopPollingDmaapSleepAndRetry() throws Exception { messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); - doReturn(true, false).when(messageConsumerUnderTest).sleep(any(Duration.class)); + doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class)); + doReturn(false, false, false, false, true).when(messageConsumerUnderTest).isStopped(); + doReturn(true, true, false).when(messageConsumerUnderTest).isDmaapConfigured(); + doReturn(new LinkedList<>()).when(messageConsumerUnderTest).fetchAllMessages(); - Properties properties = new Properties(); - properties.put("key", "value"); - when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties); - when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties); + messageConsumerUnderTest.start().join(); - MRConsumerResponse response = new MRConsumerResponse(); - response.setResponseCode(Integer.toString(HttpStatus.OK.value())); - response.setActualMessages(Collections.emptyList()); + InOrder orderVerifier = inOrder(messageConsumerUnderTest); + orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages(); + orderVerifier.verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES); + } - doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest) - .getMessageRouterConsumer(any(Properties.class)); - doReturn(response).when(messageRouterConsumerMock).fetchWithReturnConsumerResponse(); + @Test + void dmaapConfiguredAndNoMessages_thenPollOnce() throws Exception { + setUpMrConfig(); - messageConsumerUnderTest.run(); + messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); + + Mono> response = Mono.empty(); - verify(messageConsumerUnderTest, times(2)).sleep(TIME_BETWEEN_DMAAP_POLLS); + doReturn(false, true).when(messageConsumerUnderTest).isStopped(); + doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer(); + doReturn(response).when(messageRouterConsumerMock).getForEntity(any()); - verify(applicationConfigMock, times(2)).getDmaapConsumerConfig(); - verify(applicationConfigMock).getDmaapPublisherConfig(); - verifyNoMoreInteractions(applicationConfigMock); + messageConsumerUnderTest.start().join(); - verify(messageRouterConsumerMock).fetchWithReturnConsumerResponse(); + verify(messageRouterConsumerMock).getForEntity(any()); verifyNoMoreInteractions(messageRouterConsumerMock); } @Test - public void dmaapConfiguredAndErrorGettingMessages_thenLogWarning() throws Exception { + void dmaapConfiguredAndErrorGettingMessages_thenLogWarningAndSleep() throws Exception { + setUpMrConfig(); + messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); - doReturn(true, false).when(messageConsumerUnderTest).sleep(any(Duration.class)); + doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class)); + doReturn(false, true).when(messageConsumerUnderTest).isStopped(); + doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer(); - Properties properties = new Properties(); - properties.put("key", "value"); - when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties); - when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties); + Mono> response = Mono.just(new ResponseEntity<>("Error", HttpStatus.BAD_REQUEST)); + when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response); - MRConsumerResponse response = new MRConsumerResponse(); - response.setResponseCode(Integer.toString(HttpStatus.BAD_REQUEST.value())); - response.setResponseMessage("Error"); - doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest) - .getMessageRouterConsumer(any(Properties.class)); - doReturn(response).when(messageRouterConsumerMock).fetchWithReturnConsumerResponse(); + final ListAppender logAppender = + LoggingUtils.getLogListAppender(DmaapMessageConsumer.class, WARN); - final ListAppender logAppender = LoggingUtils.getLogListAppender(DmaapMessageConsumer.class); + messageConsumerUnderTest.start().join(); - messageConsumerUnderTest.run(); + assertThat(logAppender.list.get(0).getFormattedMessage()) + .isEqualTo("Cannot fetch because of Error respons: 400 BAD_REQUEST Error"); - assertThat(logAppender.list.get(0).getLevel()).isEqualTo(Level.WARN); - assertThat( - logAppender.list.toString().contains(": cannot fetch because of Error respons 400 Error from DMaaP.")) - .isTrue(); + verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES); } @Test - public void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage() throws Exception { + void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage() throws Exception { + // The message from MR is here an array of Json objects + setUpMrConfig(); messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); - doReturn(true, false).when(messageConsumerUnderTest).sleep(any(Duration.class)); + String message = + "{\"apiVersion\":\"1.0\",\"operation\":\"GET\",\"correlationId\":\"1592341013115594000\",\"originatorId\":\"849e6c6b420\",\"payload\":{},\"requestId\":\"23343221\", \"target\":\"policy-agent\",\"timestamp\":\"2020-06-16 20:56:53.115665\",\"type\":\"request\",\"url\":\"/rics\"}"; + String messages = "[" + message + "]"; - Properties properties = new Properties(); - properties.put("key", "value"); - when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties); - when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties); + doReturn(false, true).when(messageConsumerUnderTest).isStopped(); + doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer(); - MRConsumerResponse response = new MRConsumerResponse(); - response.setResponseCode(Integer.toString(HttpStatus.OK.value())); - List messages = Arrays.asList("message"); - response.setActualMessages(messages); + Mono> response = Mono.just(new ResponseEntity<>(messages, HttpStatus.OK)); + when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response); - doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest) - .getMessageRouterConsumer(any(Properties.class)); - doReturn(response).when(messageRouterConsumerMock).fetchWithReturnConsumerResponse(); + doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler(); - doReturn(messageHandlerMock).when(messageConsumerUnderTest) - .createDmaapMessageHandler(any(AsyncRestClient.class), any(MRBatchingPublisher.class)); + messageConsumerUnderTest.start().join(); - AsyncRestClient restClientMock = mock(AsyncRestClient.class); - doReturn(restClientMock).when(messageConsumerUnderTest).createRestClient(anyString()); + ArgumentCaptor captor = ArgumentCaptor.forClass(String.class); + verify(messageHandlerMock).handleDmaapMsg(captor.capture()); + String messageAfterJsonParsing = captor.getValue(); + assertThat(messageAfterJsonParsing.contains("apiVersion")).isTrue(); - MRBatchingPublisher publisherMock = mock(MRBatchingPublisher.class); - doReturn(publisherMock).when(messageConsumerUnderTest).getMessageRouterPublisher(any(Properties.class)); + verifyNoMoreInteractions(messageHandlerMock); + } + + @Test + void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage2() throws Exception { + // The message from MR is here an array of String (which is the case when the MR + // simulator is used) + setUpMrConfig(); + messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); - messageConsumerUnderTest.run(); + doReturn(false, true).when(messageConsumerUnderTest).isStopped(); + doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer(); - verify(messageConsumerUnderTest).createRestClient("http://localhost:0"); - verify(messageConsumerUnderTest).getMessageRouterPublisher(properties); + Mono> response = Mono.just(new ResponseEntity<>("[\"aMessage\"]", HttpStatus.OK)); + when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response); - verify(messageHandlerMock).handleDmaapMsg("message"); + doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler(); + + messageConsumerUnderTest.start().join(); + + verify(messageHandlerMock).handleDmaapMsg("aMessage"); verifyNoMoreInteractions(messageHandlerMock); } + + private void setUpMrConfig() { + when(applicationConfigMock.getDmaapConsumerTopicUrl()).thenReturn("url"); + when(applicationConfigMock.getDmaapProducerTopicUrl()).thenReturn("url"); + } }