try {
DmaapRequestMessage dmaapRequestMessage = gson.fromJson(msg, ImmutableDmaapRequestMessage.class);
return this.invokePolicyAgent(dmaapRequestMessage) //
- .onErrorResume(t -> handleAgentCallError(t, dmaapRequestMessage)) //
+ .onErrorResume(t -> handleAgentCallError(t, msg, dmaapRequestMessage)) //
.flatMap(
-
response -> sendDmaapResponse(response.getBody(), dmaapRequestMessage, response.getStatusCode()));
} catch (Exception e) {
logger.warn("Received unparsable message from DMAAP: {}", msg);
}
}
- private Mono<ResponseEntity<String>> handleAgentCallError(Throwable t, DmaapRequestMessage dmaapRequestMessage) {
+ private Mono<ResponseEntity<String>> handleAgentCallError(Throwable t, String originalMessage,
+ DmaapRequestMessage dmaapRequestMessage) {
logger.debug("Agent call failed: {}", t.getMessage());
HttpStatus status = HttpStatus.NOT_FOUND;
String errorMessage = t.getMessage();
WebClientResponseException exception = (WebClientResponseException) t;
status = exception.getStatusCode();
errorMessage = exception.getResponseBodyAsString();
+ } else if (t instanceof ServiceException) {
+ status = HttpStatus.BAD_REQUEST;
+ errorMessage = prepareBadOperationErrorMessage(t, originalMessage);
+
}
return sendDmaapResponse(errorMessage, dmaapRequestMessage, status) //
.flatMap(notUsed -> Mono.empty());
}
+ private String prepareBadOperationErrorMessage(Throwable t, String originalMessage) {
+ String operationParameterStart = "operation\":\"";
+ int indexOfOperationStart = originalMessage.indexOf(operationParameterStart) + operationParameterStart.length();
+ int indexOfOperationEnd = originalMessage.indexOf("\",\"", indexOfOperationStart);
+ String badOperation = originalMessage.substring(indexOfOperationStart, indexOfOperationEnd);
+ return t.getMessage().replace("null", badOperation);
+ }
+
private Mono<ResponseEntity<String>> invokePolicyAgent(DmaapRequestMessage dmaapRequestMessage) {
DmaapRequestMessage.Operation operation = dmaapRequestMessage.operation();
-
String uri = dmaapRequestMessage.url();
+
if (operation == Operation.DELETE) {
return agentClient.deleteForEntity(uri);
} else if (operation == Operation.GET) {
}
private Mono<String> handleResponseCallError(Throwable t) {
- logger.debug("Failed to respond: {}", t.getMessage());
+ logger.debug("Failed to send response to DMaaP: {}", t.getMessage());
return Mono.empty();
}