Merge "Remove sleep from DmaapMessageConsumer"
authorJohn Keeney <John.Keeney@est.tech>
Fri, 8 May 2020 15:35:33 +0000 (15:35 +0000)
committerGerrit Code Review <gerrit@o-ran-sc.org>
Fri, 8 May 2020 15:35:33 +0000 (15:35 +0000)
policy-agent/src/main/java/org/oransc/policyagent/Application.java
policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java
policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java
policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java
test/mrstub/mr.py

index 1c397fe..3bc7326 100644 (file)
@@ -20,6 +20,7 @@
 
 package org.oransc.policyagent;
 
+import org.oransc.policyagent.dmaap.DmaapMessageConsumer;
 import org.oransc.policyagent.tasks.RefreshConfigTask;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.CommandLineRunner;
@@ -34,19 +35,34 @@ public class Application {
     @Autowired
     private RefreshConfigTask configRefresh;
 
+    @Autowired
+    private DmaapMessageConsumer dmaapMessageConsumer;
+
     public static void main(String[] args) {
         SpringApplication.run(Application.class);
     }
 
     /**
-     * Starts the service and reads the configuration.
+     * Starts the configuration refresh task and reads the configuration.
      *
      * @param ctx the application context.
      *
-     * @return the command line runner performing tasks at startup.
+     * @return the command line runner for the configuration refresh task.
      */
     @Bean
-    public CommandLineRunner commandLineRunner(ApplicationContext ctx) {
+    public CommandLineRunner configRefreshRunner(ApplicationContext ctx) {
         return args -> configRefresh.start();
     }
+
+    /**
+     * Starts the DMaaP message consumer service.
+     *
+     * @param ctx the application context.
+     *
+     * @return the command line runner for the DMaaP message consumer service.
+     */
+    @Bean
+    public CommandLineRunner dmaapMessageConsumerRunner(ApplicationContext ctx) {
+        return args -> dmaapMessageConsumer.start();
+    }
 }
index da209a8..dd60d39 100644 (file)
@@ -33,6 +33,7 @@ import org.onap.dmaap.mr.client.response.MRConsumerResponse;
 import org.oransc.policyagent.clients.AsyncRestClient;
 import org.oransc.policyagent.configuration.ApplicationConfig;
 import org.oransc.policyagent.exceptions.ServiceException;
+import org.oransc.policyagent.tasks.RefreshConfigTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -40,15 +41,21 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
 /**
- * The class fetches incoming requests from DMAAP on regular intervals. Each
- * received request is proceesed by DmaapMessageHandler.
+ * The class fetches incoming requests from DMAAP. It uses the timeout parameter that lets the MessageRouter keep the
+ * connection with the Kafka open until requests are sent in.
+ *
+ * If there is no DMaaP configuration in the application configuration, then this service will regularly check the
+ * configuration and start polling DMaaP if the configuration is added. If the DMaaP configuration is removed, then the
+ * service will stop polling and resume checking for configuration.
+ *
+ * Each received request is processed by {@link DmaapMessageHandler}.
  */
 @Component
-public class DmaapMessageConsumer implements Runnable {
+public class DmaapMessageConsumer {
 
-    private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumer.class);
+    protected static final Duration TIME_BETWEEN_DMAAP_RETRIES = Duration.ofSeconds(10);
 
-    private static final Duration TIME_BETWEEN_DMAAP_POLLS = Duration.ofSeconds(10);
+    private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumer.class);
 
     private final ApplicationConfig applicationConfig;
 
@@ -58,20 +65,26 @@ public class DmaapMessageConsumer implements Runnable {
     @Autowired
     public DmaapMessageConsumer(ApplicationConfig applicationConfig) {
         this.applicationConfig = applicationConfig;
+    }
 
-        Thread thread = new Thread(this);
+    public Thread start() {
+        Thread thread = new Thread(() -> this.checkConfigLoop());
         thread.start();
+        return thread;
     }
 
-    private boolean isDmaapConfigured() {
-        Properties consumerCfg = applicationConfig.getDmaapConsumerConfig();
-        Properties producerCfg = applicationConfig.getDmaapPublisherConfig();
-        return (consumerCfg != null && consumerCfg.size() > 0 && producerCfg != null && producerCfg.size() > 0);
+    private void checkConfigLoop() {
+        while (!isStopped()) {
+            if (isDmaapConfigured()) {
+                messageHandlingLoop();
+            } else {
+                sleep(RefreshConfigTask.CONFIG_REFRESH_INTERVAL);
+            }
+        }
     }
 
-    @Override
-    public void run() {
-        while (sleep(TIME_BETWEEN_DMAAP_POLLS) && isDmaapConfigured()) {
+    private void messageHandlingLoop() {
+        while (!isStopped() && isDmaapConfigured()) {
             try {
                 Iterable<String> dmaapMsgs = fetchAllMessages();
                 if (dmaapMsgs != null && Iterables.size(dmaapMsgs) > 0) {
@@ -81,13 +94,23 @@ public class DmaapMessageConsumer implements Runnable {
                     }
                 }
             } catch (Exception e) {
-                logger.warn("{}: cannot fetch because of {}", this, e.getMessage());
-                sleep(TIME_BETWEEN_DMAAP_POLLS);
+                logger.warn("Cannot fetch because of {}", e.getMessage());
+                sleep(TIME_BETWEEN_DMAAP_RETRIES);
             }
         }
     }
 
-    private Iterable<String> fetchAllMessages() throws ServiceException, IOException {
+    protected boolean isStopped() {
+        return false;
+    }
+
+    protected boolean isDmaapConfigured() {
+        Properties consumerCfg = applicationConfig.getDmaapConsumerConfig();
+        Properties producerCfg = applicationConfig.getDmaapPublisherConfig();
+        return (consumerCfg != null && consumerCfg.size() > 0 && producerCfg != null && producerCfg.size() > 0);
+    }
+
+    protected Iterable<String> fetchAllMessages() throws ServiceException, IOException {
         Properties dmaapConsumerProperties = this.applicationConfig.getDmaapConsumerConfig();
         MRConsumer consumer = getMessageRouterConsumer(dmaapConsumerProperties);
         MRConsumerResponse response = consumer.fetchWithReturnConsumerResponse();
@@ -118,29 +141,27 @@ public class DmaapMessageConsumer implements Runnable {
         return createDmaapMessageHandler(agentClient, producer);
     }
 
-    boolean sleep(Duration duration) {
+    protected void sleep(Duration duration) {
         try {
             Thread.sleep(duration.toMillis());
-            return true;
         } catch (Exception e) {
             logger.error("Failed to put the thread to sleep", e);
-            return false;
         }
     }
 
-    MRConsumer getMessageRouterConsumer(Properties dmaapConsumerProperties) throws IOException {
+    protected MRConsumer getMessageRouterConsumer(Properties dmaapConsumerProperties) throws IOException {
         return MRClientFactory.createConsumer(dmaapConsumerProperties);
     }
 
-    DmaapMessageHandler createDmaapMessageHandler(AsyncRestClient agentClient, MRBatchingPublisher producer) {
+    protected DmaapMessageHandler createDmaapMessageHandler(AsyncRestClient agentClient, MRBatchingPublisher producer) {
         return new DmaapMessageHandler(producer, agentClient);
     }
 
-    AsyncRestClient createRestClient(String agentBaseUrl) {
+    protected AsyncRestClient createRestClient(String agentBaseUrl) {
         return new AsyncRestClient(agentBaseUrl);
     }
 
-    MRBatchingPublisher getMessageRouterPublisher(Properties dmaapPublisherProperties) throws IOException {
+    protected MRBatchingPublisher getMessageRouterPublisher(Properties dmaapPublisherProperties) throws IOException {
         return MRClientFactory.createBatchingPublisher(dmaapPublisherProperties);
     }
 }
index 05bcb0f..12bc55a 100644 (file)
@@ -80,6 +80,11 @@ public class RefreshConfigTask {
     @Value("#{systemEnvironment}")
     public Properties systemEnvironment;
 
+    /**
+     * The time between refreshes of the configuration.
+     */
+    public static final Duration CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1);
+
     final ApplicationConfig appConfig;
     @Getter(AccessLevel.PROTECTED)
     private Disposable refreshTask = null;
@@ -90,8 +95,6 @@ public class RefreshConfigTask {
     private final Policies policies;
     private final Services services;
     private final PolicyTypes policyTypes;
-    private static final Duration FILE_CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1);
-    private static final Duration CONSUL_CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1);
 
     @Autowired
     public RefreshConfigTask(ApplicationConfig appConfig, Rics rics, Policies policies, Services services,
@@ -120,14 +123,14 @@ public class RefreshConfigTask {
     }
 
     Flux<RicConfigUpdate.Type> createRefreshTask() {
-        Flux<JsonObject> loadFromFile = Flux.interval(Duration.ZERO, FILE_CONFIG_REFRESH_INTERVAL) //
+        Flux<JsonObject> loadFromFile = Flux.interval(Duration.ZERO, CONFIG_REFRESH_INTERVAL) //
             .filter(notUsed -> !this.isConsulUsed) //
             .flatMap(notUsed -> loadConfigurationFromFile()) //
             .onErrorResume(this::ignoreErrorFlux) //
             .doOnNext(json -> logger.debug("loadFromFile succeeded")) //
             .doOnTerminate(() -> logger.error("loadFromFile Terminate"));
 
-        Flux<JsonObject> loadFromConsul = Flux.interval(Duration.ZERO, CONSUL_CONFIG_REFRESH_INTERVAL) //
+        Flux<JsonObject> loadFromConsul = Flux.interval(Duration.ZERO, CONFIG_REFRESH_INTERVAL) //
             .flatMap(i -> getEnvironment(systemEnvironment)) //
             .flatMap(this::createCbsClient) //
             .flatMap(this::getFromCbs) //
index 8dd3647..fc4e7ce 100644 (file)
 
 package org.oransc.policyagent.dmaap;
 
+import static ch.qos.logback.classic.Level.WARN;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
-import ch.qos.logback.classic.Level;
 import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.read.ListAppender;
 
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
 
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InOrder;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.onap.dmaap.mr.client.MRBatchingPublisher;
@@ -50,13 +54,12 @@ import org.onap.dmaap.mr.client.MRConsumer;
 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
 import org.oransc.policyagent.clients.AsyncRestClient;
 import org.oransc.policyagent.configuration.ApplicationConfig;
+import org.oransc.policyagent.tasks.RefreshConfigTask;
 import org.oransc.policyagent.utils.LoggingUtils;
 import org.springframework.http.HttpStatus;
 
 @ExtendWith(MockitoExtension.class)
 public class DmaapMessageConsumerTest {
-    final Duration TIME_BETWEEN_DMAAP_POLLS = Duration.ofSeconds(10);
-
     @Mock
     private ApplicationConfig applicationConfigMock;
     @Mock
@@ -66,98 +69,110 @@ public class DmaapMessageConsumerTest {
 
     private DmaapMessageConsumer messageConsumerUnderTest;
 
+    @AfterEach
+    public void resetLogging() {
+        LoggingUtils.getLogListAppender(DmaapMessageConsumer.class);
+    }
+
     @Test
-    public void dmaapNotConfigured_thenDoNothing() {
+    public void dmaapNotConfigured_thenSleepAndRetryUntilConfig() throws Exception {
         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
 
-        doReturn(true).when(messageConsumerUnderTest).sleep(any(Duration.class));
+        doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
+        doReturn(false, false, false, true).when(messageConsumerUnderTest).isStopped();
+        doReturn(false, true, true).when(messageConsumerUnderTest).isDmaapConfigured();
+        doReturn(new LinkedList<>()).when(messageConsumerUnderTest).fetchAllMessages();
 
-        messageConsumerUnderTest.run();
+        messageConsumerUnderTest.start().join();
 
-        verify(messageConsumerUnderTest).sleep(TIME_BETWEEN_DMAAP_POLLS);
-        verify(applicationConfigMock).getDmaapConsumerConfig();
-        verify(applicationConfigMock).getDmaapPublisherConfig();
-        verifyNoMoreInteractions(applicationConfigMock);
+        InOrder orderVerifier = inOrder(messageConsumerUnderTest);
+        orderVerifier.verify(messageConsumerUnderTest).sleep(RefreshConfigTask.CONFIG_REFRESH_INTERVAL);
+        orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages();
     }
 
     @Test
-    public void dmaapConfiguredAndNoMessages_thenPollOnce() throws Exception {
+    public void dmaapConfigurationRemoved_thenStopPollingDmaapSleepAndRetry() throws Exception {
         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
 
-        doReturn(true, false).when(messageConsumerUnderTest).sleep(any(Duration.class));
+        doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
+        doReturn(false, false, false, false, true).when(messageConsumerUnderTest).isStopped();
+        doReturn(true, true, false).when(messageConsumerUnderTest).isDmaapConfigured();
+        doReturn(new LinkedList<>()).when(messageConsumerUnderTest).fetchAllMessages();
 
-        Properties properties = new Properties();
-        properties.put("key", "value");
-        when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties);
-        when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties);
+        messageConsumerUnderTest.start().join();
+
+        InOrder orderVerifier = inOrder(messageConsumerUnderTest);
+        orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages();
+        orderVerifier.verify(messageConsumerUnderTest).sleep(RefreshConfigTask.CONFIG_REFRESH_INTERVAL);
+    }
+
+    @Test
+    public void dmaapConfiguredAndNoMessages_thenPollOnce() throws Exception {
+        setUpMrConfig();
+
+        messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
 
         MRConsumerResponse response = new MRConsumerResponse();
         response.setResponseCode(Integer.toString(HttpStatus.OK.value()));
         response.setActualMessages(Collections.emptyList());
 
+        doReturn(false, false, true).when(messageConsumerUnderTest).isStopped();
         doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
             .getMessageRouterConsumer(any(Properties.class));
-        doReturn(response).when(messageRouterConsumerMock).fetchWithReturnConsumerResponse();
+        when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response);
 
-        messageConsumerUnderTest.run();
-
-        verify(messageConsumerUnderTest, times(2)).sleep(TIME_BETWEEN_DMAAP_POLLS);
-
-        verify(applicationConfigMock, times(2)).getDmaapConsumerConfig();
-        verify(applicationConfigMock).getDmaapPublisherConfig();
-        verifyNoMoreInteractions(applicationConfigMock);
+        messageConsumerUnderTest.start().join();
 
         verify(messageRouterConsumerMock).fetchWithReturnConsumerResponse();
         verifyNoMoreInteractions(messageRouterConsumerMock);
     }
 
     @Test
-    public void dmaapConfiguredAndErrorGettingMessages_thenLogWarning() throws Exception {
-        messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
+    public void dmaapConfiguredAndErrorGettingMessages_thenLogWarningAndSleep() throws Exception {
+        setUpMrConfig();
 
-        doReturn(true, false).when(messageConsumerUnderTest).sleep(any(Duration.class));
-
-        Properties properties = new Properties();
-        properties.put("key", "value");
-        when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties);
-        when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties);
+        messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
 
-        MRConsumerResponse response = new MRConsumerResponse();
-        response.setResponseCode(Integer.toString(HttpStatus.BAD_REQUEST.value()));
-        response.setResponseMessage("Error");
+        doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
+        doReturn(false, false, true).when(messageConsumerUnderTest).isStopped();
         doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
             .getMessageRouterConsumer(any(Properties.class));
-        doReturn(response).when(messageRouterConsumerMock).fetchWithReturnConsumerResponse();
 
-        final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DmaapMessageConsumer.class);
+        MRConsumerResponse response = new MRConsumerResponse();
+        int responseCode = HttpStatus.BAD_REQUEST.value();
+        response.setResponseCode(Integer.toString(responseCode));
+        String responseMessage = "Error";
+        response.setResponseMessage(responseMessage);
+        when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response);
+
+        final ListAppender<ILoggingEvent> logAppender =
+            LoggingUtils.getLogListAppender(DmaapMessageConsumer.class, WARN);
 
-        messageConsumerUnderTest.run();
+        messageConsumerUnderTest.start().join();
 
-        assertThat(logAppender.list.get(0).getLevel()).isEqualTo(Level.WARN);
-        assertThat(
-            logAppender.list.toString().contains(": cannot fetch because of Error respons 400 Error from DMaaP."))
+        assertThat(logAppender.list.toString()
+            .contains("Cannot fetch because of Error respons " + responseCode + " " + responseMessage + " from DMaaP."))
                 .isTrue();
+
+        verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES);
     }
 
     @Test
     public void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage() throws Exception {
-        messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
+        Properties properties = setUpMrConfig();
 
-        doReturn(true, false).when(messageConsumerUnderTest).sleep(any(Duration.class));
+        messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
 
-        Properties properties = new Properties();
-        properties.put("key", "value");
-        when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties);
-        when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties);
+        doReturn(false, false, true).when(messageConsumerUnderTest).isStopped();
+        doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
+            .getMessageRouterConsumer(any(Properties.class));
 
         MRConsumerResponse response = new MRConsumerResponse();
         response.setResponseCode(Integer.toString(HttpStatus.OK.value()));
-        List<String> messages = Arrays.asList("message");
+        String responseMessage = "message";
+        List<String> messages = Arrays.asList(responseMessage);
         response.setActualMessages(messages);
-
-        doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
-            .getMessageRouterConsumer(any(Properties.class));
-        doReturn(response).when(messageRouterConsumerMock).fetchWithReturnConsumerResponse();
+        when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response);
 
         doReturn(messageHandlerMock).when(messageConsumerUnderTest)
             .createDmaapMessageHandler(any(AsyncRestClient.class), any(MRBatchingPublisher.class));
@@ -168,12 +183,20 @@ public class DmaapMessageConsumerTest {
         MRBatchingPublisher publisherMock = mock(MRBatchingPublisher.class);
         doReturn(publisherMock).when(messageConsumerUnderTest).getMessageRouterPublisher(any(Properties.class));
 
-        messageConsumerUnderTest.run();
+        messageConsumerUnderTest.start().join();
 
         verify(messageConsumerUnderTest).createRestClient("https://localhost:0");
         verify(messageConsumerUnderTest).getMessageRouterPublisher(properties);
 
-        verify(messageHandlerMock).handleDmaapMsg("message");
+        verify(messageHandlerMock).handleDmaapMsg(responseMessage);
         verifyNoMoreInteractions(messageHandlerMock);
     }
+
+    private Properties setUpMrConfig() {
+        Properties properties = new Properties();
+        properties.put("key", "value");
+        when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties);
+        when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties);
+        return properties;
+    }
 }
index 1094e7c..9c5a2c8 100644 (file)
@@ -212,9 +212,9 @@ def events_write():
             print(AGENT_WRITE_URL+ " msg+status (correlationid="+id+") :" + str(msg_str))
     except Exception as e:
         print(AGENT_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
-        return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
+        return Response('{"message": "' + SERVER_ERROR + ' ' + str(e) + '","status":"500"}', status=200, mimetype=MIME_JSON)
 
-    return Response('OK', status=200, mimetype=MIME_TEXT)
+    return Response('{}', status=200, mimetype=MIME_JSON)
 
 
 ### Functions for metrics read out ###