Merge "Adapt A1 controller to latest A1 spec"
authorHenrik Andersson <henrik.b.andersson@est.tech>
Tue, 28 Jan 2020 08:46:50 +0000 (08:46 +0000)
committerGerrit Code Review <gerrit@o-ran-sc.org>
Tue, 28 Jan 2020 08:46:50 +0000 (08:46 +0000)
15 files changed:
policy-agent/README.md
policy-agent/pom.xml
policy-agent/src/main/java/org/oransc/policyagent/BeanFactory.java
policy-agent/src/main/java/org/oransc/policyagent/clients/StdA1Client.java
policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java
policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java
policy-agent/src/main/java/org/oransc/policyagent/configuration/AsyncConfiguration.java [new file with mode: 0644]
policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java
policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java
policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java [new file with mode: 0644]
policy-agent/src/main/java/org/oransc/policyagent/model/DmaapMessage.java [new file with mode: 0644]
policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java
policy-agent/src/test/java/org/oransc/policyagent/clients/StdA1ClientTest.java
policy-agent/src/test/resources/test_application_configuration.json
policy-agent/src/test/resources/test_application_configuration_with_dmaap_config.json [new file with mode: 0644]

index 3ddbbd5..6077a4b 100644 (file)
@@ -1,21 +1,25 @@
 # O-RAN-SC NonRT RIC Dashboard Web Application
 
-The O-RAN NonRT RIC PolicyAgent provides a REST API for management of 
-policices. It provides support for 
--Policy configuration. This includes
- -One REST API towards all RICs in the network
- -Query functions that can find all policies in a RIC, all policies owned by a service (R-APP), all policies of a type etc.
- -Maps O1 resources (ManagedElement) as defined in O1 to the controlling RIC 
--Supervision of clients (R-APPs) to eliminate stray policies in case of failure
--Consistency monitoring of the SMO view of policies and the actual situation in the RICs
--Consistency monitoring of RIC capabilities (policy types)
+The O-RAN NonRT RIC PolicyAgent provides a REST API for management of policices.
+It provides support for:
+ -Supervision of clients (R-APPs) to eliminate stray policies in case of failure
+ -Consistency monitoring of the SMO view of policies and the actual situation in the RICs
+ -Consistency monitoring of RIC capabilities (policy types)
+ -Policy configuration. This includes:
+  -One REST API towards all RICs in the network
+  -Query functions that can find all policies in a RIC, all policies owned by a service (R-APP),
+   all policies of a type etc.
+  -Maps O1 resources (ManagedElement) as defined in O1 to the controlling RIC
 
 To Run Policy Agent in Local:
-Create a symbolic link with below command,
+In the folder /opt/app/policy-agent/config/, create a soft link with below command,
 ln -s <path to test_application_configuration.json> application_configuration.json
 
-The agent can be run stand alone in a simulated test mode. Then it 
-simulates RICs. 
+To Run Policy Agent in Local with the DMaaP polling turned on:
+In the folder /opt/app/policy-agent/config/, create a soft link with below command,
+ln -s <path to test_application_configuration_with_dmaap_config.json> application_configuration.json
+
+The agent can be run stand alone in a simulated test mode. Then it simulates RICs.
 The REST API is published on port 8081 and it is started by command:
 mvn -Dtest=MockPolicyAgent test
 
index 43d3aef..f2a9411 100644 (file)
             <artifactId>dmaap-client</artifactId>
                        <version>${sdk.version}</version>
                </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+         <scope>provided</scope>
+        </dependency>
                <!--REQUIRED TO GENERATE DOCUMENTATION -->
                <dependency>
                        <groupId>io.springfox</groupId>
index f0826e9..e05eb95 100644 (file)
@@ -20,6 +20,7 @@
 
 package org.oransc.policyagent;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.oransc.policyagent.clients.A1ClientFactory;
 import org.oransc.policyagent.configuration.ApplicationConfig;
 import org.oransc.policyagent.repository.Policies;
@@ -61,4 +62,9 @@ class BeanFactory {
         return new A1ClientFactory();
     }
 
+    @Bean
+    public ObjectMapper mapper() {
+      return new ObjectMapper();
+    }
+
 }
