2 * ========================LICENSE_START=================================
5 * Copyright (C) 2020 Nordix Foundation
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ========================LICENSE_END===================================
21 package org.oransc.policyagent.dmaap;
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.mockito.ArgumentMatchers.any;
25 import static org.mockito.ArgumentMatchers.anyString;
26 import static org.mockito.Mockito.doReturn;
27 import static org.mockito.Mockito.mock;
28 import static org.mockito.Mockito.spy;
29 import static org.mockito.Mockito.times;
30 import static org.mockito.Mockito.verify;
31 import static org.mockito.Mockito.verifyNoMoreInteractions;
32 import static org.mockito.Mockito.when;
34 import ch.qos.logback.classic.Level;
35 import ch.qos.logback.classic.spi.ILoggingEvent;
36 import ch.qos.logback.core.read.ListAppender;
38 import java.time.Duration;
39 import java.util.Arrays;
40 import java.util.Collections;
41 import java.util.List;
42 import java.util.Properties;
44 import org.junit.jupiter.api.Test;
45 import org.junit.jupiter.api.extension.ExtendWith;
46 import org.mockito.Mock;
47 import org.mockito.junit.jupiter.MockitoExtension;
48 import org.onap.dmaap.mr.client.MRBatchingPublisher;
49 import org.onap.dmaap.mr.client.MRConsumer;
50 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
51 import org.oransc.policyagent.clients.AsyncRestClient;
52 import org.oransc.policyagent.configuration.ApplicationConfig;
53 import org.oransc.policyagent.utils.LoggingUtils;
54 import org.springframework.http.HttpStatus;
56 @ExtendWith(MockitoExtension.class)
57 public class DmaapMessageConsumerTest {
58 final Duration TIME_BETWEEN_DMAAP_POLLS = Duration.ofSeconds(10);
61 private ApplicationConfig applicationConfigMock;
63 private MRConsumer messageRouterConsumerMock;
65 private DmaapMessageHandler messageHandlerMock;
67 private DmaapMessageConsumer messageConsumerUnderTest;
70 public void dmaapNotConfigured_thenDoNothing() {
71 messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
73 doReturn(true).when(messageConsumerUnderTest).sleep(any(Duration.class));
75 messageConsumerUnderTest.run();
77 verify(messageConsumerUnderTest).sleep(TIME_BETWEEN_DMAAP_POLLS);
78 verify(applicationConfigMock).getDmaapConsumerConfig();
79 verify(applicationConfigMock).getDmaapPublisherConfig();
80 verifyNoMoreInteractions(applicationConfigMock);
84 public void dmaapConfiguredAndNoMessages_thenPollOnce() throws Exception {
85 messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
87 doReturn(true, false).when(messageConsumerUnderTest).sleep(any(Duration.class));
89 Properties properties = new Properties();
90 properties.put("key", "value");
91 when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties);
92 when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties);
94 MRConsumerResponse response = new MRConsumerResponse();
95 response.setResponseCode(Integer.toString(HttpStatus.OK.value()));
96 response.setActualMessages(Collections.emptyList());
98 doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
99 .getMessageRouterConsumer(any(Properties.class));
100 doReturn(response).when(messageRouterConsumerMock).fetchWithReturnConsumerResponse();
102 messageConsumerUnderTest.run();
104 verify(messageConsumerUnderTest, times(2)).sleep(TIME_BETWEEN_DMAAP_POLLS);
106 verify(applicationConfigMock, times(2)).getDmaapConsumerConfig();
107 verify(applicationConfigMock).getDmaapPublisherConfig();
108 verifyNoMoreInteractions(applicationConfigMock);
110 verify(messageRouterConsumerMock).fetchWithReturnConsumerResponse();
111 verifyNoMoreInteractions(messageRouterConsumerMock);
115 public void dmaapConfiguredAndErrorGettingMessages_thenLogWarning() throws Exception {
116 messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
118 doReturn(true, false).when(messageConsumerUnderTest).sleep(any(Duration.class));
120 Properties properties = new Properties();
121 properties.put("key", "value");
122 when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties);
123 when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties);
125 MRConsumerResponse response = new MRConsumerResponse();
126 response.setResponseCode(Integer.toString(HttpStatus.BAD_REQUEST.value()));
127 response.setResponseMessage("Error");
128 doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
129 .getMessageRouterConsumer(any(Properties.class));
130 doReturn(response).when(messageRouterConsumerMock).fetchWithReturnConsumerResponse();
132 final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DmaapMessageConsumer.class);
134 messageConsumerUnderTest.run();
136 assertThat(logAppender.list.get(0).getLevel()).isEqualTo(Level.WARN);
138 logAppender.list.toString().contains(": cannot fetch because of Error respons 400 Error from DMaaP."))
143 public void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage() throws Exception {
144 messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
146 doReturn(true, false).when(messageConsumerUnderTest).sleep(any(Duration.class));
148 Properties properties = new Properties();
149 properties.put("key", "value");
150 when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties);
151 when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties);
153 MRConsumerResponse response = new MRConsumerResponse();
154 response.setResponseCode(Integer.toString(HttpStatus.OK.value()));
155 List<String> messages = Arrays.asList("message");
156 response.setActualMessages(messages);
158 doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
159 .getMessageRouterConsumer(any(Properties.class));
160 doReturn(response).when(messageRouterConsumerMock).fetchWithReturnConsumerResponse();
162 doReturn(messageHandlerMock).when(messageConsumerUnderTest)
163 .createDmaapMessageHandler(any(AsyncRestClient.class), any(MRBatchingPublisher.class));
165 AsyncRestClient restClientMock = mock(AsyncRestClient.class);
166 doReturn(restClientMock).when(messageConsumerUnderTest).createRestClient(anyString());
168 MRBatchingPublisher publisherMock = mock(MRBatchingPublisher.class);
169 doReturn(publisherMock).when(messageConsumerUnderTest).getMessageRouterPublisher(any(Properties.class));
171 messageConsumerUnderTest.run();
173 verify(messageConsumerUnderTest).createRestClient("http://localhost:0");
174 verify(messageConsumerUnderTest).getMessageRouterPublisher(properties);
176 verify(messageHandlerMock).handleDmaapMsg("message");
177 verifyNoMoreInteractions(messageHandlerMock);