Add publisher configuration and fix issues 41/2341/2
authorelinuxhenrik <henrik.b.andersson@est.tech>
Mon, 27 Jan 2020 12:02:00 +0000 (13:02 +0100)
committerelinuxhenrik <henrik.b.andersson@est.tech>
Mon, 27 Jan 2020 13:39:30 +0000 (14:39 +0100)
Change-Id: Ib3a9b99a3422f2892bc0e73b224e302d698fbb2c
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
policy-agent/README.md
policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java
policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java
policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java
policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java
policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java
policy-agent/src/test/resources/test_application_configuration.json
policy-agent/src/test/resources/test_application_configuration_with_dmaap_config.json [new file with mode: 0644]

index 3ddbbd5..6077a4b 100644 (file)
@@ -1,21 +1,25 @@
 # O-RAN-SC NonRT RIC Dashboard Web Application
 
-The O-RAN NonRT RIC PolicyAgent provides a REST API for management of 
-policices. It provides support for 
--Policy configuration. This includes
- -One REST API towards all RICs in the network
- -Query functions that can find all policies in a RIC, all policies owned by a service (R-APP), all policies of a type etc.
- -Maps O1 resources (ManagedElement) as defined in O1 to the controlling RIC 
--Supervision of clients (R-APPs) to eliminate stray policies in case of failure
--Consistency monitoring of the SMO view of policies and the actual situation in the RICs
--Consistency monitoring of RIC capabilities (policy types)
+The O-RAN NonRT RIC PolicyAgent provides a REST API for management of policices.
+It provides support for:
+ -Supervision of clients (R-APPs) to eliminate stray policies in case of failure
+ -Consistency monitoring of the SMO view of policies and the actual situation in the RICs
+ -Consistency monitoring of RIC capabilities (policy types)
+ -Policy configuration. This includes:
+  -One REST API towards all RICs in the network
+  -Query functions that can find all policies in a RIC, all policies owned by a service (R-APP),
+   all policies of a type etc.
+  -Maps O1 resources (ManagedElement) as defined in O1 to the controlling RIC
 
 To Run Policy Agent in Local:
-Create a symbolic link with below command,
+In the folder /opt/app/policy-agent/config/, create a soft link with below command,
 ln -s <path to test_application_configuration.json> application_configuration.json
 
-The agent can be run stand alone in a simulated test mode. Then it 
-simulates RICs. 
+To Run Policy Agent in Local with the DMaaP polling turned on:
+In the folder /opt/app/policy-agent/config/, create a soft link with below command,
+ln -s <path to test_application_configuration_with_dmaap_config.json> application_configuration.json
+
+The agent can be run stand alone in a simulated test mode. Then it simulates RICs.
 The REST API is published on port 8081 and it is started by command:
 mvn -Dtest=MockPolicyAgent test
 
index 673fe1d..1ed3fdb 100644 (file)
@@ -42,6 +42,7 @@ public class ApplicationConfig {
 
     private Collection<Observer> observers = new Vector<>();
     private Map<String, RicConfig> ricConfigs = new HashMap<>();
+    private Properties dmaapPublisherConfig;
     private Properties dmaapConsumerConfig;
 
     @Autowired
@@ -72,6 +73,10 @@ public class ApplicationConfig {
         throw new ServiceException("Could not find ric: " + ricName);
     }
 
+    public Properties getDmaapPublisherConfig() {
+        return dmaapConsumerConfig;
+    }
+
     public Properties getDmaapConsumerConfig() {
         return dmaapConsumerConfig;
     }
@@ -98,7 +103,8 @@ public class ApplicationConfig {
         }
     }
 
