+ DmaapRequestMessage dmaapRequestMessage = gson.fromJson(msg, ImmutableDmaapRequestMessage.class);
+ return this.invokePolicyAgent(dmaapRequestMessage) //
+ .onErrorResume(t -> handleAgentCallError(t, msg, dmaapRequestMessage)) //
+ .flatMap(
+ response -> sendDmaapResponse(response.getBody(), dmaapRequestMessage, response.getStatusCode()));
+ } catch (Exception e) {
+ logger.warn("Received unparsable message from DMAAP: {}", msg);
+ return Mono.error(e); // Cannot make any response
+ }
+ }
+
+ private Mono<ResponseEntity<String>> handleAgentCallError(Throwable t, String originalMessage,
+ DmaapRequestMessage dmaapRequestMessage) {
+ logger.debug("Agent call failed: {}", t.getMessage());
+ HttpStatus status = HttpStatus.NOT_FOUND;
+ String errorMessage = t.getMessage();
+ if (t instanceof WebClientResponseException) {
+ WebClientResponseException exception = (WebClientResponseException) t;
+ status = exception.getStatusCode();
+ errorMessage = exception.getResponseBodyAsString();
+ } else if (t instanceof ServiceException) {
+ status = HttpStatus.BAD_REQUEST;
+ errorMessage = prepareBadOperationErrorMessage(t, originalMessage);
+
+ }
+ return sendDmaapResponse(errorMessage, dmaapRequestMessage, status) //
+ .flatMap(notUsed -> Mono.empty());
+ }
+
+ private String prepareBadOperationErrorMessage(Throwable t, String originalMessage) {
+ String operationParameterStart = "operation\":\"";
+ int indexOfOperationStart = originalMessage.indexOf(operationParameterStart) + operationParameterStart.length();
+ int indexOfOperationEnd = originalMessage.indexOf("\",\"", indexOfOperationStart);
+ String badOperation = originalMessage.substring(indexOfOperationStart, indexOfOperationEnd);
+ return t.getMessage().replace("null", badOperation);
+ }
+
+ private Mono<ResponseEntity<String>> invokePolicyAgent(DmaapRequestMessage dmaapRequestMessage) {
+ DmaapRequestMessage.Operation operation = dmaapRequestMessage.operation();
+ String uri = dmaapRequestMessage.url();
+
+ if (operation == Operation.DELETE) {
+ return agentClient.deleteForEntity(uri);
+ } else if (operation == Operation.GET) {
+ return agentClient.getForEntity(uri);
+ } else if (operation == Operation.PUT) {
+ return agentClient.putForEntity(uri, payload(dmaapRequestMessage));
+ } else if (operation == Operation.POST) {
+ return agentClient.postForEntity(uri, payload(dmaapRequestMessage));
+ } else {
+ return Mono.error(new ServiceException("Not implemented operation: " + operation));