- private ResponseEntity<String> invokeController(DmaapMessage dmaapMessage) {
- String formattedString = "";
- String ricName;
- String instance;
- logger.debug("Payload from the Message - {}", dmaapMessage.getPayload());
- try {
- formattedString = new JSONTokener(dmaapMessage.getPayload()).nextValue().toString();
- logger.debug("Removed the Escape charater in payload- {}", formattedString);
- } catch (JSONException e) {
- logger.error("Exception occurred during formating Payload- {}", dmaapMessage.getPayload());
+ private Mono<ResponseEntity<String>> handleAgentCallError(Throwable t, String originalMessage,
+ DmaapRequestMessage dmaapRequestMessage) {
+ logger.debug("Agent call failed: {}", t.getMessage());
+ HttpStatus status = HttpStatus.NOT_FOUND;
+ String errorMessage = t.getMessage();
+ if (t instanceof WebClientResponseException) {
+ WebClientResponseException exception = (WebClientResponseException) t;
+ status = exception.getStatusCode();
+ errorMessage = exception.getResponseBodyAsString();
+ } else if (t instanceof ServiceException) {
+ status = HttpStatus.BAD_REQUEST;
+ errorMessage = prepareBadOperationErrorMessage(t, originalMessage);
+
+ }
+ return sendDmaapResponse(errorMessage, dmaapRequestMessage, status) //
+ .flatMap(notUsed -> Mono.empty());
+ }
+
+ private String prepareBadOperationErrorMessage(Throwable t, String originalMessage) {
+ String operationParameterStart = "operation\":\"";
+ int indexOfOperationStart = originalMessage.indexOf(operationParameterStart) + operationParameterStart.length();
+ int indexOfOperationEnd = originalMessage.indexOf("\",\"", indexOfOperationStart);
+ String badOperation = originalMessage.substring(indexOfOperationStart, indexOfOperationEnd);
+ return t.getMessage().replace("null", badOperation);
+ }
+
+ private Mono<ResponseEntity<String>> invokePolicyAgent(DmaapRequestMessage dmaapRequestMessage) {
+ DmaapRequestMessage.Operation operation = dmaapRequestMessage.operation();
+ String uri = dmaapRequestMessage.url();
+
+ if (operation == Operation.DELETE) {
+ return agentClient.deleteForEntity(uri);
+ } else if (operation == Operation.GET) {
+ return agentClient.getForEntity(uri);
+ } else if (operation == Operation.PUT) {
+ return agentClient.putForEntity(uri, payload(dmaapRequestMessage));
+ } else if (operation == Operation.POST) {
+ return agentClient.postForEntity(uri, payload(dmaapRequestMessage));
+ } else {
+ return Mono.error(new ServiceException("Not implemented operation: " + operation));
+ }
+
+ }
+
+ private String payload(DmaapRequestMessage message) {
+ Optional<JsonObject> payload = message.payload();
+ if (payload.isPresent()) {
+ return gson.toJson(payload.get());
+ } else {
+ logger.warn("Expected payload in message from DMAAP: {}", message);
+ return "";