-    public void setConfiguration(@NotNull Collection<RicConfig> ricConfigs, Properties dmaapConsumerConfig) {
+    public void setConfiguration(@NotNull Collection<RicConfig> ricConfigs, Properties dmaapPublisherConfig,
+        Properties dmaapConsumerConfig) {
         Collection<Notification> notifications = new Vector<>();
         synchronized (this) {
             Map<String, RicConfig> newRicConfigs = new HashMap<>();
@@ -123,6 +129,7 @@ public class ApplicationConfig {
         }
         notifyObservers(notifications);
 
+        this.dmaapPublisherConfig = dmaapPublisherConfig;
         this.dmaapConsumerConfig = dmaapConsumerConfig;
     }
 
index 530ac98..d32db7a 100644 (file)
@@ -35,7 +35,9 @@ import java.util.Vector;
 
 import javax.validation.constraints.NotNull;
 
+import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
 import org.oransc.policyagent.exceptions.ServiceException;
+import org.springframework.http.MediaType;
 
 public class ApplicationConfigParser {
 
@@ -46,6 +48,7 @@ public class ApplicationConfigParser {
         .create(); //
 
     private Vector<RicConfig> ricConfig;
+    private Properties dmaapPublisherConfig;
     private Properties dmaapConsumerConfig;
 
     public ApplicationConfigParser() {
@@ -54,14 +57,28 @@ public class ApplicationConfigParser {
     public void parse(JsonObject root) throws ServiceException {
         JsonObject ricConfigJson = root.getAsJsonObject(CONFIG);
         ricConfig = parseRics(ricConfigJson);
-        JsonObject dmaapConfigJson = root.getAsJsonObject("streams_subscribes");
-        dmaapConsumerConfig = parseDmaapConsumerConfig(dmaapConfigJson);
+        JsonObject dmaapPublisherConfigJson = root.getAsJsonObject("streams_publishes");
+        if (dmaapPublisherConfigJson == null) {
+            dmaapPublisherConfig = new Properties();
+        } else {
+            dmaapPublisherConfig = parseDmaapConfig(dmaapPublisherConfigJson);
+        }
+        JsonObject dmaapConsumerConfigJson = root.getAsJsonObject("streams_subscribes");
+        if (dmaapConsumerConfigJson == null) {
+            dmaapConsumerConfig = new Properties();
+        } else {
+            dmaapConsumerConfig = parseDmaapConfig(dmaapConsumerConfigJson);
+        }
     }
 
     public Vector<RicConfig> getRicConfigs() {
         return this.ricConfig;
     }
 
+    public Properties getDmaapPublisherConfig() {
+        return dmaapPublisherConfig;
+    }
+
     public Properties getDmaapConsumerConfig() {
         return dmaapConsumerConfig;
     }
@@ -86,7 +103,7 @@ public class ApplicationConfigParser {
         return get(obj, memberName).getAsJsonArray();
     }
 
-    private Properties parseDmaapConsumerConfig(JsonObject consumerCfg) throws ServiceException {
+    private Properties parseDmaapConfig(JsonObject consumerCfg) throws ServiceException {
         Set<Entry<String, JsonElement>> topics = consumerCfg.entrySet();
         if (topics.size() != 1) {
             throw new ServiceException("Invalid configuration, number of topic must be one, config: " + topics);
@@ -106,17 +123,20 @@ public class ApplicationConfigParser {
                 passwd = userInfo[1];
             }
             String urlPath = url.getPath();
-            DmaapConsumerUrlPath path = parseDmaapUrlPath(urlPath);
+            DmaapUrlPath path = parseDmaapUrlPath(urlPath);
 
-            dmaapProps.put("port", url.getPort());
-            dmaapProps.put("server", url.getHost());
+            dmaapProps.put("ServiceName", url.getHost());
             dmaapProps.put("topic", path.dmaapTopicName);
-            dmaapProps.put("consumerGroup", path.consumerGroup);
-            dmaapProps.put("consumerInstance", path.consumerId);
-            dmaapProps.put("fetchTimeout", 15000);
-            dmaapProps.put("fetchLimit", 1000);
+            dmaapProps.put("host", url.getHost());
+            dmaapProps.put("contenttype", MediaType.APPLICATION_JSON.toString());
             dmaapProps.put("userName", userName);
             dmaapProps.put("password", passwd);
+            dmaapProps.put("group", path.consumerGroup);
+            dmaapProps.put("id", path.consumerId);
+            dmaapProps.put("TransportType", ProtocolTypeConstants.HTTPNOAUTH.toString());
+            dmaapProps.put("timeout", 15000);
+            dmaapProps.put("limit", 1000);
+            dmaapProps.put("port", url.getPort());
         } catch (MalformedURLException e) {
             throw new ServiceException("Could not parse the URL", e);
         }
@@ -128,27 +148,31 @@ public class ApplicationConfigParser {
         return get(obj, memberName).getAsString();
     }
 
-    private class DmaapConsumerUrlPath {
+    private class DmaapUrlPath {
         final String dmaapTopicName;
         final String consumerGroup;
         final String consumerId;
 
-        DmaapConsumerUrlPath(String dmaapTopicName, String consumerGroup, String consumerId) {
+        DmaapUrlPath(String dmaapTopicName, String consumerGroup, String consumerId) {
             this.dmaapTopicName = dmaapTopicName;
             this.consumerGroup = consumerGroup;
             this.consumerId = consumerId;
         }
     }
 
-    private DmaapConsumerUrlPath parseDmaapUrlPath(String urlPath) throws ServiceException {
+    private DmaapUrlPath parseDmaapUrlPath(String urlPath) throws ServiceException {
         String[] tokens = urlPath.split("/"); // /events/A1-P/users/sdnc1
-        if (tokens.length != 5) {
+        if (!(tokens.length == 3 ^ tokens.length == 5)) {
             throw new ServiceException("The path has incorrect syntax: " + urlPath);
         }
 
         final String dmaapTopicName = tokens[1] + "/" + tokens[2]; // /events/A1-P
-        final String consumerGroup = tokens[3]; // users
-        final String consumerId = tokens[4]; // sdnc1
-        return new DmaapConsumerUrlPath(dmaapTopicName, consumerGroup, consumerId);
+        String consumerGroup = ""; // users
+        String consumerId = ""; // sdnc1
+        if (tokens.length == 5) {
+            consumerGroup = tokens[3];
+            consumerId = tokens[4];
+        }
+        return new DmaapUrlPath(dmaapTopicName, consumerGroup, consumerId);
     }
 }
index fd42104..889dfaf 100644 (file)
@@ -21,8 +21,6 @@
 
 package org.oransc.policyagent.dmaap;
 
-import java.util.Properties;
-
 /**
  * The Dmaap consumer which has the base methods to be implemented by any class which implements this interface
  *
@@ -34,7 +32,7 @@ public interface DmaapMessageConsumer {
      *
      * @param properties
      */
-    public void init(Properties properties);
+    public void init();
 
     /**
      * This method process the message and call the respective Controller
index 74cfe0d..191a13b 100644 (file)
@@ -22,6 +22,7 @@ package org.oransc.policyagent.dmaap;
 
 import java.io.IOException;
 import java.util.Properties;
+
 import org.onap.dmaap.mr.client.MRClientFactory;
 import org.onap.dmaap.mr.client.MRConsumer;
 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
@@ -40,19 +41,22 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer {
     private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumerImpl.class);
 
     private boolean alive = false;
+    private final ApplicationConfig applicationConfig;
     protected MRConsumer consumer;
     private MRConsumerResponse response = null;
 
     @Autowired
     public DmaapMessageConsumerImpl(ApplicationConfig applicationConfig) {
-        Properties dmaapConsumerConfig = applicationConfig.getDmaapConsumerConfig();
-        init(dmaapConsumerConfig);
+        this.applicationConfig = applicationConfig;
     }
 
     @Scheduled(fixedRate = 1000 * 60)
     @Override
     public void run() {
-        while (this.alive) {
+        if (!alive) {
+            init();
+        }
+        if (this.alive) {
             try {
                 Iterable<String> dmaapMsgs = fetchAllMessages();
                 for (String msg : dmaapMsgs) {
@@ -72,30 +76,24 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer {
             logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(), response.getResponseMessage());
             if (!"200".equals(response.getResponseCode())) {
                 logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
-                        response.getResponseMessage());
+                    response.getResponseMessage());
             }
         }
         return response.getActualMessages();
     }
 
     @Override
-    public void init(Properties properties) {
-        // Initialize the DMAAP with the properties
+    public void init() {
+        Properties dmaapConsumerProperties = applicationConfig.getDmaapConsumerConfig();
+        Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig();
+        // No need to start if there is no configuration.
+        if (dmaapConsumerProperties == null || dmaapPublisherProperties == null || dmaapConsumerProperties.size() == 0
+            || dmaapPublisherProperties.size() == 0) {
+            return;
+        }
         // Do we need to do any validation of properties before calling the factory?
-        Properties prop = new Properties();
-        prop.setProperty("ServiceName", "localhost:6845/events");
-        prop.setProperty("topic", "A1-P");
-        prop.setProperty("host", "localhost:6845");
-        prop.setProperty("contenttype", "application/json");
-        prop.setProperty("username", "admin");
-        prop.setProperty("password", "admin");
-        prop.setProperty("group", "users");
-        prop.setProperty("id", "policy-agent");
-        prop.setProperty("TransportType", "HTTPNOAUTH");
-        prop.setProperty("timeout", "15000");
-        prop.setProperty("limit", "1000");
         try {
-            consumer = MRClientFactory.createConsumer(prop);
+            consumer = MRClientFactory.createConsumer(dmaapConsumerProperties);
             this.alive = true;
         } catch (IOException e) {
             logger.error("Exception occurred while creating Dmaap Consumer", e);
index bc43eda..1ab5fc9 100644 (file)
@@ -125,7 +125,8 @@ public class RefreshConfigTask {
         try {
             ApplicationConfigParser parser = new ApplicationConfigParser();
             parser.parse(jsonObject);
-            this.appConfig.setConfiguration(parser.getRicConfigs(), parser.getDmaapConsumerConfig());
+            this.appConfig.setConfiguration(parser.getRicConfigs(), parser.getDmaapPublisherConfig(),
+                parser.getDmaapConsumerConfig());
         } catch (ServiceException e) {
             logger.error("Could not parse configuration {}", e.toString(), e);
         }
@@ -152,7 +153,8 @@ public class RefreshConfigTask {
             }
             ApplicationConfigParser appParser = new ApplicationConfigParser();
             appParser.parse(rootObject);
-            appConfig.setConfiguration(appParser.getRicConfigs(), appParser.getDmaapConsumerConfig());
+            appConfig.setConfiguration(appParser.getRicConfigs(), appParser.getDmaapPublisherConfig(),
+                appParser.getDmaapConsumerConfig());
             logger.info("Local configuration file loaded: {}", filepath);
         } catch (JsonSyntaxException | ServiceException | IOException e) {
             logger.trace("Local configuration file not loaded: {}", filepath, e);
index c63b710..446c061 100644 (file)
             ]
          }
       ]
-   },
-   "streams_subscribes": {
-      "dmaap_subscriber": {
-         "dmaap_info": {
-            "topic_url": "http://dradmin:dradmin@localhost:2222/events/A1-P/users/sdnc1"
-         },
-         "type": "message_router"
-      }
    }
 }
\ No newline at end of file
diff --git a/policy-agent/src/test/resources/test_application_configuration_with_dmaap_config.json b/policy-agent/src/test/resources/test_application_configuration_with_dmaap_config.json
new file mode 100644 (file)
index 0000000..d4a3972
--- /dev/null
@@ -0,0 +1,39 @@
+{
+   "config": {
+      "//description": "Application configuration",
+      "ric": [
+         {
+            "name": "ric1",
+            "baseUrl": "http://localhost:8080/",
+            "managedElementIds": [
+               "kista_1",
+               "kista_2"
+            ]
+         },
+         {
+            "name": "ric2",
+            "baseUrl": "http://localhost:8081/",
+            "managedElementIds": [
+               "kista_3",
+               "kista_4"
+            ]
+         }
+      ]
+   },
+   "streams_publishes": {
+      "dmaap_publisher": {
+         "type": "message_router",
+         "dmaap_info": {
+            "topic_url": "https://dradmin:dradmin@localhost:2222/events/A1-P-RESULT"
+         }
+      }
+   },
+   "streams_subscribes": {
+      "dmaap_subscriber": {
+         "dmaap_info": {
+            "topic_url": "http://dradmin:dradmin@localhost:2222/events/A1-P/users/sdnc1"
+         },
+         "type": "message_router"
+      }
+   }
+}
\ No newline at end of file