+ private Mono<ResponseEntity<String>> okResponse() {
+ ResponseEntity<String> entity = new ResponseEntity<>("OK", HttpStatus.OK);
+ return Mono.just(entity);
+ }
+
+ @Test
+ public void testMessageParsing() {
+ String message = dmaapInputMessage(Operation.DELETE);
+ logger.info(message);
+ DmaapRequestMessage parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class);
+ assertTrue(parsedMessage != null);
+ assertFalse(parsedMessage.payload().isPresent());
+
+ message = dmaapInputMessage(Operation.PUT);
+ logger.info(message);
+ parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class);
+ assertTrue(parsedMessage != null);
+ assertTrue(parsedMessage.payload().isPresent());
+ }
+
+ @Test
+ public void unparseableMessage_thenWarning() {
+ final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DmaapMessageHandler.class);
+
+ testedObject.handleDmaapMsg("bad message");
+
+ assertThat(logAppender.list.get(0).getLevel()).isEqualTo(Level.WARN);
+ assertThat(logAppender.list.toString().contains("handleDmaapMsg failure ")).isTrue();
+ }
+
+ @Test
+ public void successfulDelete() throws IOException {
+ doReturn(okResponse()).when(agentClient).deleteForEntity(anyString());
+ doReturn(1).when(dmaapClient).send(anyString());
+ doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
+
+ String message = dmaapInputMessage(Operation.DELETE);
+
+ StepVerifier //
+ .create(testedObject.createTask(message)) //
+ .expectSubscription() //
+ .expectNext("OK") //
+ .verifyComplete(); //
+
+ verify(agentClient).deleteForEntity(URL);
+ verifyNoMoreInteractions(agentClient);
+
+ verify(dmaapClient).send(anyString());
+ verify(dmaapClient).sendBatchWithResponse();
+ verifyNoMoreInteractions(dmaapClient);
+ }
+
+ @Test
+ public void successfulGet() throws IOException {
+ doReturn(okResponse()).when(agentClient).getForEntity(anyString());
+ doReturn(1).when(dmaapClient).send(anyString());
+ doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
+
+ StepVerifier //
+ .create(testedObject.createTask(dmaapInputMessage(Operation.GET))) //
+ .expectSubscription() //
+ .expectNext("OK") //
+ .verifyComplete(); //
+
+ verify(agentClient).getForEntity(URL);
+ verifyNoMoreInteractions(agentClient);
+
+ verify(dmaapClient).send(anyString());
+ verify(dmaapClient).sendBatchWithResponse();
+ verifyNoMoreInteractions(dmaapClient);
+ }
+