Process message with Producer logic 48/2348/3
authorLathish <lathishbabu.ganesan@est.tech>
Mon, 27 Jan 2020 14:57:22 +0000 (14:57 +0000)
committerLathish <lathishbabu.ganesan@est.tech>
Mon, 27 Jan 2020 20:35:49 +0000 (20:35 +0000)
Issue-ID: NONRTRIC-107
Change-Id: Iba71ee37ad2e19742afb303d19632126b6067e35
Signed-off-by: Lathish <lathishbabu.ganesan@est.tech>
policy-agent/pom.xml
policy-agent/src/main/java/org/oransc/policyagent/BeanFactory.java
policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java
policy-agent/src/main/java/org/oransc/policyagent/configuration/AsyncConfiguration.java [new file with mode: 0644]
policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java
policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java [new file with mode: 0644]
policy-agent/src/main/java/org/oransc/policyagent/model/DmaapMessage.java [new file with mode: 0644]
policy-agent/src/test/resources/test_application_configuration_with_dmaap_config.json

index 43d3aef..f2a9411 100644 (file)
             <artifactId>dmaap-client</artifactId>
                        <version>${sdk.version}</version>
                </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+         <scope>provided</scope>
+        </dependency>
                <!--REQUIRED TO GENERATE DOCUMENTATION -->
                <dependency>
                        <groupId>io.springfox</groupId>
index f0826e9..e05eb95 100644 (file)
@@ -20,6 +20,7 @@
 
 package org.oransc.policyagent;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.oransc.policyagent.clients.A1ClientFactory;
 import org.oransc.policyagent.configuration.ApplicationConfig;
 import org.oransc.policyagent.repository.Policies;
@@ -61,4 +62,9 @@ class BeanFactory {
         return new A1ClientFactory();
     }
 
+    @Bean
+    public ObjectMapper mapper() {
+      return new ObjectMapper();
+    }
+
 }
index d32db7a..34d1d97 100644 (file)
@@ -25,16 +25,13 @@ import com.google.gson.GsonBuilder;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
-
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 import java.util.Vector;
-
 import javax.validation.constraints.NotNull;
-
 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
 import org.oransc.policyagent.exceptions.ServiceException;
 import org.springframework.http.MediaType;
