Create JUnit tests for DmaapMessageConsumer 03/2703/3
authorelinuxhenrik <henrik.b.andersson@est.tech>
Mon, 9 Mar 2020 08:39:49 +0000 (09:39 +0100)
committerHenrik Andersson <henrik.b.andersson@est.tech>
Mon, 9 Mar 2020 12:08:30 +0000 (12:08 +0000)
Change-Id: I987e89c52d5f9eccdcc958dbb571e5d7683d6097
Issue-ID: NONRTRIC-131
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java
policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java [new file with mode: 0644]

index 46dd2ee..e72fcf0 100644 (file)
@@ -22,6 +22,7 @@ package org.oransc.policyagent.dmaap;
 
 import com.google.common.collect.Iterables;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.Properties;
@@ -44,8 +45,7 @@ public class DmaapMessageConsumer implements Runnable {
 
     private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumer.class);
 
-    @SuppressWarnings("squid:S00116") // To avoid warning about DMAAP abbreviation.
-    final Duration TIME_BETWEEN_DMAAP_POLLS = Duration.ofSeconds(10);
+    private final static Duration TIME_BETWEEN_DMAAP_POLLS = Duration.ofSeconds(10);
 
     private final ApplicationConfig applicationConfig;
 
@@ -60,6 +60,10 @@ public class DmaapMessageConsumer implements Runnable {
         thread.start();
     }
 
+    DmaapMessageConsumer(ApplicationConfig applicationConfig, boolean start) {
+        this.applicationConfig = applicationConfig;
+    }
+
     private boolean isDmaapConfigured() {
         Properties consumerCfg = applicationConfig.getDmaapConsumerConfig();
         Properties producerCfg = applicationConfig.getDmaapPublisherConfig();
@@ -78,7 +82,7 @@ public class DmaapMessageConsumer implements Runnable {
                     }
                 }
             } catch (Exception e) {
-                logger.warn("{}: cannot fetch because of ", this, e.getMessage(), e);
+                logger.warn("{}: cannot fetch because of {}", this, e.getMessage());
                 sleep(TIME_BETWEEN_DMAAP_POLLS);
             }
         }
