From f41b5dd9f129b7b4d7c5ae0ec335d71e9ba5c1a6 Mon Sep 17 00:00:00 2001 From: elinuxhenrik Date: Mon, 27 Jan 2020 13:02:00 +0100 Subject: [PATCH] Add publisher configuration and fix issues Change-Id: Ib3a9b99a3422f2892bc0e73b224e302d698fbb2c Signed-off-by: elinuxhenrik --- policy-agent/README.md | 28 ++++++----- .../configuration/ApplicationConfig.java | 9 +++- .../configuration/ApplicationConfigParser.java | 58 +++++++++++++++------- .../policyagent/dmaap/DmaapMessageConsumer.java | 4 +- .../dmaap/DmaapMessageConsumerImpl.java | 36 +++++++------- .../policyagent/tasks/RefreshConfigTask.java | 6 ++- .../resources/test_application_configuration.json | 8 --- ...pplication_configuration_with_dmaap_config.json | 39 +++++++++++++++ 8 files changed, 126 insertions(+), 62 deletions(-) create mode 100644 policy-agent/src/test/resources/test_application_configuration_with_dmaap_config.json diff --git a/policy-agent/README.md b/policy-agent/README.md index 3ddbbd51..6077a4b6 100644 --- a/policy-agent/README.md +++ b/policy-agent/README.md @@ -1,21 +1,25 @@ # O-RAN-SC NonRT RIC Dashboard Web Application -The O-RAN NonRT RIC PolicyAgent provides a REST API for management of -policices. It provides support for --Policy configuration. This includes - -One REST API towards all RICs in the network - -Query functions that can find all policies in a RIC, all policies owned by a service (R-APP), all policies of a type etc. - -Maps O1 resources (ManagedElement) as defined in O1 to the controlling RIC --Supervision of clients (R-APPs) to eliminate stray policies in case of failure --Consistency monitoring of the SMO view of policies and the actual situation in the RICs --Consistency monitoring of RIC capabilities (policy types) +The O-RAN NonRT RIC PolicyAgent provides a REST API for management of policices. +It provides support for: + -Supervision of clients (R-APPs) to eliminate stray policies in case of failure + -Consistency monitoring of the SMO view of policies and the actual situation in the RICs + -Consistency monitoring of RIC capabilities (policy types) + -Policy configuration. This includes: + -One REST API towards all RICs in the network + -Query functions that can find all policies in a RIC, all policies owned by a service (R-APP), + all policies of a type etc. + -Maps O1 resources (ManagedElement) as defined in O1 to the controlling RIC To Run Policy Agent in Local: -Create a symbolic link with below command, +In the folder /opt/app/policy-agent/config/, create a soft link with below command, ln -s application_configuration.json -The agent can be run stand alone in a simulated test mode. Then it -simulates RICs. +To Run Policy Agent in Local with the DMaaP polling turned on: +In the folder /opt/app/policy-agent/config/, create a soft link with below command, +ln -s application_configuration.json + +The agent can be run stand alone in a simulated test mode. Then it simulates RICs. The REST API is published on port 8081 and it is started by command: mvn -Dtest=MockPolicyAgent test diff --git a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java index 673fe1d0..1ed3fdb2 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java @@ -42,6 +42,7 @@ public class ApplicationConfig { private Collection observers = new Vector<>(); private Map ricConfigs = new HashMap<>(); + private Properties dmaapPublisherConfig; private Properties dmaapConsumerConfig; @Autowired @@ -72,6 +73,10 @@ public class ApplicationConfig { throw new ServiceException("Could not find ric: " + ricName); } + public Properties getDmaapPublisherConfig() { + return dmaapConsumerConfig; + } + public Properties getDmaapConsumerConfig() { return dmaapConsumerConfig; } @@ -98,7 +103,8 @@ public class ApplicationConfig { } } - public void setConfiguration(@NotNull Collection ricConfigs, Properties dmaapConsumerConfig) { + public void setConfiguration(@NotNull Collection ricConfigs, Properties dmaapPublisherConfig, + Properties dmaapConsumerConfig) { Collection notifications = new Vector<>(); synchronized (this) { Map newRicConfigs = new HashMap<>(); @@ -123,6 +129,7 @@ public class ApplicationConfig { } notifyObservers(notifications); + this.dmaapPublisherConfig = dmaapPublisherConfig; this.dmaapConsumerConfig = dmaapConsumerConfig; } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java index 530ac98b..d32db7a0 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java @@ -35,7 +35,9 @@ import java.util.Vector; import javax.validation.constraints.NotNull; +import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; import org.oransc.policyagent.exceptions.ServiceException; +import org.springframework.http.MediaType; public class ApplicationConfigParser { @@ -46,6 +48,7 @@ public class ApplicationConfigParser { .create(); // private Vector ricConfig; + private Properties dmaapPublisherConfig; private Properties dmaapConsumerConfig; public ApplicationConfigParser() { @@ -54,14 +57,28 @@ public class ApplicationConfigParser { public void parse(JsonObject root) throws ServiceException { JsonObject ricConfigJson = root.getAsJsonObject(CONFIG); ricConfig = parseRics(ricConfigJson); - JsonObject dmaapConfigJson = root.getAsJsonObject("streams_subscribes"); - dmaapConsumerConfig = parseDmaapConsumerConfig(dmaapConfigJson); + JsonObject dmaapPublisherConfigJson = root.getAsJsonObject("streams_publishes"); + if (dmaapPublisherConfigJson == null) { + dmaapPublisherConfig = new Properties(); + } else { + dmaapPublisherConfig = parseDmaapConfig(dmaapPublisherConfigJson); + } + JsonObject dmaapConsumerConfigJson = root.getAsJsonObject("streams_subscribes"); + if (dmaapConsumerConfigJson == null) { + dmaapConsumerConfig = new Properties(); + } else { + dmaapConsumerConfig = parseDmaapConfig(dmaapConsumerConfigJson); + } } public Vector getRicConfigs() { return this.ricConfig; } + public Properties getDmaapPublisherConfig() { + return dmaapPublisherConfig; + } + public Properties getDmaapConsumerConfig() { return dmaapConsumerConfig; } @@ -86,7 +103,7 @@ public class ApplicationConfigParser { return get(obj, memberName).getAsJsonArray(); } - private Properties parseDmaapConsumerConfig(JsonObject consumerCfg) throws ServiceException { + private Properties parseDmaapConfig(JsonObject consumerCfg) throws ServiceException { Set> topics = consumerCfg.entrySet(); if (topics.size() != 1) { throw new ServiceException("Invalid configuration, number of topic must be one, config: " + topics); @@ -106,17 +123,20 @@ public class ApplicationConfigParser { passwd = userInfo[1]; } String urlPath = url.getPath(); - DmaapConsumerUrlPath path = parseDmaapUrlPath(urlPath); + DmaapUrlPath path = parseDmaapUrlPath(urlPath); - dmaapProps.put("port", url.getPort()); - dmaapProps.put("server", url.getHost()); + dmaapProps.put("ServiceName", url.getHost()); dmaapProps.put("topic", path.dmaapTopicName); - dmaapProps.put("consumerGroup", path.consumerGroup); - dmaapProps.put("consumerInstance", path.consumerId); - dmaapProps.put("fetchTimeout", 15000); - dmaapProps.put("fetchLimit", 1000); + dmaapProps.put("host", url.getHost()); + dmaapProps.put("contenttype", MediaType.APPLICATION_JSON.toString()); dmaapProps.put("userName", userName); dmaapProps.put("password", passwd); + dmaapProps.put("group", path.consumerGroup); + dmaapProps.put("id", path.consumerId); + dmaapProps.put("TransportType", ProtocolTypeConstants.HTTPNOAUTH.toString()); + dmaapProps.put("timeout", 15000); + dmaapProps.put("limit", 1000); + dmaapProps.put("port", url.getPort()); } catch (MalformedURLException e) { throw new ServiceException("Could not parse the URL", e); } @@ -128,27 +148,31 @@ public class ApplicationConfigParser { return get(obj, memberName).getAsString(); } - private class DmaapConsumerUrlPath { + private class DmaapUrlPath { final String dmaapTopicName; final String consumerGroup; final String consumerId; - DmaapConsumerUrlPath(String dmaapTopicName, String consumerGroup, String consumerId) { + DmaapUrlPath(String dmaapTopicName, String consumerGroup, String consumerId) { this.dmaapTopicName = dmaapTopicName; this.consumerGroup = consumerGroup; this.consumerId = consumerId; } } - private DmaapConsumerUrlPath parseDmaapUrlPath(String urlPath) throws ServiceException { + private DmaapUrlPath parseDmaapUrlPath(String urlPath) throws ServiceException { String[] tokens = urlPath.split("/"); // /events/A1-P/users/sdnc1 - if (tokens.length != 5) { + if (!(tokens.length == 3 ^ tokens.length == 5)) { throw new ServiceException("The path has incorrect syntax: " + urlPath); } final String dmaapTopicName = tokens[1] + "/" + tokens[2]; // /events/A1-P - final String consumerGroup = tokens[3]; // users - final String consumerId = tokens[4]; // sdnc1 - return new DmaapConsumerUrlPath(dmaapTopicName, consumerGroup, consumerId); + String consumerGroup = ""; // users + String consumerId = ""; // sdnc1 + if (tokens.length == 5) { + consumerGroup = tokens[3]; + consumerId = tokens[4]; + } + return new DmaapUrlPath(dmaapTopicName, consumerGroup, consumerId); } } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java index fd421048..889dfafa 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java @@ -21,8 +21,6 @@ package org.oransc.policyagent.dmaap; -import java.util.Properties; - /** * The Dmaap consumer which has the base methods to be implemented by any class which implements this interface * @@ -34,7 +32,7 @@ public interface DmaapMessageConsumer { * * @param properties */ - public void init(Properties properties); + public void init(); /** * This method process the message and call the respective Controller diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java index 74cfe0da..191a13b2 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java @@ -22,6 +22,7 @@ package org.oransc.policyagent.dmaap; import java.io.IOException; import java.util.Properties; + import org.onap.dmaap.mr.client.MRClientFactory; import org.onap.dmaap.mr.client.MRConsumer; import org.onap.dmaap.mr.client.response.MRConsumerResponse; @@ -40,19 +41,22 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer { private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumerImpl.class); private boolean alive = false; + private final ApplicationConfig applicationConfig; protected MRConsumer consumer; private MRConsumerResponse response = null; @Autowired public DmaapMessageConsumerImpl(ApplicationConfig applicationConfig) { - Properties dmaapConsumerConfig = applicationConfig.getDmaapConsumerConfig(); - init(dmaapConsumerConfig); + this.applicationConfig = applicationConfig; } @Scheduled(fixedRate = 1000 * 60) @Override public void run() { - while (this.alive) { + if (!alive) { + init(); + } + if (this.alive) { try { Iterable dmaapMsgs = fetchAllMessages(); for (String msg : dmaapMsgs) { @@ -72,30 +76,24 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer { logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(), response.getResponseMessage()); if (!"200".equals(response.getResponseCode())) { logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(), - response.getResponseMessage()); + response.getResponseMessage()); } } return response.getActualMessages(); } @Override - public void init(Properties properties) { - // Initialize the DMAAP with the properties + public void init() { + Properties dmaapConsumerProperties = applicationConfig.getDmaapConsumerConfig(); + Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig(); + // No need to start if there is no configuration. + if (dmaapConsumerProperties == null || dmaapPublisherProperties == null || dmaapConsumerProperties.size() == 0 + || dmaapPublisherProperties.size() == 0) { + return; + } // Do we need to do any validation of properties before calling the factory? - Properties prop = new Properties(); - prop.setProperty("ServiceName", "localhost:6845/events"); - prop.setProperty("topic", "A1-P"); - prop.setProperty("host", "localhost:6845"); - prop.setProperty("contenttype", "application/json"); - prop.setProperty("username", "admin"); - prop.setProperty("password", "admin"); - prop.setProperty("group", "users"); - prop.setProperty("id", "policy-agent"); - prop.setProperty("TransportType", "HTTPNOAUTH"); - prop.setProperty("timeout", "15000"); - prop.setProperty("limit", "1000"); try { - consumer = MRClientFactory.createConsumer(prop); + consumer = MRClientFactory.createConsumer(dmaapConsumerProperties); this.alive = true; } catch (IOException e) { logger.error("Exception occurred while creating Dmaap Consumer", e); diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java index bc43edab..1ab5fc9c 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java @@ -125,7 +125,8 @@ public class RefreshConfigTask { try { ApplicationConfigParser parser = new ApplicationConfigParser(); parser.parse(jsonObject); - this.appConfig.setConfiguration(parser.getRicConfigs(), parser.getDmaapConsumerConfig()); + this.appConfig.setConfiguration(parser.getRicConfigs(), parser.getDmaapPublisherConfig(), + parser.getDmaapConsumerConfig()); } catch (ServiceException e) { logger.error("Could not parse configuration {}", e.toString(), e); } @@ -152,7 +153,8 @@ public class RefreshConfigTask { } ApplicationConfigParser appParser = new ApplicationConfigParser(); appParser.parse(rootObject); - appConfig.setConfiguration(appParser.getRicConfigs(), appParser.getDmaapConsumerConfig()); + appConfig.setConfiguration(appParser.getRicConfigs(), appParser.getDmaapPublisherConfig(), + appParser.getDmaapConsumerConfig()); logger.info("Local configuration file loaded: {}", filepath); } catch (JsonSyntaxException | ServiceException | IOException e) { logger.trace("Local configuration file not loaded: {}", filepath, e); diff --git a/policy-agent/src/test/resources/test_application_configuration.json b/policy-agent/src/test/resources/test_application_configuration.json index c63b7104..446c0611 100644 --- a/policy-agent/src/test/resources/test_application_configuration.json +++ b/policy-agent/src/test/resources/test_application_configuration.json @@ -19,13 +19,5 @@ ] } ] - }, - "streams_subscribes": { - "dmaap_subscriber": { - "dmaap_info": { - "topic_url": "http://dradmin:dradmin@localhost:2222/events/A1-P/users/sdnc1" - }, - "type": "message_router" - } } } \ No newline at end of file diff --git a/policy-agent/src/test/resources/test_application_configuration_with_dmaap_config.json b/policy-agent/src/test/resources/test_application_configuration_with_dmaap_config.json new file mode 100644 index 00000000..d4a39728 --- /dev/null +++ b/policy-agent/src/test/resources/test_application_configuration_with_dmaap_config.json @@ -0,0 +1,39 @@ +{ + "config": { + "//description": "Application configuration", + "ric": [ + { + "name": "ric1", + "baseUrl": "http://localhost:8080/", + "managedElementIds": [ + "kista_1", + "kista_2" + ] + }, + { + "name": "ric2", + "baseUrl": "http://localhost:8081/", + "managedElementIds": [ + "kista_3", + "kista_4" + ] + } + ] + }, + "streams_publishes": { + "dmaap_publisher": { + "type": "message_router", + "dmaap_info": { + "topic_url": "https://dradmin:dradmin@localhost:2222/events/A1-P-RESULT" + } + } + }, + "streams_subscribes": { + "dmaap_subscriber": { + "dmaap_info": { + "topic_url": "http://dradmin:dradmin@localhost:2222/events/A1-P/users/sdnc1" + }, + "type": "message_router" + } + } +} \ No newline at end of file -- 2.16.6