- MRConsumerResponse response = new MRConsumerResponse();
- response.setResponseCode(Integer.toString(HttpStatus.OK.value()));
- String responseMessage = "message";
- List<String> messages = Arrays.asList(responseMessage);
- response.setActualMessages(messages);
- when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response);
+ doReturn(false, true).when(messageConsumerUnderTest).isStopped();
+ doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
+
+ Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>(messages, HttpStatus.OK));
+ when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response);
+
+ doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler();
+
+ messageConsumerUnderTest.start().join();
+
+ ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
+ verify(messageHandlerMock).handleDmaapMsg(captor.capture());
+ String messageAfterJsonParsing = captor.getValue();
+ assertThat(messageAfterJsonParsing.contains("apiVersion")).isTrue();
+
+ verifyNoMoreInteractions(messageHandlerMock);
+ }
+
+ @Test
+ void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage2() throws Exception {
+ // The message from MR is here an array of String (which is the case when the MR
+ // simulator is used)
+ setUpMrConfig();
+ messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
+
+ doReturn(false, true).when(messageConsumerUnderTest).isStopped();
+ doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
+
+ Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>("[\"aMessage\"]", HttpStatus.OK));
+ when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response);