@@ -86,10 +90,15 @@ public class DmaapMessageConsumer implements Runnable {
 
     private Iterable<String> fetchAllMessages() throws ServiceException, IOException {
         Properties dmaapConsumerProperties = this.applicationConfig.getDmaapConsumerConfig();
-        MRConsumer consumer = MRClientFactory.createConsumer(dmaapConsumerProperties);
+        MRConsumer consumer = getMessageRouterConsumer(dmaapConsumerProperties);
         MRConsumerResponse response = consumer.fetchWithReturnConsumerResponse();
         if (response == null || !"200".equals(response.getResponseCode())) {
-            throw new ServiceException("DMaaP NULL response received");
+            String errorMessage = "DMaaP NULL response received";
+            if (response != null) {
+                errorMessage = "Error respons " + response.getResponseCode() + " " + response.getResponseMessage()
+                    + " from DMaaP.";
+            }
+            throw new ServiceException(errorMessage);
         } else {
             logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(), response.getResponseMessage());
             return response.getActualMessages();
@@ -98,19 +107,19 @@ public class DmaapMessageConsumer implements Runnable {
 
     private void processMsg(String msg) throws IOException {
         logger.debug("Message Reveived from DMAAP : {}", msg);
-        createDmaapMessageHandler().handleDmaapMsg(msg);
+        getDmaapMessageHandler().handleDmaapMsg(msg);
     }
 
-    private DmaapMessageHandler createDmaapMessageHandler() throws IOException {
+    private DmaapMessageHandler getDmaapMessageHandler() throws IOException {
         String agentBaseUrl = "http://localhost:" + this.localServerPort;
-        AsyncRestClient agentClient = new AsyncRestClient(agentBaseUrl);
+        AsyncRestClient agentClient = createRestClient(agentBaseUrl);
         Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig();
-        MRBatchingPublisher producer = MRClientFactory.createBatchingPublisher(dmaapPublisherProperties);
+        MRBatchingPublisher producer = getMessageRouterPublisher(dmaapPublisherProperties);
 
-        return new DmaapMessageHandler(producer, agentClient);
+        return createDmaapMessageHandler(agentClient, producer);
     }
 
-    private boolean sleep(Duration duration) {
+    boolean sleep(Duration duration) {
         try {
             Thread.sleep(duration.toMillis());
             return true;
@@ -119,4 +128,21 @@ public class DmaapMessageConsumer implements Runnable {
             return false;
         }
     }
+
+    MRConsumer getMessageRouterConsumer(Properties dmaapConsumerProperties) throws FileNotFoundException, IOException {
+        return MRClientFactory.createConsumer(dmaapConsumerProperties);
+    }
+
+    DmaapMessageHandler createDmaapMessageHandler(AsyncRestClient agentClient, MRBatchingPublisher producer) {
+        return new DmaapMessageHandler(producer, agentClient);
+    }
+
+    AsyncRestClient createRestClient(String agentBaseUrl) {
+        return new AsyncRestClient(agentBaseUrl);
+    }
+
+    MRBatchingPublisher getMessageRouterPublisher(Properties dmaapPublisherProperties)
+        throws FileNotFoundException, IOException {
+        return MRClientFactory.createBatchingPublisher(dmaapPublisherProperties);
+    }
 }
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java b/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java
new file mode 100644 (file)
index 0000000..cd4bcb6
--- /dev/null
@@ -0,0 +1,179 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2020 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.policyagent.dmaap;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.read.ListAppender;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.onap.dmaap.mr.client.MRBatchingPublisher;
+import org.onap.dmaap.mr.client.MRConsumer;
+import org.onap.dmaap.mr.client.response.MRConsumerResponse;
+import org.oransc.policyagent.clients.AsyncRestClient;
+import org.oransc.policyagent.configuration.ApplicationConfig;
+import org.oransc.policyagent.utils.LoggingUtils;
+import org.springframework.http.HttpStatus;
+
+@ExtendWith(MockitoExtension.class)
+public class DmaapMessageConsumerTest {
+    final Duration TIME_BETWEEN_DMAAP_POLLS = Duration.ofSeconds(10);
+
+    @Mock
+    private ApplicationConfig applicationConfigMock;
+    @Mock
+    private MRConsumer messageRouterConsumerMock;
+    @Mock
+    private DmaapMessageHandler messageHandlerMock;
+
+    private DmaapMessageConsumer messageConsumerUnderTest;
+
+    @Test
+    public void dmaapNotConfigured_thenDoNothing() {
+        messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock, false));
+
+        doReturn(true).when(messageConsumerUnderTest).sleep(any(Duration.class));
+
+        messageConsumerUnderTest.run();
+
+        verify(messageConsumerUnderTest).sleep(TIME_BETWEEN_DMAAP_POLLS);
+        verify(applicationConfigMock).getDmaapConsumerConfig();
+        verify(applicationConfigMock).getDmaapPublisherConfig();
+        verifyNoMoreInteractions(applicationConfigMock);
+    }
+
+    @Test
+    public void dmaapConfiguredAndNoMessages_thenPollOnce() throws Exception {
+        messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock, false));
+
+        doReturn(true, false).when(messageConsumerUnderTest).sleep(any(Duration.class));
+
+        Properties properties = new Properties();
+        properties.put("key", "value");
+        when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties);
+        when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties);
+
+        MRConsumerResponse response = new MRConsumerResponse();
+        response.setResponseCode(Integer.toString(HttpStatus.OK.value()));
+        response.setActualMessages(Collections.emptyList());
+
+        doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
+            .getMessageRouterConsumer(any(Properties.class));
+        doReturn(response).when(messageRouterConsumerMock).fetchWithReturnConsumerResponse();
+
+        messageConsumerUnderTest.run();
+
+        verify(messageConsumerUnderTest, times(2)).sleep(TIME_BETWEEN_DMAAP_POLLS);
+
+        verify(applicationConfigMock, times(2)).getDmaapConsumerConfig();
+        verify(applicationConfigMock).getDmaapPublisherConfig();
+        verifyNoMoreInteractions(applicationConfigMock);
+
+        verify(messageRouterConsumerMock).fetchWithReturnConsumerResponse();
+        verifyNoMoreInteractions(messageRouterConsumerMock);
+    }
+
+    @Test
+    public void dmaapConfiguredAndErrorGettingMessages_thenLogWarning() throws Exception {
+        messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock, false));
+
+        doReturn(true, false).when(messageConsumerUnderTest).sleep(any(Duration.class));
+
+        Properties properties = new Properties();
+        properties.put("key", "value");
+        when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties);
+        when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties);
+
+        MRConsumerResponse response = new MRConsumerResponse();
+        response.setResponseCode(Integer.toString(HttpStatus.BAD_REQUEST.value()));
+        response.setResponseMessage("Error");
+        doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
+            .getMessageRouterConsumer(any(Properties.class));
+        doReturn(response).when(messageRouterConsumerMock).fetchWithReturnConsumerResponse();
+
+        final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DmaapMessageConsumer.class);
+
+        messageConsumerUnderTest.run();
+
+        assertThat(logAppender.list.get(0).getLevel()).isEqualTo(Level.WARN);
+        assertThat(
+            logAppender.list.toString().contains(": cannot fetch because of Error respons 400 Error from DMaaP."))
+                .isTrue();
+    }
+
+    @Test
+    public void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage() throws Exception {
+        messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock, false));
+
+        doReturn(true, false).when(messageConsumerUnderTest).sleep(any(Duration.class));
+
+        Properties properties = new Properties();
+        properties.put("key", "value");
+        when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties);
+        when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties);
+
+        MRConsumerResponse response = new MRConsumerResponse();
+        response.setResponseCode(Integer.toString(HttpStatus.OK.value()));
+        List<String> messages = Arrays.asList("message");
+        response.setActualMessages(messages);
+
+        doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
+            .getMessageRouterConsumer(any(Properties.class));
+        doReturn(response).when(messageRouterConsumerMock).fetchWithReturnConsumerResponse();
+
+        doReturn(messageHandlerMock).when(messageConsumerUnderTest)
+            .createDmaapMessageHandler(any(AsyncRestClient.class), any(MRBatchingPublisher.class));
+
+        AsyncRestClient restClientMock = mock(AsyncRestClient.class);
+        doReturn(restClientMock).when(messageConsumerUnderTest).createRestClient(anyString());
+
+        MRBatchingPublisher publisherMock = mock(MRBatchingPublisher.class);
+        doReturn(publisherMock).when(messageConsumerUnderTest).getMessageRouterPublisher(any(Properties.class));
+
+        messageConsumerUnderTest.run();
+
+        verify(messageConsumerUnderTest).createRestClient("http://localhost:0");
+        verify(messageConsumerUnderTest).getMessageRouterPublisher(properties);
+
+        verify(messageHandlerMock).handleDmaapMsg("message");
+        verifyNoMoreInteractions(messageHandlerMock);
+    }
+}