From: Henrik Andersson Date: Tue, 28 Jan 2020 08:46:50 +0000 (+0000) Subject: Merge "Adapt A1 controller to latest A1 spec" X-Git-Tag: 1.0.1~23 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=f3461cb776023b950d62edd25eca148b6d354c9c;hp=d1623c5066ebb6152c6a2ba0fe889e32c75d8890;p=nonrtric.git Merge "Adapt A1 controller to latest A1 spec" --- diff --git a/policy-agent/README.md b/policy-agent/README.md index 3ddbbd51..6077a4b6 100644 --- a/policy-agent/README.md +++ b/policy-agent/README.md @@ -1,21 +1,25 @@ # O-RAN-SC NonRT RIC Dashboard Web Application -The O-RAN NonRT RIC PolicyAgent provides a REST API for management of -policices. It provides support for --Policy configuration. This includes - -One REST API towards all RICs in the network - -Query functions that can find all policies in a RIC, all policies owned by a service (R-APP), all policies of a type etc. - -Maps O1 resources (ManagedElement) as defined in O1 to the controlling RIC --Supervision of clients (R-APPs) to eliminate stray policies in case of failure --Consistency monitoring of the SMO view of policies and the actual situation in the RICs --Consistency monitoring of RIC capabilities (policy types) +The O-RAN NonRT RIC PolicyAgent provides a REST API for management of policices. +It provides support for: + -Supervision of clients (R-APPs) to eliminate stray policies in case of failure + -Consistency monitoring of the SMO view of policies and the actual situation in the RICs + -Consistency monitoring of RIC capabilities (policy types) + -Policy configuration. This includes: + -One REST API towards all RICs in the network + -Query functions that can find all policies in a RIC, all policies owned by a service (R-APP), + all policies of a type etc. + -Maps O1 resources (ManagedElement) as defined in O1 to the controlling RIC To Run Policy Agent in Local: -Create a symbolic link with below command, +In the folder /opt/app/policy-agent/config/, create a soft link with below command, ln -s application_configuration.json -The agent can be run stand alone in a simulated test mode. Then it -simulates RICs. +To Run Policy Agent in Local with the DMaaP polling turned on: +In the folder /opt/app/policy-agent/config/, create a soft link with below command, +ln -s application_configuration.json + +The agent can be run stand alone in a simulated test mode. Then it simulates RICs. The REST API is published on port 8081 and it is started by command: mvn -Dtest=MockPolicyAgent test diff --git a/policy-agent/pom.xml b/policy-agent/pom.xml index 43d3aefa..f2a9411f 100644 --- a/policy-agent/pom.xml +++ b/policy-agent/pom.xml @@ -128,6 +128,11 @@ dmaap-client ${sdk.version} + + org.projectlombok + lombok + provided + io.springfox diff --git a/policy-agent/src/main/java/org/oransc/policyagent/BeanFactory.java b/policy-agent/src/main/java/org/oransc/policyagent/BeanFactory.java index f0826e9a..e05eb95f 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/BeanFactory.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/BeanFactory.java @@ -20,6 +20,7 @@ package org.oransc.policyagent; +import com.fasterxml.jackson.databind.ObjectMapper; import org.oransc.policyagent.clients.A1ClientFactory; import org.oransc.policyagent.configuration.ApplicationConfig; import org.oransc.policyagent.repository.Policies; @@ -61,4 +62,9 @@ class BeanFactory { return new A1ClientFactory(); } + @Bean + public ObjectMapper mapper() { + return new ObjectMapper(); + } + } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/clients/StdA1Client.java b/policy-agent/src/main/java/org/oransc/policyagent/clients/StdA1Client.java index 4c1d140a..1c40308a 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/clients/StdA1Client.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/clients/StdA1Client.java @@ -50,31 +50,28 @@ public class StdA1Client implements A1Client { } @Override - public Mono> getPolicyTypeIdentities() { - return restClient.get("/policytypes/identities") // + public Mono> getPolicyIdentities() { + return restClient.get("/policies") // .flatMap(this::parseJsonArrayOfString); } @Override - public Mono> getPolicyIdentities() { - return restClient.get("/policies/identities") // - .flatMap(this::parseJsonArrayOfString); + public Mono putPolicy(Policy policy) { + String url = "/policies/" + policy.id() + "?policyTypeId=" + policy.type().name(); + return restClient.put(url, policy.json()) // + .flatMap(this::validateJson); } @Override - public Mono getPolicyTypeSchema(String policyTypeId) { - Mono response = restClient.get("/policytypes/" + policyTypeId); - return response.flatMap(this::createMono); + public Mono> getPolicyTypeIdentities() { + return restClient.get("/policytypes") // + .flatMap(this::parseJsonArrayOfString); } @Override - public Mono putPolicy(Policy policy) { - // TODO update when simulator is updated to include policy type - // Mono response = client.put("/policies/" + policy.id() + "?policyTypeId=" + policy.type().name(), - // policy.json()); - Mono response = restClient.put("/policies/" + policy.id(), policy.json()); - - return response.flatMap(this::createMono); + public Mono getPolicyTypeSchema(String policyTypeId) { + return restClient.get("/policytypes/" + policyTypeId) // + .flatMap(this::extractPolicySchema); } @Override @@ -113,12 +110,21 @@ public class StdA1Client implements A1Client { } } - private Mono createMono(String inputString) { + private Mono extractPolicySchema(String inputString) { try { JSONObject jsonObject = new JSONObject(inputString); - String jsonString = jsonObject.toString(); - logger.debug("A1 client: received string = {}", jsonString); - return Mono.just(jsonString); + JSONObject schemaObject = jsonObject.getJSONObject("policySchema"); + String schemaString = schemaObject.toString(); + return Mono.just(schemaString); + } catch (JSONException ex) { // invalid json + return Mono.error(ex); + } + } + + private Mono validateJson(String inputString) { + try { + new JSONObject(inputString); + return Mono.just(inputString); } catch (JSONException ex) { // invalid json return Mono.error(ex); } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java index 673fe1d0..1ed3fdb2 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java @@ -42,6 +42,7 @@ public class ApplicationConfig { private Collection observers = new Vector<>(); private Map ricConfigs = new HashMap<>(); + private Properties dmaapPublisherConfig; private Properties dmaapConsumerConfig; @Autowired @@ -72,6 +73,10 @@ public class ApplicationConfig { throw new ServiceException("Could not find ric: " + ricName); } + public Properties getDmaapPublisherConfig() { + return dmaapConsumerConfig; + } + public Properties getDmaapConsumerConfig() { return dmaapConsumerConfig; } @@ -98,7 +103,8 @@ public class ApplicationConfig { } } - public void setConfiguration(@NotNull Collection ricConfigs, Properties dmaapConsumerConfig) { + public void setConfiguration(@NotNull Collection ricConfigs, Properties dmaapPublisherConfig, + Properties dmaapConsumerConfig) { Collection notifications = new Vector<>(); synchronized (this) { Map newRicConfigs = new HashMap<>(); @@ -123,6 +129,7 @@ public class ApplicationConfig { } notifyObservers(notifications); + this.dmaapPublisherConfig = dmaapPublisherConfig; this.dmaapConsumerConfig = dmaapConsumerConfig; } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java index 530ac98b..34d1d97a 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java @@ -25,17 +25,16 @@ import com.google.gson.GsonBuilder; import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; - import java.net.MalformedURLException; import java.net.URL; import java.util.Map.Entry; import java.util.Properties; import java.util.Set; import java.util.Vector; - import javax.validation.constraints.NotNull; - +import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; import org.oransc.policyagent.exceptions.ServiceException; +import org.springframework.http.MediaType; public class ApplicationConfigParser { @@ -46,6 +45,7 @@ public class ApplicationConfigParser { .create(); // private Vector ricConfig; + private Properties dmaapPublisherConfig; private Properties dmaapConsumerConfig; public ApplicationConfigParser() { @@ -54,14 +54,28 @@ public class ApplicationConfigParser { public void parse(JsonObject root) throws ServiceException { JsonObject ricConfigJson = root.getAsJsonObject(CONFIG); ricConfig = parseRics(ricConfigJson); - JsonObject dmaapConfigJson = root.getAsJsonObject("streams_subscribes"); - dmaapConsumerConfig = parseDmaapConsumerConfig(dmaapConfigJson); + JsonObject dmaapPublisherConfigJson = root.getAsJsonObject("streams_publishes"); + if (dmaapPublisherConfigJson == null) { + dmaapPublisherConfig = new Properties(); + } else { + dmaapPublisherConfig = parseDmaapConfig(dmaapPublisherConfigJson); + } + JsonObject dmaapConsumerConfigJson = root.getAsJsonObject("streams_subscribes"); + if (dmaapConsumerConfigJson == null) { + dmaapConsumerConfig = new Properties(); + } else { + dmaapConsumerConfig = parseDmaapConfig(dmaapConsumerConfigJson); + } } public Vector getRicConfigs() { return this.ricConfig; } + public Properties getDmaapPublisherConfig() { + return dmaapPublisherConfig; + } + public Properties getDmaapConsumerConfig() { return dmaapConsumerConfig; } @@ -86,7 +100,7 @@ public class ApplicationConfigParser { return get(obj, memberName).getAsJsonArray(); } - private Properties parseDmaapConsumerConfig(JsonObject consumerCfg) throws ServiceException { + private Properties parseDmaapConfig(JsonObject consumerCfg) throws ServiceException { Set> topics = consumerCfg.entrySet(); if (topics.size() != 1) { throw new ServiceException("Invalid configuration, number of topic must be one, config: " + topics); @@ -106,17 +120,19 @@ public class ApplicationConfigParser { passwd = userInfo[1]; } String urlPath = url.getPath(); - DmaapConsumerUrlPath path = parseDmaapUrlPath(urlPath); + DmaapUrlPath path = parseDmaapUrlPath(urlPath); - dmaapProps.put("port", url.getPort()); - dmaapProps.put("server", url.getHost()); + dmaapProps.put("ServiceName", url.getHost()+":"+url.getPort()+"/events"); dmaapProps.put("topic", path.dmaapTopicName); - dmaapProps.put("consumerGroup", path.consumerGroup); - dmaapProps.put("consumerInstance", path.consumerId); - dmaapProps.put("fetchTimeout", 15000); - dmaapProps.put("fetchLimit", 1000); + dmaapProps.put("host", url.getHost()+":"+url.getPort()); + dmaapProps.put("contenttype", MediaType.APPLICATION_JSON.toString()); dmaapProps.put("userName", userName); dmaapProps.put("password", passwd); + dmaapProps.put("group", path.consumerGroup); + dmaapProps.put("id", path.consumerId); + dmaapProps.put("TransportType", ProtocolTypeConstants.HTTPNOAUTH.toString()); + dmaapProps.put("timeout", 15000); + dmaapProps.put("limit", 1000); } catch (MalformedURLException e) { throw new ServiceException("Could not parse the URL", e); } @@ -128,27 +144,31 @@ public class ApplicationConfigParser { return get(obj, memberName).getAsString(); } - private class DmaapConsumerUrlPath { + private class DmaapUrlPath { final String dmaapTopicName; final String consumerGroup; final String consumerId; - DmaapConsumerUrlPath(String dmaapTopicName, String consumerGroup, String consumerId) { + DmaapUrlPath(String dmaapTopicName, String consumerGroup, String consumerId) { this.dmaapTopicName = dmaapTopicName; this.consumerGroup = consumerGroup; this.consumerId = consumerId; } } - private DmaapConsumerUrlPath parseDmaapUrlPath(String urlPath) throws ServiceException { + private DmaapUrlPath parseDmaapUrlPath(String urlPath) throws ServiceException { String[] tokens = urlPath.split("/"); // /events/A1-P/users/sdnc1 - if (tokens.length != 5) { + if (!(tokens.length == 3 ^ tokens.length == 5)) { throw new ServiceException("The path has incorrect syntax: " + urlPath); } - final String dmaapTopicName = tokens[1] + "/" + tokens[2]; // /events/A1-P - final String consumerGroup = tokens[3]; // users - final String consumerId = tokens[4]; // sdnc1 - return new DmaapConsumerUrlPath(dmaapTopicName, consumerGroup, consumerId); + final String dmaapTopicName = tokens[2]; // /events/A1-P + String consumerGroup = ""; // users + String consumerId = ""; // sdnc1 + if (tokens.length == 5) { + consumerGroup = tokens[3]; + consumerId = tokens[4]; + } + return new DmaapUrlPath(dmaapTopicName, consumerGroup, consumerId); } } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/configuration/AsyncConfiguration.java b/policy-agent/src/main/java/org/oransc/policyagent/configuration/AsyncConfiguration.java new file mode 100644 index 00000000..085320c9 --- /dev/null +++ b/policy-agent/src/main/java/org/oransc/policyagent/configuration/AsyncConfiguration.java @@ -0,0 +1,44 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2019 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oransc.policyagent.configuration; + +import java.util.concurrent.Executor; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.AsyncConfigurer; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +@Configuration +@EnableAsync +public class AsyncConfiguration implements AsyncConfigurer { + + @Override + @Bean(name = "threadPoolTaskExecutor") + public Executor getAsyncExecutor() { + //Set this configuration value from common properties file + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(5); + executor.setMaxPoolSize(10); + executor.setQueueCapacity(25); + return executor; + } +} diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java index fd421048..889dfafa 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java @@ -21,8 +21,6 @@ package org.oransc.policyagent.dmaap; -import java.util.Properties; - /** * The Dmaap consumer which has the base methods to be implemented by any class which implements this interface * @@ -34,7 +32,7 @@ public interface DmaapMessageConsumer { * * @param properties */ - public void init(Properties properties); + public void init(); /** * This method process the message and call the respective Controller diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java index 74cfe0da..2ae5e5ee 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java @@ -22,6 +22,7 @@ package org.oransc.policyagent.dmaap; import java.io.IOException; import java.util.Properties; +import javax.annotation.PostConstruct; import org.onap.dmaap.mr.client.MRClientFactory; import org.onap.dmaap.mr.client.MRConsumer; import org.onap.dmaap.mr.client.response.MRConsumerResponse; @@ -40,21 +41,27 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer { private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumerImpl.class); private boolean alive = false; + private final ApplicationConfig applicationConfig; protected MRConsumer consumer; private MRConsumerResponse response = null; + @Autowired + private DmaapMessageHandler dmaapMessageHandler; @Autowired public DmaapMessageConsumerImpl(ApplicationConfig applicationConfig) { - Properties dmaapConsumerConfig = applicationConfig.getDmaapConsumerConfig(); - init(dmaapConsumerConfig); + this.applicationConfig = applicationConfig; } - @Scheduled(fixedRate = 1000 * 60) + @Scheduled(fixedRate = 1000 * 10) // , initialDelay=60000) @Override public void run() { - while (this.alive) { + /* + * if (!alive) { init(); } + */ + if (this.alive) { try { Iterable dmaapMsgs = fetchAllMessages(); + logger.debug("Fetched all the messages from DMAAP and will start to process the messages"); for (String msg : dmaapMsgs) { processMsg(msg); } @@ -78,24 +85,20 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer { return response.getActualMessages(); } + @PostConstruct @Override - public void init(Properties properties) { - // Initialize the DMAAP with the properties - // Do we need to do any validation of properties before calling the factory? - Properties prop = new Properties(); - prop.setProperty("ServiceName", "localhost:6845/events"); - prop.setProperty("topic", "A1-P"); - prop.setProperty("host", "localhost:6845"); - prop.setProperty("contenttype", "application/json"); - prop.setProperty("username", "admin"); - prop.setProperty("password", "admin"); - prop.setProperty("group", "users"); - prop.setProperty("id", "policy-agent"); - prop.setProperty("TransportType", "HTTPNOAUTH"); - prop.setProperty("timeout", "15000"); - prop.setProperty("limit", "1000"); + public void init() { + Properties dmaapConsumerProperties = applicationConfig.getDmaapConsumerConfig(); + Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig(); + // No need to start if there is no configuration. + if (dmaapConsumerProperties == null || dmaapPublisherProperties == null || dmaapConsumerProperties.size() == 0 + || dmaapPublisherProperties.size() == 0) { + logger.error("DMaaP properties Failed to Load"); + return; + } try { - consumer = MRClientFactory.createConsumer(prop); + logger.debug("Creating DMAAP Client"); + consumer = MRClientFactory.createConsumer(dmaapConsumerProperties); this.alive = true; } catch (IOException e) { logger.error("Exception occurred while creating Dmaap Consumer", e); @@ -104,11 +107,9 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer { @Override public void processMsg(String msg) throws Exception { - System.out.println("sysout" + msg); + logger.debug("Message Reveived from DMAAP : {}", msg); // Call the concurrent Task executor to handle the incoming request - // Validate the Input & if its valid, post the ACCEPTED Response back to DMAAP - // through REST CLIENT - // Call the Controller with the extracted payload + dmaapMessageHandler.handleDmaapMsg(msg); } @Override diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java new file mode 100644 index 00000000..bf9f06c1 --- /dev/null +++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java @@ -0,0 +1,175 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2019 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oransc.policyagent.dmaap; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.util.Properties; +import org.apache.commons.lang3.StringUtils; +import org.json.JSONException; +import org.json.JSONObject; +import org.json.JSONTokener; +import org.oransc.policyagent.clients.AsyncRestClient; +import org.oransc.policyagent.configuration.ApplicationConfig; +import org.oransc.policyagent.controllers.PolicyController; +import org.oransc.policyagent.model.DmaapMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Component; + +@Component +public class DmaapMessageHandler { + + private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandler.class); + + @Autowired + private ObjectMapper mapper; + @Autowired + private PolicyController policyController; + private AsyncRestClient restClient; + private ApplicationConfig applicationConfig; + private String topic = ""; + + @Autowired + public DmaapMessageHandler(ApplicationConfig applicationConfig) { + this.applicationConfig = applicationConfig; + } + + // The publish properties is corrupted. It contains the subscribe property values. + @Async("threadPoolTaskExecutor") + public void handleDmaapMsg(String msg) { + init(); + DmaapMessage dmaapMessage = null; + ResponseEntity response = null; + // Process the message + /** + * Sample Request Message from DMAAP { "type": "request", "correlationId": + * "c09ac7d1-de62-0016-2000-e63701125557-201", "target": "policy-agent", "timestamp": "2019-05-14T11:44:51.36Z", + * "apiVersion": "1.0", "originatorId": "849e6c6b420", "requestId": "23343221", "operation": "getPolicySchemas", + * "payload": "{\"ric\":\"ric1\"}" } + * -------------------------------------------------------------------------------------------------------------- + * Sample Response Message to DMAAP { "type": "response", "correlation-id": + * "c09ac7d1-de62-0016-2000-e63701125557-201", "timestamp": "2019-05-14T11:44:51.36Z", "originator-id": + * "849e6c6b420", "request-id": "23343221", "status" : "ACCEPTED", "message" : "" } + * ------------------------------------------------------------------------------------------------------------- + * Sample Response Message to DMAAP { "type": "response", "correlation-id": + * "c09ac7d1-de62-0016-2000-e63701125557-201", "timestamp": "2019-05-14T11:44:51.36Z", "originator-id": + * "849e6c6b420", "request-id": "23343221", "status" : "SUCCESS" "message" : "" } + */ + try { + dmaapMessage = mapper.readValue(msg, DmaapMessage.class); + // Post the accepted message to the DMAAP bus + logger.debug("DMAAP Message- {}", dmaapMessage); + logger.debug("Post Accepted Message to Client"); + restClient + .post("A1-POLICY-AGENT-WRITE", buildDmaapResponse(dmaapMessage.getCorrelationId(), + dmaapMessage.getOriginatorId(), dmaapMessage.getRequestId(), "ACCEPTED", StringUtils.EMPTY)) + .block(); // + // Call the Controller + logger.debug("Invoke the Policy Agent Controller"); + response = invokeController(dmaapMessage); + // Post the Response message to the DMAAP bus + logger.debug("DMAAP Response Message to Client- {}", response); + restClient + .post("A1-POLICY-AGENT-WRITE", buildDmaapResponse(dmaapMessage.getCorrelationId(), + dmaapMessage.getOriginatorId(), dmaapMessage.getRequestId(), "SUCCESS", response.getBody())) + .block(); // + } catch (IOException e) { + logger.error("Exception occured during message processing", e); + } + } + + private ResponseEntity invokeController(DmaapMessage dmaapMessage) { + String formattedString = ""; + String ricName; + String instance; + logger.debug("Payload from the Message - {}", dmaapMessage.getPayload()); + try { + formattedString = new JSONTokener(dmaapMessage.getPayload()).nextValue().toString(); + logger.debug("Removed the Escape charater in payload- {}", formattedString); + } catch (JSONException e) { + logger.error("Exception occurred during formating Payload- {}", dmaapMessage.getPayload()); + } + JSONObject jsonObject = new JSONObject(formattedString); + switch (dmaapMessage.getOperation()) { + case "getPolicySchemas": + ricName = (String) jsonObject.get("ricName"); + logger.debug("Received the request for getPolicySchemas with Ric Name- {}", ricName); + return policyController.getPolicySchemas(ricName); + case "getPolicySchema": + String policyTypeId = (String) jsonObject.get("id"); + logger.debug("Received the request for getPolicySchema with Policy Type Id- {}", policyTypeId); + System.out.println("policyTypeId" + policyTypeId); + return policyController.getPolicySchema(policyTypeId); + case "getPolicyTypes": + ricName = (String) jsonObject.get("ricName"); + logger.debug("Received the request for getPolicyTypes with Ric Name- {}", ricName); + return policyController.getPolicyTypes(ricName); + case "getPolicy": + instance = (String) jsonObject.get("instance"); + logger.debug("Received the request for getPolicy with Instance- {}", instance); + return policyController.getPolicy(instance); + case "deletePolicy": + instance = (String) jsonObject.get("instance"); + logger.debug("Received the request for deletePolicy with Instance- {}", instance); + return null;// policyController.deletePolicy(deleteInstance); + case "putPolicy": + String type = (String) jsonObject.get("type"); + String putPolicyInstance = (String) jsonObject.get("instance"); + String putPolicyRic = (String) jsonObject.get("ric"); + String service = (String) jsonObject.get("service"); + String jsonBody = (String) jsonObject.get("jsonBody"); + return null;// policyController.putPolicy(type, putPolicyInstance, putPolicyRic, service, jsonBody); + case "getPolicies": + String getPolicyType = (String) jsonObject.get("type"); + String getPolicyRic = (String) jsonObject.get("ric"); + String getPolicyService = (String) jsonObject.get("service"); + return policyController.getPolicies(getPolicyType, getPolicyRic, getPolicyService); + default: + break; + } + return null; + } + + private String buildDmaapResponse(String correlationId, String originatorId, String requestId, String status, + String message) { + System.out.println("buildResponse "); + return new JSONObject().put("type", "response").put(correlationId, correlationId).put("timestamp", "") + .put("originatorId", originatorId).put("requestId", requestId).put("status", status) + .put("message", message).toString(); + } + + // @PostConstruct + // The application properties value is always NULL for the first time + // Need to fix this + public void init() { + logger.debug("Reading DMAAP Publisher bus details from Application Config"); + Properties dmaapPublisherConfig = applicationConfig.getDmaapPublisherConfig(); + String host = (String) dmaapPublisherConfig.get("ServiceName"); + topic = dmaapPublisherConfig.getProperty("topic"); + logger.debug("Read the topic & Service Name - {} , {}", host, topic); + this.restClient = new AsyncRestClient("http://" + host + "/"); // get this value from application config + + } +} diff --git a/policy-agent/src/main/java/org/oransc/policyagent/model/DmaapMessage.java b/policy-agent/src/main/java/org/oransc/policyagent/model/DmaapMessage.java new file mode 100644 index 00000000..e56f4b4a --- /dev/null +++ b/policy-agent/src/main/java/org/oransc/policyagent/model/DmaapMessage.java @@ -0,0 +1,47 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2019 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oransc.policyagent.model; + +import java.sql.Timestamp; +import javax.validation.constraints.NotNull; +import lombok.Getter; +import lombok.Setter; + +@Getter +@Setter +public class DmaapMessage { + + @NotNull + private String type; + @NotNull + private String correlationId; + @NotNull + private String target; + @NotNull + private Timestamp timestamp; + private String apiVersion; + @NotNull + private String originatorId; + private String requestId; + @NotNull + private String operation; + private String payload; +} diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java index bc43edab..1ab5fc9c 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java @@ -125,7 +125,8 @@ public class RefreshConfigTask { try { ApplicationConfigParser parser = new ApplicationConfigParser(); parser.parse(jsonObject); - this.appConfig.setConfiguration(parser.getRicConfigs(), parser.getDmaapConsumerConfig()); + this.appConfig.setConfiguration(parser.getRicConfigs(), parser.getDmaapPublisherConfig(), + parser.getDmaapConsumerConfig()); } catch (ServiceException e) { logger.error("Could not parse configuration {}", e.toString(), e); } @@ -152,7 +153,8 @@ public class RefreshConfigTask { } ApplicationConfigParser appParser = new ApplicationConfigParser(); appParser.parse(rootObject); - appConfig.setConfiguration(appParser.getRicConfigs(), appParser.getDmaapConsumerConfig()); + appConfig.setConfiguration(appParser.getRicConfigs(), appParser.getDmaapPublisherConfig(), + appParser.getDmaapConsumerConfig()); logger.info("Local configuration file loaded: {}", filepath); } catch (JsonSyntaxException | ServiceException | IOException e) { logger.trace("Local configuration file not loaded: {}", filepath, e); diff --git a/policy-agent/src/test/java/org/oransc/policyagent/clients/StdA1ClientTest.java b/policy-agent/src/test/java/org/oransc/policyagent/clients/StdA1ClientTest.java index 1454ccaf..2c83e5a8 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/clients/StdA1ClientTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/clients/StdA1ClientTest.java @@ -21,6 +21,7 @@ package org.oransc.policyagent.clients; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -51,8 +52,8 @@ import reactor.test.StepVerifier; @RunWith(MockitoJUnitRunner.class) public class StdA1ClientTest { private static final String RIC_URL = "RicUrl"; - private static final String POLICYTYPES_IDENTITIES_URL = "/policytypes/identities"; - private static final String POLICIES_IDENTITIES_URL = "/policies/identities"; + private static final String POLICYTYPES_IDENTITIES_URL = "/policytypes"; + private static final String POLICIES_IDENTITIES_URL = "/policies"; private static final String POLICYTYPES_URL = "/policytypes/"; private static final String POLICIES_URL = "/policies/"; @@ -98,8 +99,10 @@ public class StdA1ClientTest { @Test public void testGetValidPolicyType() { - when(asyncRestClientMock.get(POLICYTYPES_URL + POLICY_TYPE_1_NAME)) - .thenReturn(Mono.just(POLICY_TYPE_SCHEMA_VALID)); + Mono policyTypeResp = + Mono.just("{\"policySchema\": " + POLICY_TYPE_SCHEMA_VALID + ", \"statusSchema\": {} }"); + + doReturn(policyTypeResp).when(asyncRestClientMock).get(POLICYTYPES_URL + POLICY_TYPE_1_NAME); Mono policyTypeMono = a1Client.getPolicyTypeSchema(POLICY_TYPE_1_NAME); verify(asyncRestClientMock).get(POLICYTYPES_URL + POLICY_TYPE_1_NAME); @@ -123,7 +126,7 @@ public class StdA1ClientTest { Mono policyMono = a1Client.putPolicy(createPolicy(RIC_URL, POLICY_1_ID, POLICY_JSON_VALID, POLICY_TYPE)); - verify(asyncRestClientMock).put(POLICIES_URL + POLICY_1_ID, POLICY_JSON_VALID); + verify(asyncRestClientMock).put(POLICIES_URL + POLICY_1_ID + "?policyTypeId=" + POLICY_TYPE, POLICY_JSON_VALID); StepVerifier.create(policyMono).expectNext(POLICY_JSON_VALID).expectComplete().verify(); } diff --git a/policy-agent/src/test/resources/test_application_configuration.json b/policy-agent/src/test/resources/test_application_configuration.json index c63b7104..446c0611 100644 --- a/policy-agent/src/test/resources/test_application_configuration.json +++ b/policy-agent/src/test/resources/test_application_configuration.json @@ -19,13 +19,5 @@ ] } ] - }, - "streams_subscribes": { - "dmaap_subscriber": { - "dmaap_info": { - "topic_url": "http://dradmin:dradmin@localhost:2222/events/A1-P/users/sdnc1" - }, - "type": "message_router" - } } } \ No newline at end of file diff --git a/policy-agent/src/test/resources/test_application_configuration_with_dmaap_config.json b/policy-agent/src/test/resources/test_application_configuration_with_dmaap_config.json new file mode 100644 index 00000000..c9453603 --- /dev/null +++ b/policy-agent/src/test/resources/test_application_configuration_with_dmaap_config.json @@ -0,0 +1,39 @@ +{ + "config": { + "//description": "Application configuration", + "ric": [ + { + "name": "ric1", + "baseUrl": "http://localhost:8080/", + "managedElementIds": [ + "kista_1", + "kista_2" + ] + }, + { + "name": "ric2", + "baseUrl": "http://localhost:8081/", + "managedElementIds": [ + "kista_3", + "kista_4" + ] + } + ] + }, + "streams_publishes": { + "dmaap_publisher": { + "type": "message_router", + "dmaap_info": { + "topic_url": "http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-WRITE" + } + } + }, + "streams_subscribes": { + "dmaap_subscriber": { + "dmaap_info": { + "topic_url": "http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-READ/users/policy-agent" + }, + "type": "message_router" + } + } +} \ No newline at end of file