- JSONObject jsonObject = new JSONObject(formattedString);
- switch (dmaapMessage.getOperation()) {
- case "getPolicySchemas":
- ricName = (String) jsonObject.get("ricName");
- logger.debug("Received the request for getPolicySchemas with Ric Name- {}", ricName);
- return policyController.getPolicySchemas(ricName);
- case "getPolicySchema":
- String policyTypeId = (String) jsonObject.get("id");
- logger.debug("Received the request for getPolicySchema with Policy Type Id- {}", policyTypeId);
- System.out.println("policyTypeId" + policyTypeId);
- return policyController.getPolicySchema(policyTypeId);
- case "getPolicyTypes":
- ricName = (String) jsonObject.get("ricName");
- logger.debug("Received the request for getPolicyTypes with Ric Name- {}", ricName);
- return policyController.getPolicyTypes(ricName);
- case "getPolicy":
- instance = (String) jsonObject.get("instance");
- logger.debug("Received the request for getPolicy with Instance- {}", instance);
- return policyController.getPolicy(instance);
- case "deletePolicy":
- instance = (String) jsonObject.get("instance");
- logger.debug("Received the request for deletePolicy with Instance- {}", instance);
- return null;// policyController.deletePolicy(deleteInstance);
- case "putPolicy":
- String type = (String) jsonObject.get("type");
- String putPolicyInstance = (String) jsonObject.get("instance");
- String putPolicyRic = (String) jsonObject.get("ric");
- String service = (String) jsonObject.get("service");
- String jsonBody = (String) jsonObject.get("jsonBody");
- return null;// policyController.putPolicy(type, putPolicyInstance, putPolicyRic, service, jsonBody);
- case "getPolicies":
- String getPolicyType = (String) jsonObject.get("type");
- String getPolicyRic = (String) jsonObject.get("ric");
- String getPolicyService = (String) jsonObject.get("service");
- return policyController.getPolicies(getPolicyType, getPolicyRic, getPolicyService);
- default:
- break;
+ }
+
+ private Mono<ResponseEntity<String>> handleAgentCallError(Throwable t, String originalMessage,
+ DmaapRequestMessage dmaapRequestMessage) {
+ logger.debug("Agent call failed: {}", t.getMessage());
+ HttpStatus status = HttpStatus.NOT_FOUND;
+ String errorMessage = t.getMessage();
+ if (t instanceof WebClientResponseException) {
+ WebClientResponseException exception = (WebClientResponseException) t;
+ status = exception.getStatusCode();
+ errorMessage = exception.getResponseBodyAsString();
+ } else if (t instanceof ServiceException) {
+ status = HttpStatus.BAD_REQUEST;
+ errorMessage = prepareBadOperationErrorMessage(t, originalMessage);
+
+ }
+ return sendDmaapResponse(errorMessage, dmaapRequestMessage, status) //
+ .flatMap(notUsed -> Mono.empty());
+ }
+
+ private String prepareBadOperationErrorMessage(Throwable t, String originalMessage) {
+ String operationParameterStart = "operation\":\"";
+ int indexOfOperationStart = originalMessage.indexOf(operationParameterStart) + operationParameterStart.length();
+ int indexOfOperationEnd = originalMessage.indexOf("\",\"", indexOfOperationStart);
+ String badOperation = originalMessage.substring(indexOfOperationStart, indexOfOperationEnd);
+ return t.getMessage().replace("null", badOperation);
+ }
+
+ private Mono<ResponseEntity<String>> invokePolicyAgent(DmaapRequestMessage dmaapRequestMessage) {
+ DmaapRequestMessage.Operation operation = dmaapRequestMessage.operation();
+ String uri = dmaapRequestMessage.url();
+
+ if (operation == Operation.DELETE) {
+ return agentClient.deleteForEntity(uri);
+ } else if (operation == Operation.GET) {
+ return agentClient.getForEntity(uri);
+ } else if (operation == Operation.PUT) {
+ return agentClient.putForEntity(uri, payload(dmaapRequestMessage));
+ } else if (operation == Operation.POST) {
+ return agentClient.postForEntity(uri, payload(dmaapRequestMessage));
+ } else {
+ return Mono.error(new ServiceException("Not implemented operation: " + operation));
+ }
+
+ }
+
+ private String payload(DmaapRequestMessage message) {
+ Optional<JsonObject> payload = message.payload();
+ if (payload.isPresent()) {
+ return gson.toJson(payload.get());
+ } else {
+ logger.warn("Expected payload in message from DMAAP: {}", message);
+ return "";
+ }
+ }
+
+ private Mono<String> sendDmaapResponse(String response, DmaapRequestMessage dmaapRequestMessage,
+ HttpStatus status) {
+ return createDmaapResponseMessage(dmaapRequestMessage, response, status) //
+ .flatMap(this::sendToDmaap) //
+ .onErrorResume(this::handleResponseCallError);
+ }
+
+ private Mono<String> sendToDmaap(String body) {
+ try {
+ logger.debug("sendToDmaap: {} ", body);
+ dmaapClient.send(body);
+ dmaapClient.sendBatchWithResponse();
+ return Mono.just("OK");
+ } catch (IOException e) {
+ return Mono.error(e);