package org.oransc.policyagent.dmaap;
+import static ch.qos.logback.classic.Level.WARN;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
-import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
import java.time.Duration;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
+import java.util.LinkedList;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
-import org.onap.dmaap.mr.client.MRBatchingPublisher;
-import org.onap.dmaap.mr.client.MRConsumer;
-import org.onap.dmaap.mr.client.response.MRConsumerResponse;
import org.oransc.policyagent.clients.AsyncRestClient;
import org.oransc.policyagent.configuration.ApplicationConfig;
import org.oransc.policyagent.utils.LoggingUtils;
import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import reactor.core.publisher.Mono;
@ExtendWith(MockitoExtension.class)
-public class DmaapMessageConsumerTest {
- final Duration TIME_BETWEEN_DMAAP_POLLS = Duration.ofSeconds(10);
-
+class DmaapMessageConsumerTest {
@Mock
private ApplicationConfig applicationConfigMock;
@Mock
- private MRConsumer messageRouterConsumerMock;
+ private AsyncRestClient messageRouterConsumerMock;
@Mock
private DmaapMessageHandler messageHandlerMock;
private DmaapMessageConsumer messageConsumerUnderTest;
+ @AfterEach
+ void resetLogging() {
+ LoggingUtils.getLogListAppender(DmaapMessageConsumer.class);
+ }
+
@Test
- public void dmaapNotConfigured_thenDoNothing() {
+ void dmaapNotConfigured_thenSleepAndRetryUntilConfig() throws Exception {
messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
- doReturn(true).when(messageConsumerUnderTest).sleep(any(Duration.class));
+ doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
+ doReturn(false, false, false, true).when(messageConsumerUnderTest).isStopped();
+ doReturn(false, true, true).when(messageConsumerUnderTest).isDmaapConfigured();
+ doReturn(new LinkedList<>()).when(messageConsumerUnderTest).fetchAllMessages();
- messageConsumerUnderTest.run();
+ messageConsumerUnderTest.start().join();
- verify(messageConsumerUnderTest).sleep(TIME_BETWEEN_DMAAP_POLLS);
- verify(applicationConfigMock).getDmaapConsumerConfig();
- verify(applicationConfigMock).getDmaapPublisherConfig();
- verifyNoMoreInteractions(applicationConfigMock);
+ InOrder orderVerifier = inOrder(messageConsumerUnderTest);
+ orderVerifier.verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES);
+ orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages();
}
@Test
- public void dmaapConfiguredAndNoMessages_thenPollOnce() throws Exception {
+ void dmaapConfigurationRemoved_thenStopPollingDmaapSleepAndRetry() throws Exception {
messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
- doReturn(true, false).when(messageConsumerUnderTest).sleep(any(Duration.class));
+ doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
+ doReturn(false, false, false, false, true).when(messageConsumerUnderTest).isStopped();
+ doReturn(true, true, false).when(messageConsumerUnderTest).isDmaapConfigured();
+ doReturn(new LinkedList<>()).when(messageConsumerUnderTest).fetchAllMessages();
- Properties properties = new Properties();
- properties.put("key", "value");
- when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties);
- when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties);
+ messageConsumerUnderTest.start().join();
- MRConsumerResponse response = new MRConsumerResponse();
- response.setResponseCode(Integer.toString(HttpStatus.OK.value()));
- response.setActualMessages(Collections.emptyList());
+ InOrder orderVerifier = inOrder(messageConsumerUnderTest);
+ orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages();
+ orderVerifier.verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES);
+ }
- doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
- .getMessageRouterConsumer(any(Properties.class));
- doReturn(response).when(messageRouterConsumerMock).fetchWithReturnConsumerResponse();
+ @Test
+ void dmaapConfiguredAndNoMessages_thenPollOnce() throws Exception {
+ setUpMrConfig();
- messageConsumerUnderTest.run();
+ messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
+
+ Mono<ResponseEntity<String>> response = Mono.empty();
- verify(messageConsumerUnderTest, times(2)).sleep(TIME_BETWEEN_DMAAP_POLLS);
+ doReturn(false, true).when(messageConsumerUnderTest).isStopped();
+ doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
+ doReturn(response).when(messageRouterConsumerMock).getForEntity(any());
- verify(applicationConfigMock, times(2)).getDmaapConsumerConfig();
- verify(applicationConfigMock).getDmaapPublisherConfig();
- verifyNoMoreInteractions(applicationConfigMock);
+ messageConsumerUnderTest.start().join();
- verify(messageRouterConsumerMock).fetchWithReturnConsumerResponse();
+ verify(messageRouterConsumerMock).getForEntity(any());
verifyNoMoreInteractions(messageRouterConsumerMock);
}
@Test
- public void dmaapConfiguredAndErrorGettingMessages_thenLogWarning() throws Exception {
+ void dmaapConfiguredAndErrorGettingMessages_thenLogWarningAndSleep() throws Exception {
+ setUpMrConfig();
+
messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
- doReturn(true, false).when(messageConsumerUnderTest).sleep(any(Duration.class));
+ doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
+ doReturn(false, true).when(messageConsumerUnderTest).isStopped();
+ doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
- Properties properties = new Properties();
- properties.put("key", "value");
- when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties);
- when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties);
+ Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>("Error", HttpStatus.BAD_REQUEST));
+ when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response);
- MRConsumerResponse response = new MRConsumerResponse();
- response.setResponseCode(Integer.toString(HttpStatus.BAD_REQUEST.value()));
- response.setResponseMessage("Error");
- doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
- .getMessageRouterConsumer(any(Properties.class));
- doReturn(response).when(messageRouterConsumerMock).fetchWithReturnConsumerResponse();
+ final ListAppender<ILoggingEvent> logAppender =
+ LoggingUtils.getLogListAppender(DmaapMessageConsumer.class, WARN);
- final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DmaapMessageConsumer.class);
+ messageConsumerUnderTest.start().join();
- messageConsumerUnderTest.run();
+ assertThat(logAppender.list.get(0).getFormattedMessage())
+ .isEqualTo("Cannot fetch because of Error respons: 400 BAD_REQUEST Error");
- assertThat(logAppender.list.get(0).getLevel()).isEqualTo(Level.WARN);
- assertThat(
- logAppender.list.toString().contains(": cannot fetch because of Error respons 400 Error from DMaaP."))
- .isTrue();
+ verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES);
}
@Test
- public void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage() throws Exception {
+ void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage() throws Exception {
+ // The message from MR is here an array of Json objects
+ setUpMrConfig();
messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
- doReturn(true, false).when(messageConsumerUnderTest).sleep(any(Duration.class));
+ String message = "{\"apiVersion\":\"1.0\"," //
+ + "\"operation\":\"GET\"," //
+ + "\"correlationId\":\"1592341013115594000\"," //
+ + "\"originatorId\":\"849e6c6b420\"," //
+ + "\"payload\":{}," //
+ + "\"requestId\":\"23343221\", " //
+ + "\"target\":\"policy-agent\"," //
+ + "\"timestamp\":\"2020-06-16 20:56:53.115665\"," //
+ + "\"type\":\"request\"," //
+ + "\"url\":\"/rics\"}";
+ String messages = "[" + message + "]";
- Properties properties = new Properties();
- properties.put("key", "value");
- when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties);
- when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties);
+ doReturn(false, true).when(messageConsumerUnderTest).isStopped();
+ doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
- MRConsumerResponse response = new MRConsumerResponse();
- response.setResponseCode(Integer.toString(HttpStatus.OK.value()));
- List<String> messages = Arrays.asList("message");
- response.setActualMessages(messages);
+ Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>(messages, HttpStatus.OK));
+ when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response);
- doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
- .getMessageRouterConsumer(any(Properties.class));
- doReturn(response).when(messageRouterConsumerMock).fetchWithReturnConsumerResponse();
+ doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler();
- doReturn(messageHandlerMock).when(messageConsumerUnderTest)
- .createDmaapMessageHandler(any(AsyncRestClient.class), any(MRBatchingPublisher.class));
+ messageConsumerUnderTest.start().join();
- AsyncRestClient restClientMock = mock(AsyncRestClient.class);
- doReturn(restClientMock).when(messageConsumerUnderTest).createRestClient(anyString());
+ ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
+ verify(messageHandlerMock).handleDmaapMsg(captor.capture());
+ String messageAfterJsonParsing = captor.getValue();
+ assertThat(messageAfterJsonParsing).contains("apiVersion");
- MRBatchingPublisher publisherMock = mock(MRBatchingPublisher.class);
- doReturn(publisherMock).when(messageConsumerUnderTest).getMessageRouterPublisher(any(Properties.class));
+ verifyNoMoreInteractions(messageHandlerMock);
+ }
+
+ @Test
+ void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage2() throws Exception {
+ // The message from MR is here an array of String (which is the case when the MR
+ // simulator is used)
+ setUpMrConfig();
+ messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
- messageConsumerUnderTest.run();
+ doReturn(false, true).when(messageConsumerUnderTest).isStopped();
+ doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
- verify(messageConsumerUnderTest).createRestClient("http://localhost:0");
- verify(messageConsumerUnderTest).getMessageRouterPublisher(properties);
+ Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>("[\"aMessage\"]", HttpStatus.OK));
+ when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response);
- verify(messageHandlerMock).handleDmaapMsg("message");
+ doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler();
+
+ messageConsumerUnderTest.start().join();
+
+ verify(messageHandlerMock).handleDmaapMsg("aMessage");
verifyNoMoreInteractions(messageHandlerMock);
}
+
+ private void setUpMrConfig() {
+ when(applicationConfigMock.getDmaapConsumerTopicUrl()).thenReturn("url");
+ when(applicationConfigMock.getDmaapProducerTopicUrl()).thenReturn("url");
+ }
}