index 4c1d140..1c40308 100644 (file)
@@ -50,31 +50,28 @@ public class StdA1Client implements A1Client {
     }
 
     @Override
-    public Mono<List<String>> getPolicyTypeIdentities() {
-        return restClient.get("/policytypes/identities") //
+    public Mono<List<String>> getPolicyIdentities() {
+        return restClient.get("/policies") //
             .flatMap(this::parseJsonArrayOfString);
     }
 
     @Override
-    public Mono<List<String>> getPolicyIdentities() {
-        return restClient.get("/policies/identities") //
-            .flatMap(this::parseJsonArrayOfString);
+    public Mono<String> putPolicy(Policy policy) {
+        String url = "/policies/" + policy.id() + "?policyTypeId=" + policy.type().name();
+        return restClient.put(url, policy.json()) //
+            .flatMap(this::validateJson);
     }
 
     @Override
-    public Mono<String> getPolicyTypeSchema(String policyTypeId) {
-        Mono<String> response = restClient.get("/policytypes/" + policyTypeId);
-        return response.flatMap(this::createMono);
+    public Mono<List<String>> getPolicyTypeIdentities() {
+        return restClient.get("/policytypes") //
+            .flatMap(this::parseJsonArrayOfString);
     }
 
     @Override
-    public Mono<String> putPolicy(Policy policy) {
-        // TODO update when simulator is updated to include policy type
-        // Mono<String> response = client.put("/policies/" + policy.id() + "?policyTypeId=" + policy.type().name(),
-        // policy.json());
-        Mono<String> response = restClient.put("/policies/" + policy.id(), policy.json());
-
-        return response.flatMap(this::createMono);
+    public Mono<String> getPolicyTypeSchema(String policyTypeId) {
+        return restClient.get("/policytypes/" + policyTypeId) //
+            .flatMap(this::extractPolicySchema);
     }
 
     @Override
@@ -113,12 +110,21 @@ public class StdA1Client implements A1Client {
         }
     }
 
-    private Mono<String> createMono(String inputString) {
+    private Mono<String> extractPolicySchema(String inputString) {
         try {
             JSONObject jsonObject = new JSONObject(inputString);
-            String jsonString = jsonObject.toString();
-            logger.debug("A1 client: received string = {}", jsonString);
-            return Mono.just(jsonString);
+            JSONObject schemaObject = jsonObject.getJSONObject("policySchema");
+            String schemaString = schemaObject.toString();
+            return Mono.just(schemaString);
+        } catch (JSONException ex) { // invalid json
+            return Mono.error(ex);
+        }
+    }
+
+    private Mono<String> validateJson(String inputString) {
+        try {
+            new JSONObject(inputString);
+            return Mono.just(inputString);
         } catch (JSONException ex) { // invalid json
             return Mono.error(ex);
         }
index 673fe1d..1ed3fdb 100644 (file)
@@ -42,6 +42,7 @@ public class ApplicationConfig {
 
     private Collection<Observer> observers = new Vector<>();
     private Map<String, RicConfig> ricConfigs = new HashMap<>();
+    private Properties dmaapPublisherConfig;
     private Properties dmaapConsumerConfig;
 
     @Autowired
@@ -72,6 +73,10 @@ public class ApplicationConfig {
         throw new ServiceException("Could not find ric: " + ricName);
     }
 
+    public Properties getDmaapPublisherConfig() {
+        return dmaapConsumerConfig;
+    }
+
     public Properties getDmaapConsumerConfig() {
         return dmaapConsumerConfig;
     }
@@ -98,7 +103,8 @@ public class ApplicationConfig {
         }
     }
 
-    public void setConfiguration(@NotNull Collection<RicConfig> ricConfigs, Properties dmaapConsumerConfig) {
+    public void setConfiguration(@NotNull Collection<RicConfig> ricConfigs, Properties dmaapPublisherConfig,
+        Properties dmaapConsumerConfig) {
         Collection<Notification> notifications = new Vector<>();
         synchronized (this) {
             Map<String, RicConfig> newRicConfigs = new HashMap<>();
@@ -123,6 +129,7 @@ public class ApplicationConfig {
         }
         notifyObservers(notifications);
 
+        this.dmaapPublisherConfig = dmaapPublisherConfig;
         this.dmaapConsumerConfig = dmaapConsumerConfig;
     }
 
index 530ac98..34d1d97 100644 (file)
@@ -25,17 +25,16 @@ import com.google.gson.GsonBuilder;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
-
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 import java.util.Vector;
-
 import javax.validation.constraints.NotNull;
-
+import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
 import org.oransc.policyagent.exceptions.ServiceException;
+import org.springframework.http.MediaType;
 
 public class ApplicationConfigParser {
 
@@ -46,6 +45,7 @@ public class ApplicationConfigParser {
         .create(); //
 
     private Vector<RicConfig> ricConfig;
+    private Properties dmaapPublisherConfig;
     private Properties dmaapConsumerConfig;
 
     public ApplicationConfigParser() {
@@ -54,14 +54,28 @@ public class ApplicationConfigParser {
     public void parse(JsonObject root) throws ServiceException {
         JsonObject ricConfigJson = root.getAsJsonObject(CONFIG);
         ricConfig = parseRics(ricConfigJson);
-        JsonObject dmaapConfigJson = root.getAsJsonObject("streams_subscribes");
-        dmaapConsumerConfig = parseDmaapConsumerConfig(dmaapConfigJson);
+        JsonObject dmaapPublisherConfigJson = root.getAsJsonObject("streams_publishes");
+        if (dmaapPublisherConfigJson == null) {
+            dmaapPublisherConfig = new Properties();
+        } else {
+            dmaapPublisherConfig = parseDmaapConfig(dmaapPublisherConfigJson);
+        }
+        JsonObject dmaapConsumerConfigJson = root.getAsJsonObject("streams_subscribes");
+        if (dmaapConsumerConfigJson == null) {
+            dmaapConsumerConfig = new Properties();
+        } else {
+            dmaapConsumerConfig = parseDmaapConfig(dmaapConsumerConfigJson);
+        }
     }
 
     public Vector<RicConfig> getRicConfigs() {
         return this.ricConfig;
     }
 
+    public Properties getDmaapPublisherConfig() {
+        return dmaapPublisherConfig;
+    }
+
     public Properties getDmaapConsumerConfig() {
         return dmaapConsumerConfig;
     }
@@ -86,7 +100,7 @@ public class ApplicationConfigParser {
         return get(obj, memberName).getAsJsonArray();
     }
 
-    private Properties parseDmaapConsumerConfig(JsonObject consumerCfg) throws ServiceException {
+    private Properties parseDmaapConfig(JsonObject consumerCfg) throws ServiceException {
         Set<Entry<String, JsonElement>> topics = consumerCfg.entrySet();
         if (topics.size() != 1) {
             throw new ServiceException("Invalid configuration, number of topic must be one, config: " + topics);
@@ -106,17 +120,19 @@ public class ApplicationConfigParser {
                 passwd = userInfo[1];
             }
             String urlPath = url.getPath();
-            DmaapConsumerUrlPath path = parseDmaapUrlPath(urlPath);
+            DmaapUrlPath path = parseDmaapUrlPath(urlPath);
 
-            dmaapProps.put("port", url.getPort());
-            dmaapProps.put("server", url.getHost());
+            dmaapProps.put("ServiceName", url.getHost()+":"+url.getPort()+"/events");
             dmaapProps.put("topic", path.dmaapTopicName);
-            dmaapProps.put("consumerGroup", path.consumerGroup);
-            dmaapProps.put("consumerInstance", path.consumerId);
-            dmaapProps.put("fetchTimeout", 15000);
-            dmaapProps.put("fetchLimit", 1000);
+            dmaapProps.put("host", url.getHost()+":"+url.getPort());
+            dmaapProps.put("contenttype", MediaType.APPLICATION_JSON.toString());
             dmaapProps.put("userName", userName);
             dmaapProps.put("password", passwd);
+            dmaapProps.put("group", path.consumerGroup);
+            dmaapProps.put("id", path.consumerId);
+            dmaapProps.put("TransportType", ProtocolTypeConstants.HTTPNOAUTH.toString());
+            dmaapProps.put("timeout", 15000);
+            dmaapProps.put("limit", 1000);
         } catch (MalformedURLException e) {
             throw new ServiceException("Could not parse the URL", e);
         }
@@ -128,27 +144,31 @@ public class ApplicationConfigParser {
         return get(obj, memberName).getAsString();
     }
 
-    private class DmaapConsumerUrlPath {
+    private class DmaapUrlPath {
         final String dmaapTopicName;
         final String consumerGroup;
         final String consumerId;
 
-        DmaapConsumerUrlPath(String dmaapTopicName, String consumerGroup, String consumerId) {
+        DmaapUrlPath(String dmaapTopicName, String consumerGroup, String consumerId) {
             this.dmaapTopicName = dmaapTopicName;
             this.consumerGroup = consumerGroup;
             this.consumerId = consumerId;
         }
     }
 
-    private DmaapConsumerUrlPath parseDmaapUrlPath(String urlPath) throws ServiceException {
+    private DmaapUrlPath parseDmaapUrlPath(String urlPath) throws ServiceException {
         String[] tokens = urlPath.split("/"); // /events/A1-P/users/sdnc1
-        if (tokens.length != 5) {
+        if (!(tokens.length == 3 ^ tokens.length == 5)) {
             throw new ServiceException("The path has incorrect syntax: " + urlPath);
         }
 
-        final String dmaapTopicName = tokens[1] + "/" + tokens[2]; // /events/A1-P
-        final String consumerGroup = tokens[3]; // users
-        final String consumerId = tokens[4]; // sdnc1
-        return new DmaapConsumerUrlPath(dmaapTopicName, consumerGroup, consumerId);
+        final String dmaapTopicName =  tokens[2]; // /events/A1-P
+        String consumerGroup = ""; // users
+        String consumerId = ""; // sdnc1
+        if (tokens.length == 5) {
+            consumerGroup = tokens[3];
+            consumerId = tokens[4];
+        }
+        return new DmaapUrlPath(dmaapTopicName, consumerGroup, consumerId);
     }
 }
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/configuration/AsyncConfiguration.java b/policy-agent/src/main/java/org/oransc/policyagent/configuration/AsyncConfiguration.java
new file mode 100644 (file)
index 0000000..085320c
--- /dev/null
@@ -0,0 +1,44 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2019 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.policyagent.configuration;
+
+import java.util.concurrent.Executor;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.annotation.AsyncConfigurer;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+@Configuration
+@EnableAsync
+public class AsyncConfiguration implements AsyncConfigurer {
+
+    @Override
+    @Bean(name = "threadPoolTaskExecutor")
+    public Executor getAsyncExecutor() {
+        //Set this configuration value from common properties file
+        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+        executor.setCorePoolSize(5);
+        executor.setMaxPoolSize(10);
+        executor.setQueueCapacity(25);
+        return executor;
+    }
+}
index fd42104..889dfaf 100644 (file)
@@ -21,8 +21,6 @@
 
 package org.oransc.policyagent.dmaap;
 
-import java.util.Properties;
-
 /**
  * The Dmaap consumer which has the base methods to be implemented by any class which implements this interface
  *
@@ -34,7 +32,7 @@ public interface DmaapMessageConsumer {
      *
      * @param properties
      */
-    public void init(Properties properties);
+    public void init();
 
     /**
      * This method process the message and call the respective Controller
index 74cfe0d..2ae5e5e 100644 (file)
@@ -22,6 +22,7 @@ package org.oransc.policyagent.dmaap;
 
 import java.io.IOException;
 import java.util.Properties;
+import javax.annotation.PostConstruct;
 import org.onap.dmaap.mr.client.MRClientFactory;
 import org.onap.dmaap.mr.client.MRConsumer;
 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
@@ -40,21 +41,27 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer {
     private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumerImpl.class);
 
     private boolean alive = false;
+    private final ApplicationConfig applicationConfig;
     protected MRConsumer consumer;
     private MRConsumerResponse response = null;
+    @Autowired
+    private DmaapMessageHandler dmaapMessageHandler;
 
     @Autowired
     public DmaapMessageConsumerImpl(ApplicationConfig applicationConfig) {
-        Properties dmaapConsumerConfig = applicationConfig.getDmaapConsumerConfig();
-        init(dmaapConsumerConfig);
+        this.applicationConfig = applicationConfig;
     }
 
-    @Scheduled(fixedRate = 1000 * 60)
+    @Scheduled(fixedRate = 1000 * 10) // , initialDelay=60000)
     @Override
     public void run() {
-        while (this.alive) {
+        /*
+         * if (!alive) { init(); }
+         */
+        if (this.alive) {
             try {
                 Iterable<String> dmaapMsgs = fetchAllMessages();
+                logger.debug("Fetched all the messages from DMAAP and will start to process the messages");
                 for (String msg : dmaapMsgs) {
                     processMsg(msg);
                 }
@@ -78,24 +85,20 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer {
         return response.getActualMessages();
     }
 
+    @PostConstruct
     @Override
-    public void init(Properties properties) {
-        // Initialize the DMAAP with the properties
-        // Do we need to do any validation of properties before calling the factory?
-        Properties prop = new Properties();
-        prop.setProperty("ServiceName", "localhost:6845/events");
-        prop.setProperty("topic", "A1-P");
-        prop.setProperty("host", "localhost:6845");
-        prop.setProperty("contenttype", "application/json");
-        prop.setProperty("username", "admin");
-        prop.setProperty("password", "admin");
-        prop.setProperty("group", "users");
-        prop.setProperty("id", "policy-agent");
-        prop.setProperty("TransportType", "HTTPNOAUTH");
-        prop.setProperty("timeout", "15000");
-        prop.setProperty("limit", "1000");
+    public void init() {
+        Properties dmaapConsumerProperties = applicationConfig.getDmaapConsumerConfig();
+        Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig();
+        // No need to start if there is no configuration.
+        if (dmaapConsumerProperties == null || dmaapPublisherProperties == null || dmaapConsumerProperties.size() == 0
+                || dmaapPublisherProperties.size() == 0) {
+            logger.error("DMaaP properties Failed to Load");
+            return;
+        }
         try {
-            consumer = MRClientFactory.createConsumer(prop);
+            logger.debug("Creating DMAAP Client");
+            consumer = MRClientFactory.createConsumer(dmaapConsumerProperties);
             this.alive = true;
         } catch (IOException e) {
             logger.error("Exception occurred while creating Dmaap Consumer", e);
@@ -104,11 +107,9 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer {
 
     @Override
     public void processMsg(String msg) throws Exception {
-        System.out.println("sysout" + msg);
+        logger.debug("Message Reveived from DMAAP : {}", msg);
         // Call the concurrent Task executor to handle the incoming request
-        // Validate the Input & if its valid, post the ACCEPTED Response back to DMAAP
-        // through REST CLIENT
-        // Call the Controller with the extracted payload
+        dmaapMessageHandler.handleDmaapMsg(msg);
     }
 
     @Override
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java
new file mode 100644 (file)
index 0000000..bf9f06c
--- /dev/null
@@ -0,0 +1,175 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2019 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.policyagent.dmaap;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Properties;
+import org.apache.commons.lang3.StringUtils;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.json.JSONTokener;
+import org.oransc.policyagent.clients.AsyncRestClient;
+import org.oransc.policyagent.configuration.ApplicationConfig;
+import org.oransc.policyagent.controllers.PolicyController;
+import org.oransc.policyagent.model.DmaapMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.ResponseEntity;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Component;
+
+@Component
+public class DmaapMessageHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandler.class);
+
+    @Autowired
+    private ObjectMapper mapper;
+    @Autowired
+    private PolicyController policyController;
+    private AsyncRestClient restClient;
+    private ApplicationConfig applicationConfig;
+    private String topic = "";
+
+    @Autowired
+    public DmaapMessageHandler(ApplicationConfig applicationConfig) {
+        this.applicationConfig = applicationConfig;
+    }
+
+    // The publish properties is corrupted. It contains the subscribe property values.
+    @Async("threadPoolTaskExecutor")
+    public void handleDmaapMsg(String msg) {
+        init();
+        DmaapMessage dmaapMessage = null;
+        ResponseEntity<String> response = null;
+        // Process the message
+        /**
+         * Sample Request Message from DMAAP { "type": "request", "correlationId":
+         * "c09ac7d1-de62-0016-2000-e63701125557-201", "target": "policy-agent", "timestamp": "2019-05-14T11:44:51.36Z",
+         * "apiVersion": "1.0", "originatorId": "849e6c6b420", "requestId": "23343221", "operation": "getPolicySchemas",
+         * "payload": "{\"ric\":\"ric1\"}" }
+         * --------------------------------------------------------------------------------------------------------------
+         * Sample Response Message to DMAAP { "type": "response", "correlation-id":
+         * "c09ac7d1-de62-0016-2000-e63701125557-201", "timestamp": "2019-05-14T11:44:51.36Z", "originator-id":
+         * "849e6c6b420", "request-id": "23343221", "status" : "ACCEPTED", "message" : "" }
+         * -------------------------------------------------------------------------------------------------------------
+         * Sample Response Message to DMAAP { "type": "response", "correlation-id":
+         * "c09ac7d1-de62-0016-2000-e63701125557-201", "timestamp": "2019-05-14T11:44:51.36Z", "originator-id":
+         * "849e6c6b420", "request-id": "23343221", "status" : "SUCCESS" "message" : "" }
+         */
+        try {
+            dmaapMessage = mapper.readValue(msg, DmaapMessage.class);
+            // Post the accepted message to the DMAAP bus
+            logger.debug("DMAAP Message- {}", dmaapMessage);
+            logger.debug("Post Accepted Message to Client");
+            restClient
+                    .post("A1-POLICY-AGENT-WRITE", buildDmaapResponse(dmaapMessage.getCorrelationId(),
+                            dmaapMessage.getOriginatorId(), dmaapMessage.getRequestId(), "ACCEPTED", StringUtils.EMPTY))
+                    .block(); //
+            // Call the Controller
+            logger.debug("Invoke the Policy Agent Controller");
+            response = invokeController(dmaapMessage);
+            // Post the Response message to the DMAAP bus
+            logger.debug("DMAAP Response Message to Client- {}", response);
+            restClient
+                    .post("A1-POLICY-AGENT-WRITE", buildDmaapResponse(dmaapMessage.getCorrelationId(),
+                            dmaapMessage.getOriginatorId(), dmaapMessage.getRequestId(), "SUCCESS", response.getBody()))
+                    .block(); //
+        } catch (IOException e) {
+            logger.error("Exception occured during message processing", e);
+        }
+    }
+
+    private ResponseEntity<String> invokeController(DmaapMessage dmaapMessage) {
+        String formattedString = "";
+        String ricName;
+        String instance;
+        logger.debug("Payload from the Message - {}", dmaapMessage.getPayload());
+        try {
+            formattedString = new JSONTokener(dmaapMessage.getPayload()).nextValue().toString();
+            logger.debug("Removed the Escape charater in payload- {}", formattedString);
+        } catch (JSONException e) {
+            logger.error("Exception occurred during formating Payload- {}", dmaapMessage.getPayload());
+        }
+        JSONObject jsonObject = new JSONObject(formattedString);
+        switch (dmaapMessage.getOperation()) {
+            case "getPolicySchemas":
+                ricName = (String) jsonObject.get("ricName");
+                logger.debug("Received the request for getPolicySchemas with Ric Name- {}", ricName);
+                return policyController.getPolicySchemas(ricName);
+            case "getPolicySchema":
+                String policyTypeId = (String) jsonObject.get("id");
+                logger.debug("Received the request for getPolicySchema with Policy Type Id- {}", policyTypeId);
+                System.out.println("policyTypeId" + policyTypeId);
+                return policyController.getPolicySchema(policyTypeId);
+            case "getPolicyTypes":
+                ricName = (String) jsonObject.get("ricName");
+                logger.debug("Received the request for getPolicyTypes with Ric Name- {}", ricName);
+                return policyController.getPolicyTypes(ricName);
+            case "getPolicy":
+                instance = (String) jsonObject.get("instance");
+                logger.debug("Received the request for getPolicy with Instance- {}", instance);
+                return policyController.getPolicy(instance);
+            case "deletePolicy":
+                instance = (String) jsonObject.get("instance");
+                logger.debug("Received the request for deletePolicy with Instance- {}", instance);
+                return null;// policyController.deletePolicy(deleteInstance);
+            case "putPolicy":
+                String type = (String) jsonObject.get("type");
+                String putPolicyInstance = (String) jsonObject.get("instance");
+                String putPolicyRic = (String) jsonObject.get("ric");
+                String service = (String) jsonObject.get("service");
+                String jsonBody = (String) jsonObject.get("jsonBody");
+                return null;// policyController.putPolicy(type, putPolicyInstance, putPolicyRic, service, jsonBody);
+            case "getPolicies":
+                String getPolicyType = (String) jsonObject.get("type");
+                String getPolicyRic = (String) jsonObject.get("ric");
+                String getPolicyService = (String) jsonObject.get("service");
+                return policyController.getPolicies(getPolicyType, getPolicyRic, getPolicyService);
+            default:
+                break;
+        }
+        return null;
+    }
+
+    private String buildDmaapResponse(String correlationId, String originatorId, String requestId, String status,
+            String message) {
+        System.out.println("buildResponse ");
+        return new JSONObject().put("type", "response").put(correlationId, correlationId).put("timestamp", "")
+                .put("originatorId", originatorId).put("requestId", requestId).put("status", status)
+                .put("message", message).toString();
+    }
+
+    // @PostConstruct
+    // The application properties value is always NULL for the first time
+    // Need to fix this
+    public void init() {
+        logger.debug("Reading DMAAP Publisher bus details from Application Config");
+        Properties dmaapPublisherConfig = applicationConfig.getDmaapPublisherConfig();
+        String host = (String) dmaapPublisherConfig.get("ServiceName");
+        topic = dmaapPublisherConfig.getProperty("topic");
+        logger.debug("Read the topic & Service Name - {} , {}", host, topic);
+        this.restClient = new AsyncRestClient("http://" + host + "/"); // get this value from application config
+
+    }
+}
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/model/DmaapMessage.java b/policy-agent/src/main/java/org/oransc/policyagent/model/DmaapMessage.java
new file mode 100644 (file)
index 0000000..e56f4b4
--- /dev/null
@@ -0,0 +1,47 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2019 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.policyagent.model;
+
+import java.sql.Timestamp;
+import javax.validation.constraints.NotNull;
+import lombok.Getter;
+import lombok.Setter;
+
+@Getter
+@Setter
+public class DmaapMessage {
+
+    @NotNull
+    private String type;
+    @NotNull
+    private String correlationId;
+    @NotNull
+    private String target;
+    @NotNull
+    private Timestamp timestamp;
+    private String apiVersion;
+    @NotNull
+    private String originatorId;
+    private String requestId;
+    @NotNull
+    private String operation;
+    private String payload;
+}
index bc43eda..1ab5fc9 100644 (file)
@@ -125,7 +125,8 @@ public class RefreshConfigTask {
         try {
             ApplicationConfigParser parser = new ApplicationConfigParser();
             parser.parse(jsonObject);
-            this.appConfig.setConfiguration(parser.getRicConfigs(), parser.getDmaapConsumerConfig());
+            this.appConfig.setConfiguration(parser.getRicConfigs(), parser.getDmaapPublisherConfig(),
+                parser.getDmaapConsumerConfig());
         } catch (ServiceException e) {
             logger.error("Could not parse configuration {}", e.toString(), e);
         }
@@ -152,7 +153,8 @@ public class RefreshConfigTask {
             }
             ApplicationConfigParser appParser = new ApplicationConfigParser();
             appParser.parse(rootObject);
-            appConfig.setConfiguration(appParser.getRicConfigs(), appParser.getDmaapConsumerConfig());
+            appConfig.setConfiguration(appParser.getRicConfigs(), appParser.getDmaapPublisherConfig(),
+                appParser.getDmaapConsumerConfig());
             logger.info("Local configuration file loaded: {}", filepath);
         } catch (JsonSyntaxException | ServiceException | IOException e) {
             logger.trace("Local configuration file not loaded: {}", filepath, e);
index 1454cca..2c83e5a 100644 (file)
@@ -21,6 +21,7 @@
 package org.oransc.policyagent.clients;
 
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
@@ -51,8 +52,8 @@ import reactor.test.StepVerifier;
 @RunWith(MockitoJUnitRunner.class)
 public class StdA1ClientTest {
     private static final String RIC_URL = "RicUrl";
-    private static final String POLICYTYPES_IDENTITIES_URL = "/policytypes/identities";
-    private static final String POLICIES_IDENTITIES_URL = "/policies/identities";
+    private static final String POLICYTYPES_IDENTITIES_URL = "/policytypes";
+    private static final String POLICIES_IDENTITIES_URL = "/policies";
     private static final String POLICYTYPES_URL = "/policytypes/";
     private static final String POLICIES_URL = "/policies/";
 
@@ -98,8 +99,10 @@ public class StdA1ClientTest {
 
     @Test
     public void testGetValidPolicyType() {
-        when(asyncRestClientMock.get(POLICYTYPES_URL + POLICY_TYPE_1_NAME))
-            .thenReturn(Mono.just(POLICY_TYPE_SCHEMA_VALID));
+        Mono<?> policyTypeResp =
+            Mono.just("{\"policySchema\": " + POLICY_TYPE_SCHEMA_VALID + ", \"statusSchema\": {} }");
+
+        doReturn(policyTypeResp).when(asyncRestClientMock).get(POLICYTYPES_URL + POLICY_TYPE_1_NAME);
 
         Mono<String> policyTypeMono = a1Client.getPolicyTypeSchema(POLICY_TYPE_1_NAME);
         verify(asyncRestClientMock).get(POLICYTYPES_URL + POLICY_TYPE_1_NAME);
@@ -123,7 +126,7 @@ public class StdA1ClientTest {
 
         Mono<String> policyMono =
             a1Client.putPolicy(createPolicy(RIC_URL, POLICY_1_ID, POLICY_JSON_VALID, POLICY_TYPE));
-        verify(asyncRestClientMock).put(POLICIES_URL + POLICY_1_ID, POLICY_JSON_VALID);
+        verify(asyncRestClientMock).put(POLICIES_URL + POLICY_1_ID + "?policyTypeId=" + POLICY_TYPE, POLICY_JSON_VALID);
         StepVerifier.create(policyMono).expectNext(POLICY_JSON_VALID).expectComplete().verify();
     }
 
index c63b710..446c061 100644 (file)
             ]
          }
       ]
-   },
-   "streams_subscribes": {
-      "dmaap_subscriber": {
-         "dmaap_info": {
-            "topic_url": "http://dradmin:dradmin@localhost:2222/events/A1-P/users/sdnc1"
-         },
-         "type": "message_router"
-      }
    }
 }
\ No newline at end of file
diff --git a/policy-agent/src/test/resources/test_application_configuration_with_dmaap_config.json b/policy-agent/src/test/resources/test_application_configuration_with_dmaap_config.json
new file mode 100644 (file)
index 0000000..c945360
--- /dev/null
@@ -0,0 +1,39 @@
+{
+   "config": {
+      "//description": "Application configuration",
+      "ric": [
+         {
+            "name": "ric1",
+            "baseUrl": "http://localhost:8080/",
+            "managedElementIds": [
+               "kista_1",
+               "kista_2"
+            ]
+         },
+         {
+            "name": "ric2",
+            "baseUrl": "http://localhost:8081/",
+            "managedElementIds": [
+               "kista_3",
+               "kista_4"
+            ]
+         }
+      ]
+   },
+   "streams_publishes": {
+      "dmaap_publisher": {
+         "type": "message_router",
+         "dmaap_info": {
+            "topic_url": "http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-WRITE"
+         }
+      }
+   },
+   "streams_subscribes": {
+      "dmaap_subscriber": {
+         "dmaap_info": {
+            "topic_url": "http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-READ/users/policy-agent"
+         },
+         "type": "message_router"
+      }
+   }
+}
\ No newline at end of file