X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=policy-agent%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fpolicyagent%2Fdmaap%2FDmaapMessageHandler.java;h=fc6e439a3884e8d31778f82fee98bdb6f4883b14;hb=2350dd8586d0084d30b953fd9cdd8e14fec3856f;hp=bf9f06c1ec5343d5b66cae094714d35274b17f1e;hpb=f3461cb776023b950d62edd25eca148b6d354c9c;p=nonrtric.git diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java index bf9f06c1..fc6e439a 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java @@ -2,7 +2,7 @@ * ========================LICENSE_START================================= * O-RAN-SC * %% - * Copyright (C) 2019 Nordix Foundation + * Copyright (C) 2020 Nordix Foundation * %% * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,159 +17,154 @@ * limitations under the License. * ========================LICENSE_END=================================== */ - package org.oransc.policyagent.dmaap; -import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonObject; + import java.io.IOException; -import java.util.Properties; -import org.apache.commons.lang3.StringUtils; -import org.json.JSONException; -import org.json.JSONObject; -import org.json.JSONTokener; +import java.util.Optional; + +import org.onap.dmaap.mr.client.MRBatchingPublisher; import org.oransc.policyagent.clients.AsyncRestClient; -import org.oransc.policyagent.configuration.ApplicationConfig; -import org.oransc.policyagent.controllers.PolicyController; -import org.oransc.policyagent.model.DmaapMessage; +import org.oransc.policyagent.dmaap.DmaapRequestMessage.Operation; +import org.oransc.policyagent.exceptions.ServiceException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; -import org.springframework.scheduling.annotation.Async; -import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.client.WebClientResponseException; +import reactor.core.publisher.Mono; -@Component +/** + * The class handles incoming requests from DMAAP. + *

+ * That means: invoke a REST call towards this services and to send back a + * response though DMAAP + */ public class DmaapMessageHandler { - private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandler.class); + private static Gson gson = new GsonBuilder() // + .create(); // + private final MRBatchingPublisher dmaapClient; + private final AsyncRestClient agentClient; - @Autowired - private ObjectMapper mapper; - @Autowired - private PolicyController policyController; - private AsyncRestClient restClient; - private ApplicationConfig applicationConfig; - private String topic = ""; - - @Autowired - public DmaapMessageHandler(ApplicationConfig applicationConfig) { - this.applicationConfig = applicationConfig; + public DmaapMessageHandler(MRBatchingPublisher dmaapClient, AsyncRestClient agentClient) { + this.agentClient = agentClient; + this.dmaapClient = dmaapClient; } - // The publish properties is corrupted. It contains the subscribe property values. - @Async("threadPoolTaskExecutor") public void handleDmaapMsg(String msg) { - init(); - DmaapMessage dmaapMessage = null; - ResponseEntity response = null; - // Process the message - /** - * Sample Request Message from DMAAP { "type": "request", "correlationId": - * "c09ac7d1-de62-0016-2000-e63701125557-201", "target": "policy-agent", "timestamp": "2019-05-14T11:44:51.36Z", - * "apiVersion": "1.0", "originatorId": "849e6c6b420", "requestId": "23343221", "operation": "getPolicySchemas", - * "payload": "{\"ric\":\"ric1\"}" } - * -------------------------------------------------------------------------------------------------------------- - * Sample Response Message to DMAAP { "type": "response", "correlation-id": - * "c09ac7d1-de62-0016-2000-e63701125557-201", "timestamp": "2019-05-14T11:44:51.36Z", "originator-id": - * "849e6c6b420", "request-id": "23343221", "status" : "ACCEPTED", "message" : "" } - * ------------------------------------------------------------------------------------------------------------- - * Sample Response Message to DMAAP { "type": "response", "correlation-id": - * "c09ac7d1-de62-0016-2000-e63701125557-201", "timestamp": "2019-05-14T11:44:51.36Z", "originator-id": - * "849e6c6b420", "request-id": "23343221", "status" : "SUCCESS" "message" : "" } - */ + this.createTask(msg) // + .subscribe(message -> logger.debug("handleDmaapMsg: {}", message), // + throwable -> logger.warn("handleDmaapMsg failure ", throwable), // + () -> logger.debug("handleDmaapMsg complete")); + } + + Mono createTask(String msg) { try { - dmaapMessage = mapper.readValue(msg, DmaapMessage.class); - // Post the accepted message to the DMAAP bus - logger.debug("DMAAP Message- {}", dmaapMessage); - logger.debug("Post Accepted Message to Client"); - restClient - .post("A1-POLICY-AGENT-WRITE", buildDmaapResponse(dmaapMessage.getCorrelationId(), - dmaapMessage.getOriginatorId(), dmaapMessage.getRequestId(), "ACCEPTED", StringUtils.EMPTY)) - .block(); // - // Call the Controller - logger.debug("Invoke the Policy Agent Controller"); - response = invokeController(dmaapMessage); - // Post the Response message to the DMAAP bus - logger.debug("DMAAP Response Message to Client- {}", response); - restClient - .post("A1-POLICY-AGENT-WRITE", buildDmaapResponse(dmaapMessage.getCorrelationId(), - dmaapMessage.getOriginatorId(), dmaapMessage.getRequestId(), "SUCCESS", response.getBody())) - .block(); // - } catch (IOException e) { - logger.error("Exception occured during message processing", e); + DmaapRequestMessage dmaapRequestMessage = gson.fromJson(msg, ImmutableDmaapRequestMessage.class); + return this.invokePolicyAgent(dmaapRequestMessage) // + .onErrorResume(t -> handleAgentCallError(t, msg, dmaapRequestMessage)) // + .flatMap( + response -> sendDmaapResponse(response.getBody(), dmaapRequestMessage, response.getStatusCode())); + } catch (Exception e) { + logger.warn("Received unparsable message from DMAAP: {}", msg); + return Mono.error(e); // Cannot make any response } } - private ResponseEntity invokeController(DmaapMessage dmaapMessage) { - String formattedString = ""; - String ricName; - String instance; - logger.debug("Payload from the Message - {}", dmaapMessage.getPayload()); - try { - formattedString = new JSONTokener(dmaapMessage.getPayload()).nextValue().toString(); - logger.debug("Removed the Escape charater in payload- {}", formattedString); - } catch (JSONException e) { - logger.error("Exception occurred during formating Payload- {}", dmaapMessage.getPayload()); + private Mono> handleAgentCallError(Throwable t, String originalMessage, + DmaapRequestMessage dmaapRequestMessage) { + logger.debug("Agent call failed: {}", t.getMessage()); + HttpStatus status = HttpStatus.NOT_FOUND; + String errorMessage = t.getMessage(); + if (t instanceof WebClientResponseException) { + WebClientResponseException exception = (WebClientResponseException) t; + status = exception.getStatusCode(); + errorMessage = exception.getResponseBodyAsString(); + } else if (t instanceof ServiceException) { + status = HttpStatus.BAD_REQUEST; + errorMessage = prepareBadOperationErrorMessage(t, originalMessage); + + } + return sendDmaapResponse(errorMessage, dmaapRequestMessage, status) // + .flatMap(notUsed -> Mono.empty()); + } + + private String prepareBadOperationErrorMessage(Throwable t, String originalMessage) { + String operationParameterStart = "operation\":\""; + int indexOfOperationStart = originalMessage.indexOf(operationParameterStart) + operationParameterStart.length(); + int indexOfOperationEnd = originalMessage.indexOf("\",\"", indexOfOperationStart); + String badOperation = originalMessage.substring(indexOfOperationStart, indexOfOperationEnd); + return t.getMessage().replace("null", badOperation); + } + + private Mono> invokePolicyAgent(DmaapRequestMessage dmaapRequestMessage) { + DmaapRequestMessage.Operation operation = dmaapRequestMessage.operation(); + String uri = dmaapRequestMessage.url(); + + if (operation == Operation.DELETE) { + return agentClient.deleteForEntity(uri); + } else if (operation == Operation.GET) { + return agentClient.getForEntity(uri); + } else if (operation == Operation.PUT) { + return agentClient.putForEntity(uri, payload(dmaapRequestMessage)); + } else if (operation == Operation.POST) { + return agentClient.postForEntity(uri, payload(dmaapRequestMessage)); + } else { + return Mono.error(new ServiceException("Not implemented operation: " + operation)); + } + + } + + private String payload(DmaapRequestMessage message) { + Optional payload = message.payload(); + if (payload.isPresent()) { + return gson.toJson(payload.get()); + } else { + logger.warn("Expected payload in message from DMAAP: {}", message); + return ""; } - JSONObject jsonObject = new JSONObject(formattedString); - switch (dmaapMessage.getOperation()) { - case "getPolicySchemas": - ricName = (String) jsonObject.get("ricName"); - logger.debug("Received the request for getPolicySchemas with Ric Name- {}", ricName); - return policyController.getPolicySchemas(ricName); - case "getPolicySchema": - String policyTypeId = (String) jsonObject.get("id"); - logger.debug("Received the request for getPolicySchema with Policy Type Id- {}", policyTypeId); - System.out.println("policyTypeId" + policyTypeId); - return policyController.getPolicySchema(policyTypeId); - case "getPolicyTypes": - ricName = (String) jsonObject.get("ricName"); - logger.debug("Received the request for getPolicyTypes with Ric Name- {}", ricName); - return policyController.getPolicyTypes(ricName); - case "getPolicy": - instance = (String) jsonObject.get("instance"); - logger.debug("Received the request for getPolicy with Instance- {}", instance); - return policyController.getPolicy(instance); - case "deletePolicy": - instance = (String) jsonObject.get("instance"); - logger.debug("Received the request for deletePolicy with Instance- {}", instance); - return null;// policyController.deletePolicy(deleteInstance); - case "putPolicy": - String type = (String) jsonObject.get("type"); - String putPolicyInstance = (String) jsonObject.get("instance"); - String putPolicyRic = (String) jsonObject.get("ric"); - String service = (String) jsonObject.get("service"); - String jsonBody = (String) jsonObject.get("jsonBody"); - return null;// policyController.putPolicy(type, putPolicyInstance, putPolicyRic, service, jsonBody); - case "getPolicies": - String getPolicyType = (String) jsonObject.get("type"); - String getPolicyRic = (String) jsonObject.get("ric"); - String getPolicyService = (String) jsonObject.get("service"); - return policyController.getPolicies(getPolicyType, getPolicyRic, getPolicyService); - default: - break; + } + + private Mono sendDmaapResponse(String response, DmaapRequestMessage dmaapRequestMessage, + HttpStatus status) { + return createDmaapResponseMessage(dmaapRequestMessage, response, status) // + .flatMap(this::sendToDmaap) // + .onErrorResume(this::handleResponseCallError); + } + + private Mono sendToDmaap(String body) { + try { + logger.debug("sendToDmaap: {} ", body); + dmaapClient.send(body); + dmaapClient.sendBatchWithResponse(); + return Mono.just("OK"); + } catch (IOException e) { + return Mono.error(e); } - return null; } - private String buildDmaapResponse(String correlationId, String originatorId, String requestId, String status, - String message) { - System.out.println("buildResponse "); - return new JSONObject().put("type", "response").put(correlationId, correlationId).put("timestamp", "") - .put("originatorId", originatorId).put("requestId", requestId).put("status", status) - .put("message", message).toString(); + private Mono handleResponseCallError(Throwable t) { + logger.debug("Failed to send response to DMaaP: {}", t.getMessage()); + return Mono.empty(); } - // @PostConstruct - // The application properties value is always NULL for the first time - // Need to fix this - public void init() { - logger.debug("Reading DMAAP Publisher bus details from Application Config"); - Properties dmaapPublisherConfig = applicationConfig.getDmaapPublisherConfig(); - String host = (String) dmaapPublisherConfig.get("ServiceName"); - topic = dmaapPublisherConfig.getProperty("topic"); - logger.debug("Read the topic & Service Name - {} , {}", host, topic); - this.restClient = new AsyncRestClient("http://" + host + "/"); // get this value from application config + private Mono createDmaapResponseMessage(DmaapRequestMessage dmaapRequestMessage, String response, + HttpStatus status) { + DmaapResponseMessage dmaapResponseMessage = ImmutableDmaapResponseMessage.builder() // + .status(status.toString()) // + .message(response == null ? "" : response) // + .type("response") // + .correlationId(dmaapRequestMessage.correlationId() == null ? "" : dmaapRequestMessage.correlationId()) // + .originatorId(dmaapRequestMessage.originatorId() == null ? "" : dmaapRequestMessage.originatorId()) // + .requestId(dmaapRequestMessage.requestId() == null ? "" : dmaapRequestMessage.requestId()) // + .timestamp(dmaapRequestMessage.timestamp() == null ? "" : dmaapRequestMessage.timestamp()) // + .build(); + String str = gson.toJson(dmaapResponseMessage); + return Mono.just(str); } }