@@ -125,9 +122,9 @@ public class ApplicationConfigParser {
             String urlPath = url.getPath();
             DmaapUrlPath path = parseDmaapUrlPath(urlPath);
 
-            dmaapProps.put("ServiceName", url.getHost());
+            dmaapProps.put("ServiceName", url.getHost()+":"+url.getPort()+"/events");
             dmaapProps.put("topic", path.dmaapTopicName);
-            dmaapProps.put("host", url.getHost());
+            dmaapProps.put("host", url.getHost()+":"+url.getPort());
             dmaapProps.put("contenttype", MediaType.APPLICATION_JSON.toString());
             dmaapProps.put("userName", userName);
             dmaapProps.put("password", passwd);
@@ -136,7 +133,6 @@ public class ApplicationConfigParser {
             dmaapProps.put("TransportType", ProtocolTypeConstants.HTTPNOAUTH.toString());
             dmaapProps.put("timeout", 15000);
             dmaapProps.put("limit", 1000);
-            dmaapProps.put("port", url.getPort());
         } catch (MalformedURLException e) {
             throw new ServiceException("Could not parse the URL", e);
         }
@@ -166,7 +162,7 @@ public class ApplicationConfigParser {
             throw new ServiceException("The path has incorrect syntax: " + urlPath);
         }
 
-        final String dmaapTopicName = tokens[1] + "/" + tokens[2]; // /events/A1-P
+        final String dmaapTopicName =  tokens[2]; // /events/A1-P
         String consumerGroup = ""; // users
         String consumerId = ""; // sdnc1
         if (tokens.length == 5) {
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/configuration/AsyncConfiguration.java b/policy-agent/src/main/java/org/oransc/policyagent/configuration/AsyncConfiguration.java
new file mode 100644 (file)
index 0000000..085320c
--- /dev/null
@@ -0,0 +1,44 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2019 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.policyagent.configuration;
+
+import java.util.concurrent.Executor;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.annotation.AsyncConfigurer;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+@Configuration
+@EnableAsync
+public class AsyncConfiguration implements AsyncConfigurer {
+
+    @Override
+    @Bean(name = "threadPoolTaskExecutor")
+    public Executor getAsyncExecutor() {
+        //Set this configuration value from common properties file
+        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+        executor.setCorePoolSize(5);
+        executor.setMaxPoolSize(10);
+        executor.setQueueCapacity(25);
+        return executor;
+    }
+}
index 191a13b..2ae5e5e 100644 (file)
@@ -22,7 +22,7 @@ package org.oransc.policyagent.dmaap;
 
 import java.io.IOException;
 import java.util.Properties;
-
+import javax.annotation.PostConstruct;
 import org.onap.dmaap.mr.client.MRClientFactory;
 import org.onap.dmaap.mr.client.MRConsumer;
 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
@@ -44,21 +44,24 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer {
     private final ApplicationConfig applicationConfig;
     protected MRConsumer consumer;
     private MRConsumerResponse response = null;
+    @Autowired
+    private DmaapMessageHandler dmaapMessageHandler;
 
     @Autowired
     public DmaapMessageConsumerImpl(ApplicationConfig applicationConfig) {
         this.applicationConfig = applicationConfig;
     }
 
-    @Scheduled(fixedRate = 1000 * 60)
+    @Scheduled(fixedRate = 1000 * 10) // , initialDelay=60000)
     @Override
     public void run() {
-        if (!alive) {
-            init();
-        }
+        /*
+         * if (!alive) { init(); }
+         */
         if (this.alive) {
             try {
                 Iterable<String> dmaapMsgs = fetchAllMessages();
+                logger.debug("Fetched all the messages from DMAAP and will start to process the messages");
                 for (String msg : dmaapMsgs) {
                     processMsg(msg);
                 }
@@ -76,23 +79,25 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer {
             logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(), response.getResponseMessage());
             if (!"200".equals(response.getResponseCode())) {
                 logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
-                    response.getResponseMessage());
+                        response.getResponseMessage());
             }
         }
         return response.getActualMessages();
     }
 
+    @PostConstruct
     @Override
     public void init() {
         Properties dmaapConsumerProperties = applicationConfig.getDmaapConsumerConfig();
         Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig();
         // No need to start if there is no configuration.
         if (dmaapConsumerProperties == null || dmaapPublisherProperties == null || dmaapConsumerProperties.size() == 0
-            || dmaapPublisherProperties.size() == 0) {
+                || dmaapPublisherProperties.size() == 0) {
+            logger.error("DMaaP properties Failed to Load");
             return;
         }
-        // Do we need to do any validation of properties before calling the factory?
         try {
+            logger.debug("Creating DMAAP Client");
             consumer = MRClientFactory.createConsumer(dmaapConsumerProperties);
             this.alive = true;
         } catch (IOException e) {
@@ -102,11 +107,9 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer {
 
     @Override
     public void processMsg(String msg) throws Exception {
-        System.out.println("sysout" + msg);
+        logger.debug("Message Reveived from DMAAP : {}", msg);
         // Call the concurrent Task executor to handle the incoming request
-        // Validate the Input & if its valid, post the ACCEPTED Response back to DMAAP
-        // through REST CLIENT
-        // Call the Controller with the extracted payload
+        dmaapMessageHandler.handleDmaapMsg(msg);
     }
 
     @Override
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java
new file mode 100644 (file)
index 0000000..bf9f06c
--- /dev/null
@@ -0,0 +1,175 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2019 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.policyagent.dmaap;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Properties;
+import org.apache.commons.lang3.StringUtils;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.json.JSONTokener;
+import org.oransc.policyagent.clients.AsyncRestClient;
+import org.oransc.policyagent.configuration.ApplicationConfig;
+import org.oransc.policyagent.controllers.PolicyController;
+import org.oransc.policyagent.model.DmaapMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.ResponseEntity;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Component;
+
+@Component
+public class DmaapMessageHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandler.class);
+
+    @Autowired
+    private ObjectMapper mapper;
+    @Autowired
+    private PolicyController policyController;
+    private AsyncRestClient restClient;
+    private ApplicationConfig applicationConfig;
+    private String topic = "";
+
+    @Autowired
+    public DmaapMessageHandler(ApplicationConfig applicationConfig) {
+        this.applicationConfig = applicationConfig;
+    }
+
+    // The publish properties is corrupted. It contains the subscribe property values.
+    @Async("threadPoolTaskExecutor")
+    public void handleDmaapMsg(String msg) {
+        init();
+        DmaapMessage dmaapMessage = null;
+        ResponseEntity<String> response = null;
+        // Process the message
+        /**
+         * Sample Request Message from DMAAP { "type": "request", "correlationId":
+         * "c09ac7d1-de62-0016-2000-e63701125557-201", "target": "policy-agent", "timestamp": "2019-05-14T11:44:51.36Z",
+         * "apiVersion": "1.0", "originatorId": "849e6c6b420", "requestId": "23343221", "operation": "getPolicySchemas",
+         * "payload": "{\"ric\":\"ric1\"}" }
+         * --------------------------------------------------------------------------------------------------------------
+         * Sample Response Message to DMAAP { "type": "response", "correlation-id":
+         * "c09ac7d1-de62-0016-2000-e63701125557-201", "timestamp": "2019-05-14T11:44:51.36Z", "originator-id":
+         * "849e6c6b420", "request-id": "23343221", "status" : "ACCEPTED", "message" : "" }
+         * -------------------------------------------------------------------------------------------------------------
+         * Sample Response Message to DMAAP { "type": "response", "correlation-id":
+         * "c09ac7d1-de62-0016-2000-e63701125557-201", "timestamp": "2019-05-14T11:44:51.36Z", "originator-id":
+         * "849e6c6b420", "request-id": "23343221", "status" : "SUCCESS" "message" : "" }
+         */
+        try {
+            dmaapMessage = mapper.readValue(msg, DmaapMessage.class);
+            // Post the accepted message to the DMAAP bus
+            logger.debug("DMAAP Message- {}", dmaapMessage);
+            logger.debug("Post Accepted Message to Client");
+            restClient
+                    .post("A1-POLICY-AGENT-WRITE", buildDmaapResponse(dmaapMessage.getCorrelationId(),
+                            dmaapMessage.getOriginatorId(), dmaapMessage.getRequestId(), "ACCEPTED", StringUtils.EMPTY))
+                    .block(); //
+            // Call the Controller
+            logger.debug("Invoke the Policy Agent Controller");
+            response = invokeController(dmaapMessage);
+            // Post the Response message to the DMAAP bus
+            logger.debug("DMAAP Response Message to Client- {}", response);
+            restClient
+                    .post("A1-POLICY-AGENT-WRITE", buildDmaapResponse(dmaapMessage.getCorrelationId(),
+                            dmaapMessage.getOriginatorId(), dmaapMessage.getRequestId(), "SUCCESS", response.getBody()))
+                    .block(); //
+        } catch (IOException e) {
+            logger.error("Exception occured during message processing", e);
+        }
+    }
+
+    private ResponseEntity<String> invokeController(DmaapMessage dmaapMessage) {
+        String formattedString = "";
+        String ricName;
+        String instance;
+        logger.debug("Payload from the Message - {}", dmaapMessage.getPayload());
+        try {
+            formattedString = new JSONTokener(dmaapMessage.getPayload()).nextValue().toString();
+            logger.debug("Removed the Escape charater in payload- {}", formattedString);
+        } catch (JSONException e) {
+            logger.error("Exception occurred during formating Payload- {}", dmaapMessage.getPayload());
+        }
+        JSONObject jsonObject = new JSONObject(formattedString);
+        switch (dmaapMessage.getOperation()) {
+            case "getPolicySchemas":
+                ricName = (String) jsonObject.get("ricName");
+                logger.debug("Received the request for getPolicySchemas with Ric Name- {}", ricName);
+                return policyController.getPolicySchemas(ricName);
+            case "getPolicySchema":
+                String policyTypeId = (String) jsonObject.get("id");
+                logger.debug("Received the request for getPolicySchema with Policy Type Id- {}", policyTypeId);
+                System.out.println("policyTypeId" + policyTypeId);
+                return policyController.getPolicySchema(policyTypeId);
+            case "getPolicyTypes":
+                ricName = (String) jsonObject.get("ricName");
+                logger.debug("Received the request for getPolicyTypes with Ric Name- {}", ricName);
+                return policyController.getPolicyTypes(ricName);
+            case "getPolicy":
+                instance = (String) jsonObject.get("instance");
+                logger.debug("Received the request for getPolicy with Instance- {}", instance);
+                return policyController.getPolicy(instance);
+            case "deletePolicy":
+                instance = (String) jsonObject.get("instance");
+                logger.debug("Received the request for deletePolicy with Instance- {}", instance);
+                return null;// policyController.deletePolicy(deleteInstance);
+            case "putPolicy":
+                String type = (String) jsonObject.get("type");
+                String putPolicyInstance = (String) jsonObject.get("instance");
+                String putPolicyRic = (String) jsonObject.get("ric");
+                String service = (String) jsonObject.get("service");
+                String jsonBody = (String) jsonObject.get("jsonBody");
+                return null;// policyController.putPolicy(type, putPolicyInstance, putPolicyRic, service, jsonBody);
+            case "getPolicies":
+                String getPolicyType = (String) jsonObject.get("type");
+                String getPolicyRic = (String) jsonObject.get("ric");
+                String getPolicyService = (String) jsonObject.get("service");
+                return policyController.getPolicies(getPolicyType, getPolicyRic, getPolicyService);
+            default:
+                break;
+        }
+        return null;
+    }
+
+    private String buildDmaapResponse(String correlationId, String originatorId, String requestId, String status,
+            String message) {
+        System.out.println("buildResponse ");
+        return new JSONObject().put("type", "response").put(correlationId, correlationId).put("timestamp", "")
+                .put("originatorId", originatorId).put("requestId", requestId).put("status", status)
+                .put("message", message).toString();
+    }
+
+    // @PostConstruct
+    // The application properties value is always NULL for the first time
+    // Need to fix this
+    public void init() {
+        logger.debug("Reading DMAAP Publisher bus details from Application Config");
+        Properties dmaapPublisherConfig = applicationConfig.getDmaapPublisherConfig();
+        String host = (String) dmaapPublisherConfig.get("ServiceName");
+        topic = dmaapPublisherConfig.getProperty("topic");
+        logger.debug("Read the topic & Service Name - {} , {}", host, topic);
+        this.restClient = new AsyncRestClient("http://" + host + "/"); // get this value from application config
+
+    }
+}
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/model/DmaapMessage.java b/policy-agent/src/main/java/org/oransc/policyagent/model/DmaapMessage.java
new file mode 100644 (file)
index 0000000..e56f4b4
--- /dev/null
@@ -0,0 +1,47 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2019 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.policyagent.model;
+
+import java.sql.Timestamp;
+import javax.validation.constraints.NotNull;
+import lombok.Getter;
+import lombok.Setter;
+
+@Getter
+@Setter
+public class DmaapMessage {
+
+    @NotNull
+    private String type;
+    @NotNull
+    private String correlationId;
+    @NotNull
+    private String target;
+    @NotNull
+    private Timestamp timestamp;
+    private String apiVersion;
+    @NotNull
+    private String originatorId;
+    private String requestId;
+    @NotNull
+    private String operation;
+    private String payload;
+}
index d4a3972..c945360 100644 (file)
       "dmaap_publisher": {
          "type": "message_router",
          "dmaap_info": {
-            "topic_url": "https://dradmin:dradmin@localhost:2222/events/A1-P-RESULT"
+            "topic_url": "http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-WRITE"
          }
       }
    },
    "streams_subscribes": {
       "dmaap_subscriber": {
          "dmaap_info": {
-            "topic_url": "http://dradmin:dradmin@localhost:2222/events/A1-P/users/sdnc1"
+            "topic_url": "http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-READ/users/policy-agent"
          },
          "type": "message_router"
       }