Remove using of DMAAP client from ONAP
[nonrtric.git] / policy-agent / src / test / java / org / oransc / policyagent / dmaap / DmaapMessageConsumerTest.java
index cd4bcb6..a78fde0 100644 (file)
 
 package org.oransc.policyagent.dmaap;
 
+import static ch.qos.logback.classic.Level.WARN;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
-import ch.qos.logback.classic.Level;
 import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.read.ListAppender;
 
 import java.time.Duration;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
+import java.util.LinkedList;
 
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
-import org.onap.dmaap.mr.client.MRBatchingPublisher;
-import org.onap.dmaap.mr.client.MRConsumer;
-import org.onap.dmaap.mr.client.response.MRConsumerResponse;
 import org.oransc.policyagent.clients.AsyncRestClient;
 import org.oransc.policyagent.configuration.ApplicationConfig;
 import org.oransc.policyagent.utils.LoggingUtils;
 import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import reactor.core.publisher.Mono;
 
 @ExtendWith(MockitoExtension.class)
-public class DmaapMessageConsumerTest {
-    final Duration TIME_BETWEEN_DMAAP_POLLS = Duration.ofSeconds(10);
-
+class DmaapMessageConsumerTest {
     @Mock
     private ApplicationConfig applicationConfigMock;
     @Mock
-    private MRConsumer messageRouterConsumerMock;
+    private AsyncRestClient messageRouterConsumerMock;
     @Mock
     private DmaapMessageHandler messageHandlerMock;
 
     private DmaapMessageConsumer messageConsumerUnderTest;
 
+    @AfterEach
+    void resetLogging() {
+        LoggingUtils.getLogListAppender(DmaapMessageConsumer.class);
+    }
+
     @Test
-    public void dmaapNotConfigured_thenDoNothing() {
-        messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock, false));
+    void dmaapNotConfigured_thenSleepAndRetryUntilConfig() throws Exception {
+        messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
 
-        doReturn(true).when(messageConsumerUnderTest).sleep(any(Duration.class));
+        doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
+        doReturn(false, false, false, true).when(messageConsumerUnderTest).isStopped();
+        doReturn(false, true, true).when(messageConsumerUnderTest).isDmaapConfigured();
+        doReturn(new LinkedList<>()).when(messageConsumerUnderTest).fetchAllMessages();
 
-        messageConsumerUnderTest.run();
+        messageConsumerUnderTest.start().join();
 
-        verify(messageConsumerUnderTest).sleep(TIME_BETWEEN_DMAAP_POLLS);
-        verify(applicationConfigMock).getDmaapConsumerConfig();
-        verify(applicationConfigMock).getDmaapPublisherConfig();
-        verifyNoMoreInteractions(applicationConfigMock);
+        InOrder orderVerifier = inOrder(messageConsumerUnderTest);
+        orderVerifier.verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES);
+        orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages();
     }
 
     @Test
-    public void dmaapConfiguredAndNoMessages_thenPollOnce() throws Exception {
-        messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock, false));
+    void dmaapConfigurationRemoved_thenStopPollingDmaapSleepAndRetry() throws Exception {
+        messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
+
+        doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
+        doReturn(false, false, false, false, true).when(messageConsumerUnderTest).isStopped();
+        doReturn(true, true, false).when(messageConsumerUnderTest).isDmaapConfigured();
+        doReturn(new LinkedList<>()).when(messageConsumerUnderTest).fetchAllMessages();
 
-        doReturn(true, false).when(messageConsumerUnderTest).sleep(any(Duration.class));
+        messageConsumerUnderTest.start().join();
 
-        Properties properties = new Properties();
-        properties.put("key", "value");
-        when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties);
-        when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties);
+        InOrder orderVerifier = inOrder(messageConsumerUnderTest);
+        orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages();
+        orderVerifier.verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES);
+    }
 
-        MRConsumerResponse response = new MRConsumerResponse();
-        response.setResponseCode(Integer.toString(HttpStatus.OK.value()));
-        response.setActualMessages(Collections.emptyList());
+    @Test
+    void dmaapConfiguredAndNoMessages_thenPollOnce() throws Exception {
+        setUpMrConfig();
 
-        doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
-            .getMessageRouterConsumer(any(Properties.class));
-        doReturn(response).when(messageRouterConsumerMock).fetchWithReturnConsumerResponse();
+        messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
 
-        messageConsumerUnderTest.run();
+        Mono<ResponseEntity<String>> response = Mono.empty();
 
-        verify(messageConsumerUnderTest, times(2)).sleep(TIME_BETWEEN_DMAAP_POLLS);
+        doReturn(false, true).when(messageConsumerUnderTest).isStopped();
+        doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
+        doReturn(response).when(messageRouterConsumerMock).getForEntity(any());
 
-        verify(applicationConfigMock, times(2)).getDmaapConsumerConfig();
-        verify(applicationConfigMock).getDmaapPublisherConfig();
-        verifyNoMoreInteractions(applicationConfigMock);
+        messageConsumerUnderTest.start().join();
 
-        verify(messageRouterConsumerMock).fetchWithReturnConsumerResponse();
+        verify(messageRouterConsumerMock).getForEntity(any());
         verifyNoMoreInteractions(messageRouterConsumerMock);
     }
 
     @Test
