Merge "Add Policy agent blueprint"
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / dmaap / DmaapMessageHandler.java
index bf9f06c..172fe98 100644 (file)
 
 package org.oransc.policyagent.dmaap;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
+import java.util.Optional;
 import java.util.Properties;
-import org.apache.commons.lang3.StringUtils;
 import org.json.JSONException;
 import org.json.JSONObject;
 import org.json.JSONTokener;
 import org.oransc.policyagent.clients.AsyncRestClient;
 import org.oransc.policyagent.configuration.ApplicationConfig;
 import org.oransc.policyagent.controllers.PolicyController;
-import org.oransc.policyagent.model.DmaapMessage;
+import org.oransc.policyagent.model.DmaapRequestMessage;
+import org.oransc.policyagent.model.DmaapResponseMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -48,121 +50,115 @@ public class DmaapMessageHandler {
     @Autowired
     private PolicyController policyController;
     private AsyncRestClient restClient;
+    @Autowired
     private ApplicationConfig applicationConfig;
     private String topic = "";
 
-    @Autowired
-    public DmaapMessageHandler(ApplicationConfig applicationConfig) {
-        this.applicationConfig = applicationConfig;
-    }
-
     // The publish properties is corrupted. It contains the subscribe property values.
     @Async("threadPoolTaskExecutor")
     public void handleDmaapMsg(String msg) {
+        logger.debug("Message  ---------->{}", msg);
         init();
-        DmaapMessage dmaapMessage = null;
-        ResponseEntity<String> response = null;
+        DmaapRequestMessage dmaapRequestMessage = null;
+        Optional<String> dmaapResponse = null;
         // Process the message
         /**
          * Sample Request Message from DMAAP { "type": "request", "correlationId":
          * "c09ac7d1-de62-0016-2000-e63701125557-201", "target": "policy-agent", "timestamp": "2019-05-14T11:44:51.36Z",
          * "apiVersion": "1.0", "originatorId": "849e6c6b420", "requestId": "23343221", "operation": "getPolicySchemas",
-         * "payload": "{\"ric\":\"ric1\"}" }
-         * --------------------------------------------------------------------------------------------------------------
-         * Sample Response Message to DMAAP { "type": "response", "correlation-id":
-         * "c09ac7d1-de62-0016-2000-e63701125557-201", "timestamp": "2019-05-14T11:44:51.36Z", "originator-id":
-         * "849e6c6b420", "request-id": "23343221", "status" : "ACCEPTED", "message" : "" }
+         * "payload": "{\"ricName\":\"ric1\"}" }
+         *
+         * -------------------------------------------------------------------------------------------------------------
+         * Sample Response Message to DMAAP {type=response, correlationId=c09ac7d1-de62-0016-2000-e63701125557-201,
+         * timestamp=null, originatorId=849e6c6b420, requestId=23343221, status=200 OK, message=[]}
          * -------------------------------------------------------------------------------------------------------------
-         * Sample Response Message to DMAAP { "type": "response", "correlation-id":
-         * "c09ac7d1-de62-0016-2000-e63701125557-201", "timestamp": "2019-05-14T11:44:51.36Z", "originator-id":
-         * "849e6c6b420", "request-id": "23343221", "status" : "SUCCESS" "message" : "" }
          */
         try {
-            dmaapMessage = mapper.readValue(msg, DmaapMessage.class);
-            // Post the accepted message to the DMAAP bus
-            logger.debug("DMAAP Message- {}", dmaapMessage);
-            logger.debug("Post Accepted Message to Client");
-            restClient
-                    .post("A1-POLICY-AGENT-WRITE", buildDmaapResponse(dmaapMessage.getCorrelationId(),
-                            dmaapMessage.getOriginatorId(), dmaapMessage.getRequestId(), "ACCEPTED", StringUtils.EMPTY))
-                    .block(); //
+            dmaapRequestMessage = mapper.readValue(msg, DmaapRequestMessage.class);
             // Call the Controller
             logger.debug("Invoke the Policy Agent Controller");
-            response = invokeController(dmaapMessage);
+            dmaapResponse = invokeController(dmaapRequestMessage);
             // Post the Response message to the DMAAP bus
-            logger.debug("DMAAP Response Message to Client- {}", response);
-            restClient
-                    .post("A1-POLICY-AGENT-WRITE", buildDmaapResponse(dmaapMessage.getCorrelationId(),
-                            dmaapMessage.getOriginatorId(), dmaapMessage.getRequestId(), "SUCCESS", response.getBody()))
-                    .block(); //
+            logger.debug("DMAAP Response Message to Client- {}", dmaapResponse);
+            if (dmaapResponse.isPresent()) {
+                restClient.post("A1-POLICY-AGENT-WRITE", dmaapResponse.get()).block(); //
+            }
         } catch (IOException e) {
             logger.error("Exception occured during message processing", e);
         }
     }
 
-    private ResponseEntity<String> invokeController(DmaapMessage dmaapMessage) {
+    private Optional<String> invokeController(DmaapRequestMessage dmaapRequestMessage) {
         String formattedString = "";
         String ricName;
         String instance;
-        logger.debug("Payload from the Message - {}", dmaapMessage.getPayload());
+        String jsonBody;
+        logger.debug("Payload from the Message - {}", dmaapRequestMessage.getPayload());
         try {
-            formattedString = new JSONTokener(dmaapMessage.getPayload()).nextValue().toString();
+            formattedString = new JSONTokener(dmaapRequestMessage.getPayload()).nextValue().toString();
             logger.debug("Removed the Escape charater in payload- {}", formattedString);
         } catch (JSONException e) {
-            logger.error("Exception occurred during formating Payload- {}", dmaapMessage.getPayload());
+            logger.error("Exception occurred during formating Payload- {}", dmaapRequestMessage.getPayload());
         }
         JSONObject jsonObject = new JSONObject(formattedString);
-        switch (dmaapMessage.getOperation()) {
+        switch (dmaapRequestMessage.getOperation()) {
             case "getPolicySchemas":
                 ricName = (String) jsonObject.get("ricName");
                 logger.debug("Received the request for getPolicySchemas with Ric Name- {}", ricName);
-                return policyController.getPolicySchemas(ricName);
+                return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicySchemas(ricName));
             case "getPolicySchema":
                 String policyTypeId = (String) jsonObject.get("id");
                 logger.debug("Received the request for getPolicySchema with Policy Type Id- {}", policyTypeId);
-                System.out.println("policyTypeId" + policyTypeId);
-                return policyController.getPolicySchema(policyTypeId);
+                return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicySchema(policyTypeId));
             case "getPolicyTypes":
                 ricName = (String) jsonObject.get("ricName");
                 logger.debug("Received the request for getPolicyTypes with Ric Name- {}", ricName);
-                return policyController.getPolicyTypes(ricName);
+                return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicyTypes(ricName));
             case "getPolicy":
                 instance = (String) jsonObject.get("instance");
                 logger.debug("Received the request for getPolicy with Instance- {}", instance);
-                return policyController.getPolicy(instance);
+                return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicy(instance));
             case "deletePolicy":
                 instance = (String) jsonObject.get("instance");
                 logger.debug("Received the request for deletePolicy with Instance- {}", instance);
-                return null;// policyController.deletePolicy(deleteInstance);
+                return getDmaapResponseMessage(dmaapRequestMessage, policyController.deletePolicy(instance).block());
             case "putPolicy":
                 String type = (String) jsonObject.get("type");
                 String putPolicyInstance = (String) jsonObject.get("instance");
                 String putPolicyRic = (String) jsonObject.get("ric");
                 String service = (String) jsonObject.get("service");
-                String jsonBody = (String) jsonObject.get("jsonBody");
-                return null;// policyController.putPolicy(type, putPolicyInstance, putPolicyRic, service, jsonBody);
+                jsonBody = (String) jsonObject.get("jsonBody");
+                return getDmaapResponseMessage(dmaapRequestMessage,
+                        policyController.putPolicy(type, putPolicyInstance, putPolicyRic, service, jsonBody).block());
             case "getPolicies":
                 String getPolicyType = (String) jsonObject.get("type");
+                instance = (String) jsonObject.get("instance");
                 String getPolicyRic = (String) jsonObject.get("ric");
                 String getPolicyService = (String) jsonObject.get("service");
-                return policyController.getPolicies(getPolicyType, getPolicyRic, getPolicyService);
+                jsonBody = (String) jsonObject.get("jsonBody");
+                return getDmaapResponseMessage(dmaapRequestMessage, policyController
+                        .putPolicy(getPolicyType, instance, getPolicyRic, getPolicyService, jsonBody).block());
             default:
                 break;
         }
