X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=policy-agent%2Fsrc%2Ftest%2Fjava%2Forg%2Foransc%2Fpolicyagent%2Fdmaap%2FDmaapMessageConsumerTest.java;h=a78fde035df1377b7f994d98a5ea301ca11d8860;hb=4e7db50d7fb3fd2c7101520f00f0f0b4baf9bddc;hp=6e7865689d11304c59afdc0462e31ea50a7f33a3;hpb=824033853e6b3b52f9019283e85f21006596b0fb;p=nonrtric.git diff --git a/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java b/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java index 6e786568..a78fde03 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java @@ -35,30 +35,28 @@ import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.read.ListAppender; import java.time.Duration; -import java.util.Arrays; -import java.util.Collections; import java.util.LinkedList; -import java.util.List; -import java.util.Properties; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import org.onap.dmaap.mr.client.MRConsumer; -import org.onap.dmaap.mr.client.response.MRConsumerResponse; +import org.oransc.policyagent.clients.AsyncRestClient; import org.oransc.policyagent.configuration.ApplicationConfig; import org.oransc.policyagent.utils.LoggingUtils; import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import reactor.core.publisher.Mono; @ExtendWith(MockitoExtension.class) class DmaapMessageConsumerTest { @Mock private ApplicationConfig applicationConfigMock; @Mock - private MRConsumer messageRouterConsumerMock; + private AsyncRestClient messageRouterConsumerMock; @Mock private DmaapMessageHandler messageHandlerMock; @@ -107,18 +105,15 @@ class DmaapMessageConsumerTest { messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); - MRConsumerResponse response = new MRConsumerResponse(); - response.setResponseCode(Integer.toString(HttpStatus.OK.value())); - response.setActualMessages(Collections.emptyList()); + Mono> response = Mono.empty(); doReturn(false, true).when(messageConsumerUnderTest).isStopped(); - doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest) - .getMessageRouterConsumer(any(Properties.class)); - when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response); + doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer(); + doReturn(response).when(messageRouterConsumerMock).getForEntity(any()); messageConsumerUnderTest.start().join(); - verify(messageRouterConsumerMock).fetchWithReturnConsumerResponse(); + verify(messageRouterConsumerMock).getForEntity(any()); verifyNoMoreInteractions(messageRouterConsumerMock); } @@ -130,56 +125,73 @@ class DmaapMessageConsumerTest { doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class)); doReturn(false, true).when(messageConsumerUnderTest).isStopped(); - doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest) - .getMessageRouterConsumer(any(Properties.class)); + doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer(); - MRConsumerResponse response = new MRConsumerResponse(); - int responseCode = HttpStatus.BAD_REQUEST.value(); - response.setResponseCode(Integer.toString(responseCode)); - String responseMessage = "Error"; - response.setResponseMessage(responseMessage); - when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response); + Mono> response = Mono.just(new ResponseEntity<>("Error", HttpStatus.BAD_REQUEST)); + when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response); final ListAppender logAppender = LoggingUtils.getLogListAppender(DmaapMessageConsumer.class, WARN); messageConsumerUnderTest.start().join(); - assertThat(logAppender.list.get(0).getFormattedMessage()).isEqualTo( - "Cannot fetch because of Error respons " + responseCode + " " + responseMessage + " from DMaaP."); + assertThat(logAppender.list.get(0).getFormattedMessage()) + .isEqualTo("Cannot fetch because of Error respons: 400 BAD_REQUEST Error"); verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES); } @Test void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage() throws Exception { + // The message from MR is here an array of Json objects + setUpMrConfig(); + messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); + + String message = + "{\"apiVersion\":\"1.0\",\"operation\":\"GET\",\"correlationId\":\"1592341013115594000\",\"originatorId\":\"849e6c6b420\",\"payload\":{},\"requestId\":\"23343221\", \"target\":\"policy-agent\",\"timestamp\":\"2020-06-16 20:56:53.115665\",\"type\":\"request\",\"url\":\"/rics\"}"; + String messages = "[" + message + "]"; + + doReturn(false, true).when(messageConsumerUnderTest).isStopped(); + doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer(); + + Mono> response = Mono.just(new ResponseEntity<>(messages, HttpStatus.OK)); + when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response); + + doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler(); + + messageConsumerUnderTest.start().join(); + + ArgumentCaptor captor = ArgumentCaptor.forClass(String.class); + verify(messageHandlerMock).handleDmaapMsg(captor.capture()); + String messageAfterJsonParsing = captor.getValue(); + assertThat(messageAfterJsonParsing.contains("apiVersion")).isTrue(); + + verifyNoMoreInteractions(messageHandlerMock); + } + + @Test + void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage2() throws Exception { + // The message from MR is here an array of String (which is the case when the MR + // simulator is used) setUpMrConfig(); messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); doReturn(false, true).when(messageConsumerUnderTest).isStopped(); - doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest) - .getMessageRouterConsumer(any(Properties.class)); + doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer(); - MRConsumerResponse response = new MRConsumerResponse(); - response.setResponseCode(Integer.toString(HttpStatus.OK.value())); - String responseMessage = "message"; - List messages = Arrays.asList(responseMessage); - response.setActualMessages(messages); - when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response); + Mono> response = Mono.just(new ResponseEntity<>("[\"aMessage\"]", HttpStatus.OK)); + when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response); doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler(); messageConsumerUnderTest.start().join(); - verify(messageHandlerMock).handleDmaapMsg(responseMessage); + verify(messageHandlerMock).handleDmaapMsg("aMessage"); verifyNoMoreInteractions(messageHandlerMock); } - private Properties setUpMrConfig() { - Properties properties = new Properties(); - properties.put("key", "value"); - when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties); - when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties); - return properties; + private void setUpMrConfig() { + when(applicationConfigMock.getDmaapConsumerTopicUrl()).thenReturn("url"); + when(applicationConfigMock.getDmaapProducerTopicUrl()).thenReturn("url"); } }