import org.oransc.policyagent.clients.AsyncRestClient;
import org.oransc.policyagent.configuration.ApplicationConfig;
import org.oransc.policyagent.exceptions.ServiceException;
+import org.oransc.policyagent.tasks.RefreshConfigTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
- * The class fetches incoming requests from DMAAP on regular intervals. Each
- * received request is proceesed by DmaapMessageHandler.
+ * The class fetches incoming requests from DMAAP. It uses the timeout parameter that lets the MessageRouter keep the
+ * connection with the Kafka open until requests are sent in.
+ *
+ * If there is no DMaaP configuration in the application configuration, then this service will regularly check the
+ * configuration and start polling DMaaP if the configuration is added. If the DMaaP configuration is removed, then the
+ * service will stop polling and resume checking for configuration.
+ *
+ * Each received request is processed by {@link DmaapMessageHandler}.
*/
@Component
-public class DmaapMessageConsumer implements Runnable {
+public class DmaapMessageConsumer {
- private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumer.class);
+ protected static final Duration TIME_BETWEEN_DMAAP_RETRIES = Duration.ofSeconds(10);
- private static final Duration TIME_BETWEEN_DMAAP_POLLS = Duration.ofSeconds(10);
+ private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumer.class);
private final ApplicationConfig applicationConfig;
@Autowired
public DmaapMessageConsumer(ApplicationConfig applicationConfig) {
this.applicationConfig = applicationConfig;
+ }
- Thread thread = new Thread(this);
+ public Thread start() {
+ Thread thread = new Thread(() -> this.checkConfigLoop());
thread.start();
+ return thread;
}
- private boolean isDmaapConfigured() {
- Properties consumerCfg = applicationConfig.getDmaapConsumerConfig();
- Properties producerCfg = applicationConfig.getDmaapPublisherConfig();
- return (consumerCfg != null && consumerCfg.size() > 0 && producerCfg != null && producerCfg.size() > 0);
+ private void checkConfigLoop() {
+ while (!isStopped()) {
+ if (isDmaapConfigured()) {
+ messageHandlingLoop();
+ } else {
+ sleep(RefreshConfigTask.CONFIG_REFRESH_INTERVAL);
+ }
+ }
}
- @Override
- public void run() {
- while (sleep(TIME_BETWEEN_DMAAP_POLLS) && isDmaapConfigured()) {
+ private void messageHandlingLoop() {
+ while (!isStopped() && isDmaapConfigured()) {
try {
Iterable<String> dmaapMsgs = fetchAllMessages();
if (dmaapMsgs != null && Iterables.size(dmaapMsgs) > 0) {
}
}
} catch (Exception e) {
- logger.warn("{}: cannot fetch because of {}", this, e.getMessage());
- sleep(TIME_BETWEEN_DMAAP_POLLS);
+ logger.warn("Cannot fetch because of {}", e.getMessage());
+ sleep(TIME_BETWEEN_DMAAP_RETRIES);
}
}
}
- private Iterable<String> fetchAllMessages() throws ServiceException, IOException {
+ protected boolean isStopped() {
+ return false;
+ }
+
+ protected boolean isDmaapConfigured() {
+ Properties consumerCfg = applicationConfig.getDmaapConsumerConfig();
+ Properties producerCfg = applicationConfig.getDmaapPublisherConfig();
+ return (consumerCfg != null && consumerCfg.size() > 0 && producerCfg != null && producerCfg.size() > 0);
+ }
+
+ protected Iterable<String> fetchAllMessages() throws ServiceException, IOException {
Properties dmaapConsumerProperties = this.applicationConfig.getDmaapConsumerConfig();
MRConsumer consumer = getMessageRouterConsumer(dmaapConsumerProperties);
MRConsumerResponse response = consumer.fetchWithReturnConsumerResponse();
return createDmaapMessageHandler(agentClient, producer);
}
- boolean sleep(Duration duration) {
+ protected void sleep(Duration duration) {
try {
Thread.sleep(duration.toMillis());
- return true;
} catch (Exception e) {
logger.error("Failed to put the thread to sleep", e);
- return false;
}
}
- MRConsumer getMessageRouterConsumer(Properties dmaapConsumerProperties) throws IOException {
+ protected MRConsumer getMessageRouterConsumer(Properties dmaapConsumerProperties) throws IOException {
return MRClientFactory.createConsumer(dmaapConsumerProperties);
}
- DmaapMessageHandler createDmaapMessageHandler(AsyncRestClient agentClient, MRBatchingPublisher producer) {
+ protected DmaapMessageHandler createDmaapMessageHandler(AsyncRestClient agentClient, MRBatchingPublisher producer) {
return new DmaapMessageHandler(producer, agentClient);
}
- AsyncRestClient createRestClient(String agentBaseUrl) {
+ protected AsyncRestClient createRestClient(String agentBaseUrl) {
return new AsyncRestClient(agentBaseUrl);
}
- MRBatchingPublisher getMessageRouterPublisher(Properties dmaapPublisherProperties) throws IOException {
+ protected MRBatchingPublisher getMessageRouterPublisher(Properties dmaapPublisherProperties) throws IOException {
return MRClientFactory.createBatchingPublisher(dmaapPublisherProperties);
}
}
package org.oransc.policyagent.dmaap;
+import static ch.qos.logback.classic.Level.WARN;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
-import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.onap.dmaap.mr.client.MRBatchingPublisher;
import org.onap.dmaap.mr.client.response.MRConsumerResponse;
import org.oransc.policyagent.clients.AsyncRestClient;
import org.oransc.policyagent.configuration.ApplicationConfig;
+import org.oransc.policyagent.tasks.RefreshConfigTask;
import org.oransc.policyagent.utils.LoggingUtils;
import org.springframework.http.HttpStatus;
@ExtendWith(MockitoExtension.class)
public class DmaapMessageConsumerTest {
- final Duration TIME_BETWEEN_DMAAP_POLLS = Duration.ofSeconds(10);
-
@Mock
private ApplicationConfig applicationConfigMock;
@Mock
private DmaapMessageConsumer messageConsumerUnderTest;
+ @AfterEach
+ public void resetLogging() {
+ LoggingUtils.getLogListAppender(DmaapMessageConsumer.class);
+ }
+
@Test
- public void dmaapNotConfigured_thenDoNothing() {
+ public void dmaapNotConfigured_thenSleepAndRetryUntilConfig() throws Exception {
messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
- doReturn(true).when(messageConsumerUnderTest).sleep(any(Duration.class));
+ doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
+ doReturn(false, false, false, true).when(messageConsumerUnderTest).isStopped();
+ doReturn(false, true, true).when(messageConsumerUnderTest).isDmaapConfigured();
+ doReturn(new LinkedList<>()).when(messageConsumerUnderTest).fetchAllMessages();
- messageConsumerUnderTest.run();
+ messageConsumerUnderTest.start().join();
- verify(messageConsumerUnderTest).sleep(TIME_BETWEEN_DMAAP_POLLS);
- verify(applicationConfigMock).getDmaapConsumerConfig();
- verify(applicationConfigMock).getDmaapPublisherConfig();
- verifyNoMoreInteractions(applicationConfigMock);
+ InOrder orderVerifier = inOrder(messageConsumerUnderTest);
+ orderVerifier.verify(messageConsumerUnderTest).sleep(RefreshConfigTask.CONFIG_REFRESH_INTERVAL);
+ orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages();
}
@Test
- public void dmaapConfiguredAndNoMessages_thenPollOnce() throws Exception {
+ public void dmaapConfigurationRemoved_thenStopPollingDmaapSleepAndRetry() throws Exception {
messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
- doReturn(true, false).when(messageConsumerUnderTest).sleep(any(Duration.class));
+ doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
+ doReturn(false, false, false, false, true).when(messageConsumerUnderTest).isStopped();
+ doReturn(true, true, false).when(messageConsumerUnderTest).isDmaapConfigured();
+ doReturn(new LinkedList<>()).when(messageConsumerUnderTest).fetchAllMessages();
- Properties properties = new Properties();
- properties.put("key", "value");
- when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties);
- when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties);
+ messageConsumerUnderTest.start().join();
+
+ InOrder orderVerifier = inOrder(messageConsumerUnderTest);
+ orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages();
+ orderVerifier.verify(messageConsumerUnderTest).sleep(RefreshConfigTask.CONFIG_REFRESH_INTERVAL);
+ }
+
+ @Test
+ public void dmaapConfiguredAndNoMessages_thenPollOnce() throws Exception {
+ setUpMrConfig();
+
+ messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
MRConsumerResponse response = new MRConsumerResponse();
response.setResponseCode(Integer.toString(HttpStatus.OK.value()));
response.setActualMessages(Collections.emptyList());
+ doReturn(false, false, true).when(messageConsumerUnderTest).isStopped();
doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
.getMessageRouterConsumer(any(Properties.class));
- doReturn(response).when(messageRouterConsumerMock).fetchWithReturnConsumerResponse();
+ when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response);
- messageConsumerUnderTest.run();
-
- verify(messageConsumerUnderTest, times(2)).sleep(TIME_BETWEEN_DMAAP_POLLS);
-
- verify(applicationConfigMock, times(2)).getDmaapConsumerConfig();
- verify(applicationConfigMock).getDmaapPublisherConfig();
- verifyNoMoreInteractions(applicationConfigMock);
+ messageConsumerUnderTest.start().join();
verify(messageRouterConsumerMock).fetchWithReturnConsumerResponse();
verifyNoMoreInteractions(messageRouterConsumerMock);
}
@Test
- public void dmaapConfiguredAndErrorGettingMessages_thenLogWarning() throws Exception {
- messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
+ public void dmaapConfiguredAndErrorGettingMessages_thenLogWarningAndSleep() throws Exception {
+ setUpMrConfig();
- doReturn(true, false).when(messageConsumerUnderTest).sleep(any(Duration.class));
-
- Properties properties = new Properties();
- properties.put("key", "value");
- when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties);
- when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties);
+ messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
- MRConsumerResponse response = new MRConsumerResponse();
- response.setResponseCode(Integer.toString(HttpStatus.BAD_REQUEST.value()));
- response.setResponseMessage("Error");
+ doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
+ doReturn(false, false, true).when(messageConsumerUnderTest).isStopped();
doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
.getMessageRouterConsumer(any(Properties.class));
- doReturn(response).when(messageRouterConsumerMock).fetchWithReturnConsumerResponse();
- final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DmaapMessageConsumer.class);
+ MRConsumerResponse response = new MRConsumerResponse();
+ int responseCode = HttpStatus.BAD_REQUEST.value();
+ response.setResponseCode(Integer.toString(responseCode));
+ String responseMessage = "Error";
+ response.setResponseMessage(responseMessage);
+ when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response);
+
+ final ListAppender<ILoggingEvent> logAppender =
+ LoggingUtils.getLogListAppender(DmaapMessageConsumer.class, WARN);
- messageConsumerUnderTest.run();
+ messageConsumerUnderTest.start().join();
- assertThat(logAppender.list.get(0).getLevel()).isEqualTo(Level.WARN);
- assertThat(
- logAppender.list.toString().contains(": cannot fetch because of Error respons 400 Error from DMaaP."))
+ assertThat(logAppender.list.toString()
+ .contains("Cannot fetch because of Error respons " + responseCode + " " + responseMessage + " from DMaaP."))
.isTrue();
+
+ verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES);
}
@Test
public void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage() throws Exception {
- messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
+ Properties properties = setUpMrConfig();
- doReturn(true, false).when(messageConsumerUnderTest).sleep(any(Duration.class));
+ messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
- Properties properties = new Properties();
- properties.put("key", "value");
- when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties);
- when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties);
+ doReturn(false, false, true).when(messageConsumerUnderTest).isStopped();
+ doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
+ .getMessageRouterConsumer(any(Properties.class));
MRConsumerResponse response = new MRConsumerResponse();
response.setResponseCode(Integer.toString(HttpStatus.OK.value()));
- List<String> messages = Arrays.asList("message");
+ String responseMessage = "message";
+ List<String> messages = Arrays.asList(responseMessage);
response.setActualMessages(messages);
-
- doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
- .getMessageRouterConsumer(any(Properties.class));
- doReturn(response).when(messageRouterConsumerMock).fetchWithReturnConsumerResponse();
+ when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response);
doReturn(messageHandlerMock).when(messageConsumerUnderTest)
.createDmaapMessageHandler(any(AsyncRestClient.class), any(MRBatchingPublisher.class));
MRBatchingPublisher publisherMock = mock(MRBatchingPublisher.class);
doReturn(publisherMock).when(messageConsumerUnderTest).getMessageRouterPublisher(any(Properties.class));
- messageConsumerUnderTest.run();
+ messageConsumerUnderTest.start().join();
verify(messageConsumerUnderTest).createRestClient("https://localhost:0");
verify(messageConsumerUnderTest).getMessageRouterPublisher(properties);
- verify(messageHandlerMock).handleDmaapMsg("message");
+ verify(messageHandlerMock).handleDmaapMsg(responseMessage);
verifyNoMoreInteractions(messageHandlerMock);
}
+
+ private Properties setUpMrConfig() {
+ Properties properties = new Properties();
+ properties.put("key", "value");
+ when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties);
+ when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties);
+ return properties;
+ }
}