-        return null;
+        return Optional.empty();
     }
 
-    private String buildDmaapResponse(String correlationId, String originatorId, String requestId, String status,
-            String message) {
-        System.out.println("buildResponse ");
-        return new JSONObject().put("type", "response").put(correlationId, correlationId).put("timestamp", "")
-                .put("originatorId", originatorId).put("requestId", requestId).put("status", status)
-                .put("message", message).toString();
+    private Optional<String> getDmaapResponseMessage(DmaapRequestMessage dmaapRequestMessage,
+            ResponseEntity<?> policySchemas) {
+        DmaapResponseMessage dmaapResponseMessage = DmaapResponseMessage.builder()
+                .status(policySchemas.getStatusCode().toString()).message(policySchemas.getBody().toString())
+                .type("response").correlationId(dmaapRequestMessage.getCorrelationId())
+                .originatorId(dmaapRequestMessage.getOriginatorId()).requestId(dmaapRequestMessage.getRequestId())
+                .build();
+        try {
+            return Optional.of(mapper.writeValueAsString(dmaapResponseMessage));
+        } catch (JsonProcessingException e) {
+            logger.error("Exception occured during getDmaapResponseMessage", e);
+        }
+        return Optional.empty();
     }
 
-    // @PostConstruct
-    // The application properties value is always NULL for the first time
-    // Need to fix this
     public void init() {
         logger.debug("Reading DMAAP Publisher bus details from Application Config");
         Properties dmaapPublisherConfig = applicationConfig.getDmaapPublisherConfig();