+ // The message from MR is here an array of Json objects
+ setUpMrConfig();
+ messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
+
+ String message = "{\"apiVersion\":\"1.0\"," //
+ + "\"operation\":\"GET\"," //
+ + "\"correlationId\":\"1592341013115594000\"," //
+ + "\"originatorId\":\"849e6c6b420\"," //
+ + "\"payload\":{}," //
+ + "\"requestId\":\"23343221\", " //
+ + "\"target\":\"policy-agent\"," //
+ + "\"timestamp\":\"2020-06-16 20:56:53.115665\"," //
+ + "\"type\":\"request\"," //
+ + "\"url\":\"/rics\"}";
+ String messages = "[" + message + "]";
+
+ doReturn(false, true).when(messageConsumerUnderTest).isStopped();
+ doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
+
+ Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>(messages, HttpStatus.OK));
+ when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response);
+
+ doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler();
+
+ messageConsumerUnderTest.start().join();
+
+ ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
+ verify(messageHandlerMock).handleDmaapMsg(captor.capture());
+ String messageAfterJsonParsing = captor.getValue();
+ assertThat(messageAfterJsonParsing).contains("apiVersion");
+
+ verifyNoMoreInteractions(messageHandlerMock);
+ }
+
+ @Test
+ void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage2() throws Exception {
+ // The message from MR is here an array of String (which is the case when the MR
+ // simulator is used)