-    public void dmaapConfiguredAndErrorGettingMessages_thenLogWarning() throws Exception {
-        messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock, false));
+    void dmaapConfiguredAndErrorGettingMessages_thenLogWarningAndSleep() throws Exception {
+        setUpMrConfig();
 
-        doReturn(true, false).when(messageConsumerUnderTest).sleep(any(Duration.class));
+        messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
 
-        Properties properties = new Properties();
-        properties.put("key", "value");
-        when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties);
-        when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties);
+        doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
+        doReturn(false, true).when(messageConsumerUnderTest).isStopped();
+        doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
 
-        MRConsumerResponse response = new MRConsumerResponse();
-        response.setResponseCode(Integer.toString(HttpStatus.BAD_REQUEST.value()));
-        response.setResponseMessage("Error");
-        doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
-            .getMessageRouterConsumer(any(Properties.class));
-        doReturn(response).when(messageRouterConsumerMock).fetchWithReturnConsumerResponse();
+        Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>("Error", HttpStatus.BAD_REQUEST));
+        when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response);
 
-        final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DmaapMessageConsumer.class);
+        final ListAppender<ILoggingEvent> logAppender =
+            LoggingUtils.getLogListAppender(DmaapMessageConsumer.class, WARN);
 
-        messageConsumerUnderTest.run();
+        messageConsumerUnderTest.start().join();
 
-        assertThat(logAppender.list.get(0).getLevel()).isEqualTo(Level.WARN);
-        assertThat(
-            logAppender.list.toString().contains(": cannot fetch because of Error respons 400 Error from DMaaP."))
-                .isTrue();
+        assertThat(logAppender.list.get(0).getFormattedMessage())
+            .isEqualTo("Cannot fetch because of Error respons: 400 BAD_REQUEST Error");
+
+        verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES);
     }
 
     @Test
-    public void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage() throws Exception {
-        messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock, false));
+    void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage() throws Exception {
+        // The message from MR is here an array of Json objects
+        setUpMrConfig();
+        messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
 
-        doReturn(true, false).when(messageConsumerUnderTest).sleep(any(Duration.class));
+        String message =
+            "{\"apiVersion\":\"1.0\",\"operation\":\"GET\",\"correlationId\":\"1592341013115594000\",\"originatorId\":\"849e6c6b420\",\"payload\":{},\"requestId\":\"23343221\", \"target\":\"policy-agent\",\"timestamp\":\"2020-06-16 20:56:53.115665\",\"type\":\"request\",\"url\":\"/rics\"}";
+        String messages = "[" + message + "]";
 
-        Properties properties = new Properties();
-        properties.put("key", "value");
-        when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties);
-        when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties);
+        doReturn(false, true).when(messageConsumerUnderTest).isStopped();
+        doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
 
-        MRConsumerResponse response = new MRConsumerResponse();
-        response.setResponseCode(Integer.toString(HttpStatus.OK.value()));
-        List<String> messages = Arrays.asList("message");
-        response.setActualMessages(messages);
+        Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>(messages, HttpStatus.OK));
+        when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response);
 
-        doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
-            .getMessageRouterConsumer(any(Properties.class));
-        doReturn(response).when(messageRouterConsumerMock).fetchWithReturnConsumerResponse();
+        doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler();
 
-        doReturn(messageHandlerMock).when(messageConsumerUnderTest)
-            .createDmaapMessageHandler(any(AsyncRestClient.class), any(MRBatchingPublisher.class));
+        messageConsumerUnderTest.start().join();
 
-        AsyncRestClient restClientMock = mock(AsyncRestClient.class);
-        doReturn(restClientMock).when(messageConsumerUnderTest).createRestClient(anyString());
+        ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
+        verify(messageHandlerMock).handleDmaapMsg(captor.capture());
+        String messageAfterJsonParsing = captor.getValue();
+        assertThat(messageAfterJsonParsing.contains("apiVersion")).isTrue();
 
-        MRBatchingPublisher publisherMock = mock(MRBatchingPublisher.class);
-        doReturn(publisherMock).when(messageConsumerUnderTest).getMessageRouterPublisher(any(Properties.class));
+        verifyNoMoreInteractions(messageHandlerMock);
+    }
+
+    @Test
+    void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage2() throws Exception {
+        // The message from MR is here an array of String (which is the case when the MR
+        // simulator is used)
+        setUpMrConfig();
+        messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
+
+        doReturn(false, true).when(messageConsumerUnderTest).isStopped();
+        doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
 
-        messageConsumerUnderTest.run();
+        Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>("[\"aMessage\"]", HttpStatus.OK));
+        when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response);
 
-        verify(messageConsumerUnderTest).createRestClient("http://localhost:0");
-        verify(messageConsumerUnderTest).getMessageRouterPublisher(properties);
+        doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler();
 
-        verify(messageHandlerMock).handleDmaapMsg("message");
+        messageConsumerUnderTest.start().join();
+
+        verify(messageHandlerMock).handleDmaapMsg("aMessage");
         verifyNoMoreInteractions(messageHandlerMock);
     }
+
+    private void setUpMrConfig() {
+        when(applicationConfigMock.getDmaapConsumerTopicUrl()).thenReturn("url");
+        when(applicationConfigMock.getDmaapProducerTopicUrl()).thenReturn("url");
+    }
 }