Removed the DMAAP Accepted/Rejected Call 56/2356/5
authorLathish <lathishbabu.ganesan@est.tech>
Tue, 28 Jan 2020 14:01:19 +0000 (14:01 +0000)
committerLathish <lathishbabu.ganesan@est.tech>
Tue, 28 Jan 2020 15:39:45 +0000 (15:39 +0000)
Issue-ID: NONRTRIC-107
Change-Id: If5839d5b3c8a874298b7c46d118fdc17b76bc097
Signed-off-by: Lathish <lathishbabu.ganesan@est.tech>
policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java
policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java
policy-agent/src/main/java/org/oransc/policyagent/model/DmaapRequestMessage.java [moved from policy-agent/src/main/java/org/oransc/policyagent/model/DmaapMessage.java with 96% similarity]
policy-agent/src/main/java/org/oransc/policyagent/model/DmaapResponseMessage.java [new file with mode: 0644]

index 2ae5e5e..503ddab 100644 (file)
@@ -22,7 +22,6 @@ package org.oransc.policyagent.dmaap;
 
 import java.io.IOException;
 import java.util.Properties;
-import javax.annotation.PostConstruct;
 import org.onap.dmaap.mr.client.MRClientFactory;
 import org.onap.dmaap.mr.client.MRConsumer;
 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
