- verify(messageConsumerUnderTest).createRestClient("https://localhost:0");
- verify(messageConsumerUnderTest).getMessageRouterPublisher(properties);
+ ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
+ verify(messageHandlerMock).handleDmaapMsg(captor.capture());
+ String messageAfterJsonParsing = captor.getValue();
+ assertThat(messageAfterJsonParsing).contains("apiVersion");
+
+ verifyNoMoreInteractions(messageHandlerMock);
+ }
+
+ @Test
+ void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage2() throws Exception {
+ // The message from MR is here an array of String (which is the case when the MR
+ // simulator is used)
+ setUpMrConfig();
+ messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
+
+ doReturn(false, true).when(messageConsumerUnderTest).isStopped();
+ doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
+
+ Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>("[\"aMessage\"]", HttpStatus.OK));
+ when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response);
+
+ doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler();
+
+ messageConsumerUnderTest.start().join();