- // Error response
- verify(dmaapClient, times(1)).send(anyString());
- verify(dmaapClient, times(1)).sendBatchWithResponse();
+ ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
+ verify(dmaapClient).send(captor.capture());
+ String actualMessage = captor.getValue();
+ assertThat(actualMessage.contains(HttpStatus.BAD_REQUEST.toString())).isTrue();
+
+ verify(dmaapClient).sendBatchWithResponse();
+ verifyNoMoreInteractions(dmaapClient);
+ }
+
+ @Test
+ public void unsupportedOperationInMessage_thenNotFoundResponseWithNotImplementedOperation() throws Exception {
+ String message = dmaapInputMessage(Operation.PUT).toString();
+ String badOperation = "BAD";
+ message = message.replace(Operation.PUT.toString(), badOperation);
+
+ testedObject.handleDmaapMsg(message);
+
+ ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
+ verify(dmaapClient).send(captor.capture());
+ String actualMessage = captor.getValue();
+ assertThat(actualMessage.contains(HttpStatus.NOT_FOUND + "\",\"message\":\"Not implemented operation:"))
+ .isTrue();
+
+ verify(dmaapClient).sendBatchWithResponse();