@@ -55,9 +54,9 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer {
     @Scheduled(fixedRate = 1000 * 10) // , initialDelay=60000)
     @Override
     public void run() {
-        /*
-         * if (!alive) { init(); }
-         */
+        if (!alive) {
+            init();
+        }
         if (this.alive) {
             try {
                 Iterable<String> dmaapMsgs = fetchAllMessages();
@@ -85,7 +84,8 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer {
         return response.getActualMessages();
     }
 
-    @PostConstruct
+    // Properties are not loaded in first atempt. Need to fix this and then uncomment the post construct annotation
+    // @PostConstruct
     @Override
     public void init() {
         Properties dmaapConsumerProperties = applicationConfig.getDmaapConsumerConfig();
@@ -98,6 +98,8 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer {
         }
         try {
             logger.debug("Creating DMAAP Client");
+            System.out.println("dmaapConsumerProperties--->"+dmaapConsumerProperties.getProperty("topic"));
+            System.out.println("dmaapPublisherProperties--->"+dmaapPublisherProperties.getProperty("topic"));
             consumer = MRClientFactory.createConsumer(dmaapConsumerProperties);
             this.alive = true;
         } catch (IOException e) {
index bf9f06c..713d483 100644 (file)
 
 package org.oransc.policyagent.dmaap;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
+import java.util.Optional;
 import java.util.Properties;
-import org.apache.commons.lang3.StringUtils;
 import org.json.JSONException;
 import org.json.JSONObject;
 import org.json.JSONTokener;
 import org.oransc.policyagent.clients.AsyncRestClient;
 import org.oransc.policyagent.configuration.ApplicationConfig;
 import org.oransc.policyagent.controllers.PolicyController;
-import org.oransc.policyagent.model.DmaapMessage;
+import org.oransc.policyagent.model.DmaapRequestMessage;
+import org.oransc.policyagent.model.DmaapResponseMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -60,104 +62,104 @@ public class DmaapMessageHandler {
     @Async("threadPoolTaskExecutor")
     public void handleDmaapMsg(String msg) {
         init();
-        DmaapMessage dmaapMessage = null;
-        ResponseEntity<String> response = null;
+        DmaapRequestMessage dmaapRequestMessage = null;
+        Optional<String> dmaapResponse = null;
         // Process the message
         /**
          * Sample Request Message from DMAAP { "type": "request", "correlationId":
          * "c09ac7d1-de62-0016-2000-e63701125557-201", "target": "policy-agent", "timestamp": "2019-05-14T11:44:51.36Z",
          * "apiVersion": "1.0", "originatorId": "849e6c6b420", "requestId": "23343221", "operation": "getPolicySchemas",
-         * "payload": "{\"ric\":\"ric1\"}" }
+         * "payload": "{\"ricName\":\"ric1\"}" }
+         *
          * --------------------------------------------------------------------------------------------------------------
-         * Sample Response Message to DMAAP { "type": "response", "correlation-id":
-         * "c09ac7d1-de62-0016-2000-e63701125557-201", "timestamp": "2019-05-14T11:44:51.36Z", "originator-id":
-         * "849e6c6b420", "request-id": "23343221", "status" : "ACCEPTED", "message" : "" }
+         * Sample Response Message to DMAAP {type=response, correlationId=c09ac7d1-de62-0016-2000-e63701125557-201,
+         * timestamp=null, originatorId=849e6c6b420, requestId=23343221, status=200 OK, message=[]}
          * -------------------------------------------------------------------------------------------------------------
-         * Sample Response Message to DMAAP { "type": "response", "correlation-id":
-         * "c09ac7d1-de62-0016-2000-e63701125557-201", "timestamp": "2019-05-14T11:44:51.36Z", "originator-id":
-         * "849e6c6b420", "request-id": "23343221", "status" : "SUCCESS" "message" : "" }
          */
         try {
-            dmaapMessage = mapper.readValue(msg, DmaapMessage.class);
-            // Post the accepted message to the DMAAP bus
-            logger.debug("DMAAP Message- {}", dmaapMessage);
-            logger.debug("Post Accepted Message to Client");
-            restClient
-                    .post("A1-POLICY-AGENT-WRITE", buildDmaapResponse(dmaapMessage.getCorrelationId(),
-                            dmaapMessage.getOriginatorId(), dmaapMessage.getRequestId(), "ACCEPTED", StringUtils.EMPTY))
-                    .block(); //
+            dmaapRequestMessage = mapper.readValue(msg, DmaapRequestMessage.class);
             // Call the Controller
             logger.debug("Invoke the Policy Agent Controller");
-            response = invokeController(dmaapMessage);
+            dmaapResponse = invokeController(dmaapRequestMessage);
             // Post the Response message to the DMAAP bus
-            logger.debug("DMAAP Response Message to Client- {}", response);
-            restClient
-                    .post("A1-POLICY-AGENT-WRITE", buildDmaapResponse(dmaapMessage.getCorrelationId(),
-                            dmaapMessage.getOriginatorId(), dmaapMessage.getRequestId(), "SUCCESS", response.getBody()))
-                    .block(); //
+            logger.debug("DMAAP Response Message to Client- {}", dmaapResponse);
+            if (dmaapResponse.isPresent()) {
+                restClient.post("A1-POLICY-AGENT-WRITE", dmaapResponse.get()).block(); //
+            }
         } catch (IOException e) {
             logger.error("Exception occured during message processing", e);
         }
     }
 
-    private ResponseEntity<String> invokeController(DmaapMessage dmaapMessage) {
+    private Optional<String> invokeController(DmaapRequestMessage dmaapRequestMessage) {
         String formattedString = "";
         String ricName;
         String instance;
-        logger.debug("Payload from the Message - {}", dmaapMessage.getPayload());
+        String jsonBody;
+        logger.debug("Payload from the Message - {}", dmaapRequestMessage.getPayload());
         try {
-            formattedString = new JSONTokener(dmaapMessage.getPayload()).nextValue().toString();
+            formattedString = new JSONTokener(dmaapRequestMessage.getPayload()).nextValue().toString();
             logger.debug("Removed the Escape charater in payload- {}", formattedString);
         } catch (JSONException e) {
-            logger.error("Exception occurred during formating Payload- {}", dmaapMessage.getPayload());
+            logger.error("Exception occurred during formating Payload- {}", dmaapRequestMessage.getPayload());
         }
         JSONObject jsonObject = new JSONObject(formattedString);
-        switch (dmaapMessage.getOperation()) {
+        switch (dmaapRequestMessage.getOperation()) {
             case "getPolicySchemas":
                 ricName = (String) jsonObject.get("ricName");
                 logger.debug("Received the request for getPolicySchemas with Ric Name- {}", ricName);
-                return policyController.getPolicySchemas(ricName);
+                return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicySchemas(ricName));
             case "getPolicySchema":
                 String policyTypeId = (String) jsonObject.get("id");
                 logger.debug("Received the request for getPolicySchema with Policy Type Id- {}", policyTypeId);
-                System.out.println("policyTypeId" + policyTypeId);
-                return policyController.getPolicySchema(policyTypeId);
+                return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicySchema(policyTypeId));
             case "getPolicyTypes":
                 ricName = (String) jsonObject.get("ricName");
                 logger.debug("Received the request for getPolicyTypes with Ric Name- {}", ricName);
-                return policyController.getPolicyTypes(ricName);
+                return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicyTypes(ricName));
             case "getPolicy":
                 instance = (String) jsonObject.get("instance");
                 logger.debug("Received the request for getPolicy with Instance- {}", instance);
-                return policyController.getPolicy(instance);
+                return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicy(instance));
             case "deletePolicy":
                 instance = (String) jsonObject.get("instance");
                 logger.debug("Received the request for deletePolicy with Instance- {}", instance);
-                return null;// policyController.deletePolicy(deleteInstance);
+                return getDmaapResponseMessage(dmaapRequestMessage, policyController.deletePolicy(instance).block());
             case "putPolicy":
                 String type = (String) jsonObject.get("type");
                 String putPolicyInstance = (String) jsonObject.get("instance");
                 String putPolicyRic = (String) jsonObject.get("ric");
                 String service = (String) jsonObject.get("service");
-                String jsonBody = (String) jsonObject.get("jsonBody");
-                return null;// policyController.putPolicy(type, putPolicyInstance, putPolicyRic, service, jsonBody);
+                jsonBody = (String) jsonObject.get("jsonBody");
+                return getDmaapResponseMessage(dmaapRequestMessage,
+                        policyController.putPolicy(type, putPolicyInstance, putPolicyRic, service, jsonBody).block());
             case "getPolicies":
                 String getPolicyType = (String) jsonObject.get("type");
+                instance = (String) jsonObject.get("instance");
                 String getPolicyRic = (String) jsonObject.get("ric");
                 String getPolicyService = (String) jsonObject.get("service");
-                return policyController.getPolicies(getPolicyType, getPolicyRic, getPolicyService);
+                jsonBody = (String) jsonObject.get("jsonBody");
+                return getDmaapResponseMessage(dmaapRequestMessage, policyController
+                        .putPolicy(getPolicyType, instance, getPolicyRic, getPolicyService, jsonBody).block());
             default:
                 break;
         }
-        return null;
+        return Optional.empty();
     }
 
-    private String buildDmaapResponse(String correlationId, String originatorId, String requestId, String status,
-            String message) {
-        System.out.println("buildResponse ");
-        return new JSONObject().put("type", "response").put(correlationId, correlationId).put("timestamp", "")
-                .put("originatorId", originatorId).put("requestId", requestId).put("status", status)
-                .put("message", message).toString();
+    private Optional<String> getDmaapResponseMessage(DmaapRequestMessage dmaapRequestMessage,
+            ResponseEntity<?> policySchemas) {
+        DmaapResponseMessage dmaapResponseMessage = DmaapResponseMessage.builder()
+                .status(policySchemas.getStatusCode().toString()).message(policySchemas.getBody().toString())
+                .type("response").correlationId(dmaapRequestMessage.getCorrelationId())
+                .originatorId(dmaapRequestMessage.getOriginatorId()).requestId(dmaapRequestMessage.getRequestId())
+                .build();
+        try {
+            return Optional.of(mapper.writeValueAsString(dmaapResponseMessage));
+        } catch (JsonProcessingException e) {
+            logger.error("Exception occured during getDmaapResponseMessage", e);
+        }
+        return Optional.empty();
     }
 
     // @PostConstruct
@@ -168,6 +170,7 @@ public class DmaapMessageHandler {
         Properties dmaapPublisherConfig = applicationConfig.getDmaapPublisherConfig();
         String host = (String) dmaapPublisherConfig.get("ServiceName");
         topic = dmaapPublisherConfig.getProperty("topic");
+        System.out.println("\"Read the topic ---------->" + topic);
         logger.debug("Read the topic & Service Name - {} , {}", host, topic);
         this.restClient = new AsyncRestClient("http://" + host + "/"); // get this value from application config
 
@@ -27,7 +27,7 @@ import lombok.Setter;
 
 @Getter
 @Setter
-public class DmaapMessage {
+public class DmaapRequestMessage {
 
     @NotNull
     private String type;
@@ -35,7 +35,6 @@ public class DmaapMessage {
     private String correlationId;
     @NotNull
     private String target;
-    @NotNull
     private Timestamp timestamp;
     private String apiVersion;
     @NotNull
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/model/DmaapResponseMessage.java b/policy-agent/src/main/java/org/oransc/policyagent/model/DmaapResponseMessage.java
new file mode 100644 (file)
index 0000000..ebf5e8c
--- /dev/null
@@ -0,0 +1,46 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2019 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.policyagent.model;
+
+import java.sql.Timestamp;
+import javax.validation.constraints.NotNull;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Setter;
+
+@Getter
+@Setter
+@Builder
+public class DmaapResponseMessage {
+
+    @NotNull
+    private String type;
+    @NotNull
+    private String correlationId;
+    private Timestamp timestamp;
+    @NotNull
+    private String originatorId;
+    private String requestId;
+    @NotNull
+    private String status;
+    @NotNull
+    private String message;
+}