+ @Test
+ void exceptionWhenCallingPolicyAgent_thenNotFoundResponse() throws IOException {
+
+ doReturn(notOkResponse()).when(agentClient).putForEntity(anyString(), anyString());
+ doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
+
+ testedObject.createTask(dmaapInputMessage(Operation.PUT)).block();
+
+ verify(agentClient).putForEntity(anyString(), anyString());
+ verifyNoMoreInteractions(agentClient);
+
+ ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
+ verify(dmaapClient).post(anyString(), captor.capture());
+ String actualMessage = captor.getValue();
+ assertThat(actualMessage).as("Message \"%s\" sent to DMaaP contains %s", actualMessage, HttpStatus.BAD_GATEWAY)
+ .contains(HttpStatus.BAD_GATEWAY.toString());
+
+ verifyNoMoreInteractions(dmaapClient);
+ }
+
+ @Test
+ void unsupportedOperationInMessage_thenNotFoundResponseWithNotImplementedOperation() throws Exception {
+ String message = dmaapInputMessage(Operation.PUT).toString();
+ String badOperation = "BAD";
+ message = message.replace(Operation.PUT.toString(), badOperation);
+
+ testedObject.handleDmaapMsg(message);
+
+ ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
+ verify(dmaapClient).post(anyString(), captor.capture());
+ String actualMessage = captor.getValue();
+ assertThat(actualMessage).contains("Not implemented operation") //
+ .contains("BAD_REQUEST");
+ }
+
+ @Test
+ void putWithoutPayload_thenNotFoundResponseWithWarning() throws Exception {
+ String message = dmaapInputMessage(Operation.PUT).toString();
+ message = message.replace(",\"payload\":{\"name\":\"name\",\"schema\":\"schema\"}", "");
+
+ final ListAppender<ILoggingEvent> logAppender =
+ LoggingUtils.getLogListAppender(DmaapMessageHandler.class, WARN);
+
+ testedObject.handleDmaapMsg(message);
+
+ assertThat(logAppender.list.get(0).getFormattedMessage())
+ .startsWith("Expected payload in message from DMAAP: ");
+ }