public void handleDmaapMsg(String msg) {
this.createTask(msg) //
.subscribe(message -> logger.debug("handleDmaapMsg: {}", message), //
- throwable -> logger.warn("handleDmaapMsg failure ", throwable), //
+ throwable -> logger.warn("handleDmaapMsg failure {}", throwable.getMessage()), //
() -> logger.debug("handleDmaapMsg complete"));
}
try {
DmaapRequestMessage dmaapRequestMessage = gson.fromJson(msg, ImmutableDmaapRequestMessage.class);
return this.invokePolicyAgent(dmaapRequestMessage) //
- .onErrorResume(t -> handleAgentCallError(t, dmaapRequestMessage)) //
+ .onErrorResume(t -> handleAgentCallError(t, msg, dmaapRequestMessage)) //
.flatMap(
-
response -> sendDmaapResponse(response.getBody(), dmaapRequestMessage, response.getStatusCode()));
} catch (Exception e) {
logger.warn("Received unparsable message from DMAAP: {}", msg);
}
}
- private Mono<ResponseEntity<String>> handleAgentCallError(Throwable t, DmaapRequestMessage dmaapRequestMessage) {
+ private Mono<ResponseEntity<String>> handleAgentCallError(Throwable t, String originalMessage,
+ DmaapRequestMessage dmaapRequestMessage) {
logger.debug("Agent call failed: {}", t.getMessage());
HttpStatus status = HttpStatus.NOT_FOUND;
String errorMessage = t.getMessage();
WebClientResponseException exception = (WebClientResponseException) t;
status = exception.getStatusCode();
errorMessage = exception.getResponseBodyAsString();
+ } else if (t instanceof ServiceException) {
+ status = HttpStatus.BAD_REQUEST;
+ errorMessage = prepareBadOperationErrorMessage(t, originalMessage);
+
}
return sendDmaapResponse(errorMessage, dmaapRequestMessage, status) //
.flatMap(notUsed -> Mono.empty());
}
+ private String prepareBadOperationErrorMessage(Throwable t, String originalMessage) {
+ String operationParameterStart = "operation\":\"";
+ int indexOfOperationStart = originalMessage.indexOf(operationParameterStart) + operationParameterStart.length();
+ int indexOfOperationEnd = originalMessage.indexOf("\",\"", indexOfOperationStart);
+ String badOperation = originalMessage.substring(indexOfOperationStart, indexOfOperationEnd);
+ return t.getMessage().replace("null", badOperation);
+ }
+
private Mono<ResponseEntity<String>> invokePolicyAgent(DmaapRequestMessage dmaapRequestMessage) {
DmaapRequestMessage.Operation operation = dmaapRequestMessage.operation();
-
String uri = dmaapRequestMessage.url();
+
if (operation == Operation.DELETE) {
return agentClient.deleteForEntity(uri);
} else if (operation == Operation.GET) {
}
private Mono<String> handleResponseCallError(Throwable t) {
- logger.debug("Failed to respond: {}", t.getMessage());
+ logger.debug("Failed to send response to DMaaP: {}", t.getMessage());
return Mono.empty();
}
HttpStatus status) {
DmaapResponseMessage dmaapResponseMessage = ImmutableDmaapResponseMessage.builder() //
.status(status.toString()) //
- .message(response) //
+ .message(response == null ? "" : response) //
.type("response") //
- .correlationId(dmaapRequestMessage.correlationId()) //
- .originatorId(dmaapRequestMessage.originatorId()) //
- .requestId(dmaapRequestMessage.requestId()) //
- .timestamp(dmaapRequestMessage.timestamp()) //
+ .correlationId(dmaapRequestMessage.correlationId() == null ? "" : dmaapRequestMessage.correlationId()) //
+ .originatorId(dmaapRequestMessage.originatorId() == null ? "" : dmaapRequestMessage.originatorId()) //
+ .requestId(dmaapRequestMessage.requestId() == null ? "" : dmaapRequestMessage.requestId()) //
+ .timestamp(dmaapRequestMessage.timestamp() == null ? "" : dmaapRequestMessage.timestamp()) //
.build();
String str = gson.toJson(dmaapResponseMessage);
return Mono.just(str);