+ @Test
+ public void successfulPost() throws IOException {
+ doReturn(Mono.just("OK")).when(agentClient).post(anyString(), anyString());
+ doReturn(1).when(dmaapClient).send(anyString());
+ doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
+
+ StepVerifier //
+ .create(testedObject.createTask(dmaapInputMessage(Operation.POST))) //
+ .expectSubscription() //
+ .expectNext("OK") //
+ .verifyComplete(); //
+
+ verify(agentClient).post(URL, payloadAsString());
+ verifyNoMoreInteractions(agentClient);
+
+ verify(dmaapClient).send(anyString());
+ verify(dmaapClient).sendBatchWithResponse();
+ verifyNoMoreInteractions(dmaapClient);
+ }
+
+ @Test
+ public void exceptionWhenCallingPolicyAgent_thenNotFoundResponse() throws IOException {
+ String errorCause = "Refused";
+ doReturn(Mono.error(new Exception(errorCause))).when(agentClient).put(anyString(), any());
+ doReturn(1).when(dmaapClient).send(anyString());
+ doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
+
+ StepVerifier //
+ .create(testedObject.createTask(dmaapInputMessage(Operation.PUT))) //
+ .expectSubscription() //
+ .verifyComplete(); //
+
+ verify(agentClient).put(anyString(), anyString());
+ verifyNoMoreInteractions(agentClient);
+
+ ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
+ verify(dmaapClient).send(captor.capture());
+ String actualMessage = captor.getValue();
+ assertThat(actualMessage.contains(HttpStatus.NOT_FOUND + "\",\"message\":\"java.lang.Exception: " + errorCause))
+ .isTrue();
+
+ verify(dmaapClient).sendBatchWithResponse();
+ verifyNoMoreInteractions(dmaapClient);
+ }
+
+ @Test
+ public void unsupportedOperationInMessage_thenNotFoundResponseWithNotImplementedOperation() throws Exception {
+ String message = dmaapInputMessage(Operation.PUT).toString();
+ String badOperation = "BAD";
+ message = message.replace(Operation.PUT.toString(), badOperation);
+
+ testedObject.handleDmaapMsg(message);
+
+ ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
+ verify(dmaapClient).send(captor.capture());
+ String actualMessage = captor.getValue();
+ assertThat(actualMessage
+ .contains(HttpStatus.NOT_FOUND + "\",\"message\":\"Not implemented operation: " + badOperation)).isTrue();
+
+ verify(dmaapClient).sendBatchWithResponse();
+ verifyNoMoreInteractions(dmaapClient);
+ }
+
+ @Test
+ public void putWithoutPayload_thenNotFoundResponseWithWarning() throws Exception {
+ String message = dmaapInputMessage(Operation.PUT).toString();
+ message = message.replace(",\"payload\":{\"name\":\"name\",\"schema\":\"schema\"}", "");
+
+ final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DmaapMessageHandler.class);
+
+ testedObject.handleDmaapMsg(message);
+
+ assertThat(logAppender.list.get(0).getLevel()).isEqualTo(Level.WARN);
+ assertThat(logAppender.list.toString().contains("Expected payload in message from DMAAP: ")).isTrue();
+ }