From: elinuxhenrik Date: Tue, 5 May 2020 14:38:15 +0000 (+0200) Subject: Remove sleep from DmaapMessageConsumer X-Git-Tag: 2.0.0~50^2 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;ds=inline;h=9c5cb8dff7e04d10141ce2b2d1c7fab2cf8ef508;hp=-c;p=nonrtric.git Remove sleep from DmaapMessageConsumer Now that the timeout parameter is used when polling MR, the consumer doesn't have to sleep between polls. Also fixes the MR simulator so it responds correctly to the responses from the policy agent. Change-Id: I56d8be6a762013503defcd4f7297daa90a0a293e Issue-ID: NONRTRIC-209 Signed-off-by: elinuxhenrik --- 9c5cb8dff7e04d10141ce2b2d1c7fab2cf8ef508 diff --git a/policy-agent/src/main/java/org/oransc/policyagent/Application.java b/policy-agent/src/main/java/org/oransc/policyagent/Application.java index 1c397fe9..3bc7326c 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/Application.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/Application.java @@ -20,6 +20,7 @@ package org.oransc.policyagent; +import org.oransc.policyagent.dmaap.DmaapMessageConsumer; import org.oransc.policyagent.tasks.RefreshConfigTask; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; @@ -34,19 +35,34 @@ public class Application { @Autowired private RefreshConfigTask configRefresh; + @Autowired + private DmaapMessageConsumer dmaapMessageConsumer; + public static void main(String[] args) { SpringApplication.run(Application.class); } /** - * Starts the service and reads the configuration. + * Starts the configuration refresh task and reads the configuration. * * @param ctx the application context. * - * @return the command line runner performing tasks at startup. + * @return the command line runner for the configuration refresh task. */ @Bean - public CommandLineRunner commandLineRunner(ApplicationContext ctx) { + public CommandLineRunner configRefreshRunner(ApplicationContext ctx) { return args -> configRefresh.start(); } + + /** + * Starts the DMaaP message consumer service. + * + * @param ctx the application context. + * + * @return the command line runner for the DMaaP message consumer service. + */ + @Bean + public CommandLineRunner dmaapMessageConsumerRunner(ApplicationContext ctx) { + return args -> dmaapMessageConsumer.start(); + } } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java index da209a86..dd60d394 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java @@ -33,6 +33,7 @@ import org.onap.dmaap.mr.client.response.MRConsumerResponse; import org.oransc.policyagent.clients.AsyncRestClient; import org.oransc.policyagent.configuration.ApplicationConfig; import org.oransc.policyagent.exceptions.ServiceException; +import org.oransc.policyagent.tasks.RefreshConfigTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -40,15 +41,21 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; /** - * The class fetches incoming requests from DMAAP on regular intervals. Each - * received request is proceesed by DmaapMessageHandler. + * The class fetches incoming requests from DMAAP. It uses the timeout parameter that lets the MessageRouter keep the + * connection with the Kafka open until requests are sent in. + * + * If there is no DMaaP configuration in the application configuration, then this service will regularly check the + * configuration and start polling DMaaP if the configuration is added. If the DMaaP configuration is removed, then the + * service will stop polling and resume checking for configuration. + * + * Each received request is processed by {@link DmaapMessageHandler}. */ @Component -public class DmaapMessageConsumer implements Runnable { +public class DmaapMessageConsumer { - private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumer.class); + protected static final Duration TIME_BETWEEN_DMAAP_RETRIES = Duration.ofSeconds(10); - private static final Duration TIME_BETWEEN_DMAAP_POLLS = Duration.ofSeconds(10); + private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumer.class); private final ApplicationConfig applicationConfig; @@ -58,20 +65,26 @@ public class DmaapMessageConsumer implements Runnable { @Autowired public DmaapMessageConsumer(ApplicationConfig applicationConfig) { this.applicationConfig = applicationConfig; + } - Thread thread = new Thread(this); + public Thread start() { + Thread thread = new Thread(() -> this.checkConfigLoop()); thread.start(); + return thread; } - private boolean isDmaapConfigured() { - Properties consumerCfg = applicationConfig.getDmaapConsumerConfig(); - Properties producerCfg = applicationConfig.getDmaapPublisherConfig(); - return (consumerCfg != null && consumerCfg.size() > 0 && producerCfg != null && producerCfg.size() > 0); + private void checkConfigLoop() { + while (!isStopped()) { + if (isDmaapConfigured()) { + messageHandlingLoop(); + } else { + sleep(RefreshConfigTask.CONFIG_REFRESH_INTERVAL); + } + } } - @Override - public void run() { - while (sleep(TIME_BETWEEN_DMAAP_POLLS) && isDmaapConfigured()) { + private void messageHandlingLoop() { + while (!isStopped() && isDmaapConfigured()) { try { Iterable dmaapMsgs = fetchAllMessages(); if (dmaapMsgs != null && Iterables.size(dmaapMsgs) > 0) { @@ -81,13 +94,23 @@ public class DmaapMessageConsumer implements Runnable { } } } catch (Exception e) { - logger.warn("{}: cannot fetch because of {}", this, e.getMessage()); - sleep(TIME_BETWEEN_DMAAP_POLLS); + logger.warn("Cannot fetch because of {}", e.getMessage()); + sleep(TIME_BETWEEN_DMAAP_RETRIES); } } } - private Iterable fetchAllMessages() throws ServiceException, IOException { + protected boolean isStopped() { + return false; + } + + protected boolean isDmaapConfigured() { + Properties consumerCfg = applicationConfig.getDmaapConsumerConfig(); + Properties producerCfg = applicationConfig.getDmaapPublisherConfig(); + return (consumerCfg != null && consumerCfg.size() > 0 && producerCfg != null && producerCfg.size() > 0); + } + + protected Iterable fetchAllMessages() throws ServiceException, IOException { Properties dmaapConsumerProperties = this.applicationConfig.getDmaapConsumerConfig(); MRConsumer consumer = getMessageRouterConsumer(dmaapConsumerProperties); MRConsumerResponse response = consumer.fetchWithReturnConsumerResponse(); @@ -118,29 +141,27 @@ public class DmaapMessageConsumer implements Runnable { return createDmaapMessageHandler(agentClient, producer); } - boolean sleep(Duration duration) { + protected void sleep(Duration duration) { try { Thread.sleep(duration.toMillis()); - return true; } catch (Exception e) { logger.error("Failed to put the thread to sleep", e); - return false; } } - MRConsumer getMessageRouterConsumer(Properties dmaapConsumerProperties) throws IOException { + protected MRConsumer getMessageRouterConsumer(Properties dmaapConsumerProperties) throws IOException { return MRClientFactory.createConsumer(dmaapConsumerProperties); } - DmaapMessageHandler createDmaapMessageHandler(AsyncRestClient agentClient, MRBatchingPublisher producer) { + protected DmaapMessageHandler createDmaapMessageHandler(AsyncRestClient agentClient, MRBatchingPublisher producer) { return new DmaapMessageHandler(producer, agentClient); } - AsyncRestClient createRestClient(String agentBaseUrl) { + protected AsyncRestClient createRestClient(String agentBaseUrl) { return new AsyncRestClient(agentBaseUrl); } - MRBatchingPublisher getMessageRouterPublisher(Properties dmaapPublisherProperties) throws IOException { + protected MRBatchingPublisher getMessageRouterPublisher(Properties dmaapPublisherProperties) throws IOException { return MRClientFactory.createBatchingPublisher(dmaapPublisherProperties); } } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java index 05bcb0f2..12bc55a7 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java @@ -80,6 +80,11 @@ public class RefreshConfigTask { @Value("#{systemEnvironment}") public Properties systemEnvironment; + /** + * The time between refreshes of the configuration. + */ + public static final Duration CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1); + final ApplicationConfig appConfig; @Getter(AccessLevel.PROTECTED) private Disposable refreshTask = null; @@ -90,8 +95,6 @@ public class RefreshConfigTask { private final Policies policies; private final Services services; private final PolicyTypes policyTypes; - private static final Duration FILE_CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1); - private static final Duration CONSUL_CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1); @Autowired public RefreshConfigTask(ApplicationConfig appConfig, Rics rics, Policies policies, Services services, @@ -120,14 +123,14 @@ public class RefreshConfigTask { } Flux createRefreshTask() { - Flux loadFromFile = Flux.interval(Duration.ZERO, FILE_CONFIG_REFRESH_INTERVAL) // + Flux loadFromFile = Flux.interval(Duration.ZERO, CONFIG_REFRESH_INTERVAL) // .filter(notUsed -> !this.isConsulUsed) // .flatMap(notUsed -> loadConfigurationFromFile()) // .onErrorResume(this::ignoreErrorFlux) // .doOnNext(json -> logger.debug("loadFromFile succeeded")) // .doOnTerminate(() -> logger.error("loadFromFile Terminate")); - Flux loadFromConsul = Flux.interval(Duration.ZERO, CONSUL_CONFIG_REFRESH_INTERVAL) // + Flux loadFromConsul = Flux.interval(Duration.ZERO, CONFIG_REFRESH_INTERVAL) // .flatMap(i -> getEnvironment(systemEnvironment)) // .flatMap(this::createCbsClient) // .flatMap(this::getFromCbs) // diff --git a/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java b/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java index 8dd36473..fc4e7cea 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java @@ -20,29 +20,33 @@ package org.oransc.policyagent.dmaap; +import static ch.qos.logback.classic.Level.WARN; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import ch.qos.logback.classic.Level; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.read.ListAppender; import java.time.Duration; import java.util.Arrays; import java.util.Collections; +import java.util.LinkedList; import java.util.List; import java.util.Properties; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.onap.dmaap.mr.client.MRBatchingPublisher; @@ -50,13 +54,12 @@ import org.onap.dmaap.mr.client.MRConsumer; import org.onap.dmaap.mr.client.response.MRConsumerResponse; import org.oransc.policyagent.clients.AsyncRestClient; import org.oransc.policyagent.configuration.ApplicationConfig; +import org.oransc.policyagent.tasks.RefreshConfigTask; import org.oransc.policyagent.utils.LoggingUtils; import org.springframework.http.HttpStatus; @ExtendWith(MockitoExtension.class) public class DmaapMessageConsumerTest { - final Duration TIME_BETWEEN_DMAAP_POLLS = Duration.ofSeconds(10); - @Mock private ApplicationConfig applicationConfigMock; @Mock @@ -66,98 +69,110 @@ public class DmaapMessageConsumerTest { private DmaapMessageConsumer messageConsumerUnderTest; + @AfterEach + public void resetLogging() { + LoggingUtils.getLogListAppender(DmaapMessageConsumer.class); + } + @Test - public void dmaapNotConfigured_thenDoNothing() { + public void dmaapNotConfigured_thenSleepAndRetryUntilConfig() throws Exception { messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); - doReturn(true).when(messageConsumerUnderTest).sleep(any(Duration.class)); + doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class)); + doReturn(false, false, false, true).when(messageConsumerUnderTest).isStopped(); + doReturn(false, true, true).when(messageConsumerUnderTest).isDmaapConfigured(); + doReturn(new LinkedList<>()).when(messageConsumerUnderTest).fetchAllMessages(); - messageConsumerUnderTest.run(); + messageConsumerUnderTest.start().join(); - verify(messageConsumerUnderTest).sleep(TIME_BETWEEN_DMAAP_POLLS); - verify(applicationConfigMock).getDmaapConsumerConfig(); - verify(applicationConfigMock).getDmaapPublisherConfig(); - verifyNoMoreInteractions(applicationConfigMock); + InOrder orderVerifier = inOrder(messageConsumerUnderTest); + orderVerifier.verify(messageConsumerUnderTest).sleep(RefreshConfigTask.CONFIG_REFRESH_INTERVAL); + orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages(); } @Test - public void dmaapConfiguredAndNoMessages_thenPollOnce() throws Exception { + public void dmaapConfigurationRemoved_thenStopPollingDmaapSleepAndRetry() throws Exception { messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); - doReturn(true, false).when(messageConsumerUnderTest).sleep(any(Duration.class)); + doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class)); + doReturn(false, false, false, false, true).when(messageConsumerUnderTest).isStopped(); + doReturn(true, true, false).when(messageConsumerUnderTest).isDmaapConfigured(); + doReturn(new LinkedList<>()).when(messageConsumerUnderTest).fetchAllMessages(); - Properties properties = new Properties(); - properties.put("key", "value"); - when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties); - when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties); + messageConsumerUnderTest.start().join(); + + InOrder orderVerifier = inOrder(messageConsumerUnderTest); + orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages(); + orderVerifier.verify(messageConsumerUnderTest).sleep(RefreshConfigTask.CONFIG_REFRESH_INTERVAL); + } + + @Test + public void dmaapConfiguredAndNoMessages_thenPollOnce() throws Exception { + setUpMrConfig(); + + messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); MRConsumerResponse response = new MRConsumerResponse(); response.setResponseCode(Integer.toString(HttpStatus.OK.value())); response.setActualMessages(Collections.emptyList()); + doReturn(false, false, true).when(messageConsumerUnderTest).isStopped(); doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest) .getMessageRouterConsumer(any(Properties.class)); - doReturn(response).when(messageRouterConsumerMock).fetchWithReturnConsumerResponse(); + when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response); - messageConsumerUnderTest.run(); - - verify(messageConsumerUnderTest, times(2)).sleep(TIME_BETWEEN_DMAAP_POLLS); - - verify(applicationConfigMock, times(2)).getDmaapConsumerConfig(); - verify(applicationConfigMock).getDmaapPublisherConfig(); - verifyNoMoreInteractions(applicationConfigMock); + messageConsumerUnderTest.start().join(); verify(messageRouterConsumerMock).fetchWithReturnConsumerResponse(); verifyNoMoreInteractions(messageRouterConsumerMock); } @Test - public void dmaapConfiguredAndErrorGettingMessages_thenLogWarning() throws Exception { - messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); + public void dmaapConfiguredAndErrorGettingMessages_thenLogWarningAndSleep() throws Exception { + setUpMrConfig(); - doReturn(true, false).when(messageConsumerUnderTest).sleep(any(Duration.class)); - - Properties properties = new Properties(); - properties.put("key", "value"); - when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties); - when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties); + messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); - MRConsumerResponse response = new MRConsumerResponse(); - response.setResponseCode(Integer.toString(HttpStatus.BAD_REQUEST.value())); - response.setResponseMessage("Error"); + doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class)); + doReturn(false, false, true).when(messageConsumerUnderTest).isStopped(); doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest) .getMessageRouterConsumer(any(Properties.class)); - doReturn(response).when(messageRouterConsumerMock).fetchWithReturnConsumerResponse(); - final ListAppender logAppender = LoggingUtils.getLogListAppender(DmaapMessageConsumer.class); + MRConsumerResponse response = new MRConsumerResponse(); + int responseCode = HttpStatus.BAD_REQUEST.value(); + response.setResponseCode(Integer.toString(responseCode)); + String responseMessage = "Error"; + response.setResponseMessage(responseMessage); + when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response); + + final ListAppender logAppender = + LoggingUtils.getLogListAppender(DmaapMessageConsumer.class, WARN); - messageConsumerUnderTest.run(); + messageConsumerUnderTest.start().join(); - assertThat(logAppender.list.get(0).getLevel()).isEqualTo(Level.WARN); - assertThat( - logAppender.list.toString().contains(": cannot fetch because of Error respons 400 Error from DMaaP.")) + assertThat(logAppender.list.toString() + .contains("Cannot fetch because of Error respons " + responseCode + " " + responseMessage + " from DMaaP.")) .isTrue(); + + verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES); } @Test public void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage() throws Exception { - messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); + Properties properties = setUpMrConfig(); - doReturn(true, false).when(messageConsumerUnderTest).sleep(any(Duration.class)); + messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); - Properties properties = new Properties(); - properties.put("key", "value"); - when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties); - when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties); + doReturn(false, false, true).when(messageConsumerUnderTest).isStopped(); + doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest) + .getMessageRouterConsumer(any(Properties.class)); MRConsumerResponse response = new MRConsumerResponse(); response.setResponseCode(Integer.toString(HttpStatus.OK.value())); - List messages = Arrays.asList("message"); + String responseMessage = "message"; + List messages = Arrays.asList(responseMessage); response.setActualMessages(messages); - - doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest) - .getMessageRouterConsumer(any(Properties.class)); - doReturn(response).when(messageRouterConsumerMock).fetchWithReturnConsumerResponse(); + when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response); doReturn(messageHandlerMock).when(messageConsumerUnderTest) .createDmaapMessageHandler(any(AsyncRestClient.class), any(MRBatchingPublisher.class)); @@ -168,12 +183,20 @@ public class DmaapMessageConsumerTest { MRBatchingPublisher publisherMock = mock(MRBatchingPublisher.class); doReturn(publisherMock).when(messageConsumerUnderTest).getMessageRouterPublisher(any(Properties.class)); - messageConsumerUnderTest.run(); + messageConsumerUnderTest.start().join(); verify(messageConsumerUnderTest).createRestClient("https://localhost:0"); verify(messageConsumerUnderTest).getMessageRouterPublisher(properties); - verify(messageHandlerMock).handleDmaapMsg("message"); + verify(messageHandlerMock).handleDmaapMsg(responseMessage); verifyNoMoreInteractions(messageHandlerMock); } + + private Properties setUpMrConfig() { + Properties properties = new Properties(); + properties.put("key", "value"); + when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties); + when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties); + return properties; + } } diff --git a/test/mrstub/mr.py b/test/mrstub/mr.py index 1094e7c3..9c5a2c8d 100644 --- a/test/mrstub/mr.py +++ b/test/mrstub/mr.py @@ -212,9 +212,9 @@ def events_write(): print(AGENT_WRITE_URL+ " msg+status (correlationid="+id+") :" + str(msg_str)) except Exception as e: print(AGENT_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc()) - return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT) + return Response('{"message": "' + SERVER_ERROR + ' ' + str(e) + '","status":"500"}', status=200, mimetype=MIME_JSON) - return Response('OK', status=200, mimetype=MIME_TEXT) + return Response('{}', status=200, mimetype=MIME_JSON) ### Functions for metrics read out ###