From: Henrik Andersson Date: Tue, 28 Jan 2020 08:46:37 +0000 (+0000) Subject: Merge "Adapted to latest STD A1 API spec" X-Git-Tag: 1.0.1~24 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=d1b7bb42fe20c8c0131959f3e96668d8e6669d04;hp=85cffcf640063802689baf6abb2224f504520288;p=nonrtric.git Merge "Adapted to latest STD A1 API spec" --- diff --git a/dashboard/webapp-backend/src/main/java/org/oransc/ric/portal/dashboard/policyagentapi/PolicyAgentApiImpl.java b/dashboard/webapp-backend/src/main/java/org/oransc/ric/portal/dashboard/policyagentapi/PolicyAgentApiImpl.java index c661e661..19865e18 100644 --- a/dashboard/webapp-backend/src/main/java/org/oransc/ric/portal/dashboard/policyagentapi/PolicyAgentApiImpl.java +++ b/dashboard/webapp-backend/src/main/java/org/oransc/ric/portal/dashboard/policyagentapi/PolicyAgentApiImpl.java @@ -55,14 +55,14 @@ public class PolicyAgentApiImpl implements PolicyAgentApi { RestTemplate restTemplate = new RestTemplate(); private static com.google.gson.Gson gson = new GsonBuilder() // - .serializeNulls() // - .create(); // + .serializeNulls() // + .create(); // private final String urlPrefix; @Autowired public PolicyAgentApiImpl( - @org.springframework.beans.factory.annotation.Value("${policycontroller.url.prefix}") final String urlPrefix) { + @org.springframework.beans.factory.annotation.Value("${policycontroller.url.prefix}") final String urlPrefix) { logger.debug("ctor prefix '{}'", urlPrefix); this.urlPrefix = urlPrefix; } @@ -115,7 +115,8 @@ public class PolicyAgentApiImpl implements PolicyAgentApi { } try { - Type listType = new TypeToken>() {}.getType(); + Type listType = new TypeToken>() { + }.getType(); List rspParsed = gson.fromJson(rsp.getBody(), listType); PolicyInstances result = new PolicyInstances(); for (PolicyInfo p : rspParsed) { @@ -137,17 +138,17 @@ public class PolicyAgentApiImpl implements PolicyAgentApi { @Override public ResponseEntity putPolicy(String policyTypeIdString, String policyInstanceId, String json, - String ric) { + String ric) { String url = baseUrl() + "/policy?type={type}&instance={instance}&ric={ric}&service={service}"; Map uriVariables = Map.of( // - "type", policyTypeIdString, // - "instance", policyInstanceId, // - "ric", ric, // - "service", "dashboard"); + "type", policyTypeIdString, // + "instance", policyInstanceId, // + "ric", ric, // + "service", "dashboard"); try { this.restTemplate.put(url, json, uriVariables); - return new ResponseEntity<>("Policy was put successfully", HttpStatus.OK); + return new ResponseEntity<>(HttpStatus.OK); } catch (Exception e) { return new ResponseEntity<>(e.getMessage(), HttpStatus.INTERNAL_SERVER_ERROR); } @@ -159,7 +160,7 @@ public class PolicyAgentApiImpl implements PolicyAgentApi { Map uriVariables = Map.of("instance", policyInstanceId); try { this.restTemplate.delete(url, uriVariables); - return new ResponseEntity<>("Policy was deleted successfully", HttpStatus.NO_CONTENT); + return new ResponseEntity<>(HttpStatus.OK); } catch (Exception e) { return new ResponseEntity<>(e.getMessage(), HttpStatus.NOT_FOUND); } @@ -183,7 +184,8 @@ public class PolicyAgentApiImpl implements PolicyAgentApi { String rsp = this.restTemplate.getForObject(url, String.class, uriVariables); try { - Type listType = new TypeToken>() {}.getType(); + Type listType = new TypeToken>() { + }.getType(); List rspParsed = gson.fromJson(rsp, listType); Collection result = new Vector<>(rspParsed.size()); for (RicInfo ric : rspParsed) { diff --git a/policy-agent/README.md b/policy-agent/README.md index 3ddbbd51..6077a4b6 100644 --- a/policy-agent/README.md +++ b/policy-agent/README.md @@ -1,21 +1,25 @@ # O-RAN-SC NonRT RIC Dashboard Web Application -The O-RAN NonRT RIC PolicyAgent provides a REST API for management of -policices. It provides support for --Policy configuration. This includes - -One REST API towards all RICs in the network - -Query functions that can find all policies in a RIC, all policies owned by a service (R-APP), all policies of a type etc. - -Maps O1 resources (ManagedElement) as defined in O1 to the controlling RIC --Supervision of clients (R-APPs) to eliminate stray policies in case of failure --Consistency monitoring of the SMO view of policies and the actual situation in the RICs --Consistency monitoring of RIC capabilities (policy types) +The O-RAN NonRT RIC PolicyAgent provides a REST API for management of policices. +It provides support for: + -Supervision of clients (R-APPs) to eliminate stray policies in case of failure + -Consistency monitoring of the SMO view of policies and the actual situation in the RICs + -Consistency monitoring of RIC capabilities (policy types) + -Policy configuration. This includes: + -One REST API towards all RICs in the network + -Query functions that can find all policies in a RIC, all policies owned by a service (R-APP), + all policies of a type etc. + -Maps O1 resources (ManagedElement) as defined in O1 to the controlling RIC To Run Policy Agent in Local: -Create a symbolic link with below command, +In the folder /opt/app/policy-agent/config/, create a soft link with below command, ln -s application_configuration.json -The agent can be run stand alone in a simulated test mode. Then it -simulates RICs. +To Run Policy Agent in Local with the DMaaP polling turned on: +In the folder /opt/app/policy-agent/config/, create a soft link with below command, +ln -s application_configuration.json + +The agent can be run stand alone in a simulated test mode. Then it simulates RICs. The REST API is published on port 8081 and it is started by command: mvn -Dtest=MockPolicyAgent test diff --git a/policy-agent/pom.xml b/policy-agent/pom.xml index 43d3aefa..f2a9411f 100644 --- a/policy-agent/pom.xml +++ b/policy-agent/pom.xml @@ -128,6 +128,11 @@ dmaap-client ${sdk.version} + + org.projectlombok + lombok + provided + io.springfox diff --git a/policy-agent/src/main/java/org/oransc/policyagent/BeanFactory.java b/policy-agent/src/main/java/org/oransc/policyagent/BeanFactory.java index f0826e9a..e05eb95f 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/BeanFactory.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/BeanFactory.java @@ -20,6 +20,7 @@ package org.oransc.policyagent; +import com.fasterxml.jackson.databind.ObjectMapper; import org.oransc.policyagent.clients.A1ClientFactory; import org.oransc.policyagent.configuration.ApplicationConfig; import org.oransc.policyagent.repository.Policies; @@ -61,4 +62,9 @@ class BeanFactory { return new A1ClientFactory(); } + @Bean + public ObjectMapper mapper() { + return new ObjectMapper(); + } + } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java index 673fe1d0..1ed3fdb2 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java @@ -42,6 +42,7 @@ public class ApplicationConfig { private Collection observers = new Vector<>(); private Map ricConfigs = new HashMap<>(); + private Properties dmaapPublisherConfig; private Properties dmaapConsumerConfig; @Autowired @@ -72,6 +73,10 @@ public class ApplicationConfig { throw new ServiceException("Could not find ric: " + ricName); } + public Properties getDmaapPublisherConfig() { + return dmaapConsumerConfig; + } + public Properties getDmaapConsumerConfig() { return dmaapConsumerConfig; } @@ -98,7 +103,8 @@ public class ApplicationConfig { } } - public void setConfiguration(@NotNull Collection ricConfigs, Properties dmaapConsumerConfig) { + public void setConfiguration(@NotNull Collection ricConfigs, Properties dmaapPublisherConfig, + Properties dmaapConsumerConfig) { Collection notifications = new Vector<>(); synchronized (this) { Map newRicConfigs = new HashMap<>(); @@ -123,6 +129,7 @@ public class ApplicationConfig { } notifyObservers(notifications); + this.dmaapPublisherConfig = dmaapPublisherConfig; this.dmaapConsumerConfig = dmaapConsumerConfig; } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java index 530ac98b..34d1d97a 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java @@ -25,17 +25,16 @@ import com.google.gson.GsonBuilder; import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; - import java.net.MalformedURLException; import java.net.URL; import java.util.Map.Entry; import java.util.Properties; import java.util.Set; import java.util.Vector; - import javax.validation.constraints.NotNull; - +import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; import org.oransc.policyagent.exceptions.ServiceException; +import org.springframework.http.MediaType; public class ApplicationConfigParser { @@ -46,6 +45,7 @@ public class ApplicationConfigParser { .create(); // private Vector ricConfig; + private Properties dmaapPublisherConfig; private Properties dmaapConsumerConfig; public ApplicationConfigParser() { @@ -54,14 +54,28 @@ public class ApplicationConfigParser { public void parse(JsonObject root) throws ServiceException { JsonObject ricConfigJson = root.getAsJsonObject(CONFIG); ricConfig = parseRics(ricConfigJson); - JsonObject dmaapConfigJson = root.getAsJsonObject("streams_subscribes"); - dmaapConsumerConfig = parseDmaapConsumerConfig(dmaapConfigJson); + JsonObject dmaapPublisherConfigJson = root.getAsJsonObject("streams_publishes"); + if (dmaapPublisherConfigJson == null) { + dmaapPublisherConfig = new Properties(); + } else { + dmaapPublisherConfig = parseDmaapConfig(dmaapPublisherConfigJson); + } + JsonObject dmaapConsumerConfigJson = root.getAsJsonObject("streams_subscribes"); + if (dmaapConsumerConfigJson == null) { + dmaapConsumerConfig = new Properties(); + } else { + dmaapConsumerConfig = parseDmaapConfig(dmaapConsumerConfigJson); + } } public Vector getRicConfigs() { return this.ricConfig; } + public Properties getDmaapPublisherConfig() { + return dmaapPublisherConfig; + } + public Properties getDmaapConsumerConfig() { return dmaapConsumerConfig; } @@ -86,7 +100,7 @@ public class ApplicationConfigParser { return get(obj, memberName).getAsJsonArray(); } - private Properties parseDmaapConsumerConfig(JsonObject consumerCfg) throws ServiceException { + private Properties parseDmaapConfig(JsonObject consumerCfg) throws ServiceException { Set> topics = consumerCfg.entrySet(); if (topics.size() != 1) { throw new ServiceException("Invalid configuration, number of topic must be one, config: " + topics); @@ -106,17 +120,19 @@ public class ApplicationConfigParser { passwd = userInfo[1]; } String urlPath = url.getPath(); - DmaapConsumerUrlPath path = parseDmaapUrlPath(urlPath); + DmaapUrlPath path = parseDmaapUrlPath(urlPath); - dmaapProps.put("port", url.getPort()); - dmaapProps.put("server", url.getHost()); + dmaapProps.put("ServiceName", url.getHost()+":"+url.getPort()+"/events"); dmaapProps.put("topic", path.dmaapTopicName); - dmaapProps.put("consumerGroup", path.consumerGroup); - dmaapProps.put("consumerInstance", path.consumerId); - dmaapProps.put("fetchTimeout", 15000); - dmaapProps.put("fetchLimit", 1000); + dmaapProps.put("host", url.getHost()+":"+url.getPort()); + dmaapProps.put("contenttype", MediaType.APPLICATION_JSON.toString()); dmaapProps.put("userName", userName); dmaapProps.put("password", passwd); + dmaapProps.put("group", path.consumerGroup); + dmaapProps.put("id", path.consumerId); + dmaapProps.put("TransportType", ProtocolTypeConstants.HTTPNOAUTH.toString()); + dmaapProps.put("timeout", 15000); + dmaapProps.put("limit", 1000); } catch (MalformedURLException e) { throw new ServiceException("Could not parse the URL", e); } @@ -128,27 +144,31 @@ public class ApplicationConfigParser { return get(obj, memberName).getAsString(); } - private class DmaapConsumerUrlPath { + private class DmaapUrlPath { final String dmaapTopicName; final String consumerGroup; final String consumerId; - DmaapConsumerUrlPath(String dmaapTopicName, String consumerGroup, String consumerId) { + DmaapUrlPath(String dmaapTopicName, String consumerGroup, String consumerId) { this.dmaapTopicName = dmaapTopicName; this.consumerGroup = consumerGroup; this.consumerId = consumerId; } } - private DmaapConsumerUrlPath parseDmaapUrlPath(String urlPath) throws ServiceException { + private DmaapUrlPath parseDmaapUrlPath(String urlPath) throws ServiceException { String[] tokens = urlPath.split("/"); // /events/A1-P/users/sdnc1 - if (tokens.length != 5) { + if (!(tokens.length == 3 ^ tokens.length == 5)) { throw new ServiceException("The path has incorrect syntax: " + urlPath); } - final String dmaapTopicName = tokens[1] + "/" + tokens[2]; // /events/A1-P - final String consumerGroup = tokens[3]; // users - final String consumerId = tokens[4]; // sdnc1 - return new DmaapConsumerUrlPath(dmaapTopicName, consumerGroup, consumerId); + final String dmaapTopicName = tokens[2]; // /events/A1-P + String consumerGroup = ""; // users + String consumerId = ""; // sdnc1 + if (tokens.length == 5) { + consumerGroup = tokens[3]; + consumerId = tokens[4]; + } + return new DmaapUrlPath(dmaapTopicName, consumerGroup, consumerId); } } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/configuration/AsyncConfiguration.java b/policy-agent/src/main/java/org/oransc/policyagent/configuration/AsyncConfiguration.java new file mode 100644 index 00000000..085320c9 --- /dev/null +++ b/policy-agent/src/main/java/org/oransc/policyagent/configuration/AsyncConfiguration.java @@ -0,0 +1,44 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2019 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oransc.policyagent.configuration; + +import java.util.concurrent.Executor; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.AsyncConfigurer; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +@Configuration +@EnableAsync +public class AsyncConfiguration implements AsyncConfigurer { + + @Override + @Bean(name = "threadPoolTaskExecutor") + public Executor getAsyncExecutor() { + //Set this configuration value from common properties file + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(5); + executor.setMaxPoolSize(10); + executor.setQueueCapacity(25); + return executor; + } +} diff --git a/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceController.java b/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceController.java index bda5e095..cc1d711b 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceController.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceController.java @@ -23,16 +23,23 @@ package org.oransc.policyagent.controllers; import com.google.gson.Gson; import com.google.gson.GsonBuilder; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; + import java.time.Duration; import java.util.Collection; import java.util.Vector; import org.oransc.policyagent.exceptions.ServiceException; +import org.oransc.policyagent.repository.Policies; +import org.oransc.policyagent.repository.Policy; import org.oransc.policyagent.repository.Service; import org.oransc.policyagent.repository.Services; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.DeleteMapping; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PutMapping; import org.springframework.web.bind.annotation.RequestBody; @@ -43,31 +50,40 @@ import org.springframework.web.bind.annotation.RestController; public class ServiceController { private final Services services; + private final Policies policies; + private static Gson gson = new GsonBuilder() // .serializeNulls() // .create(); // @Autowired - ServiceController(Services services) { + ServiceController(Services services, Policies policies) { this.services = services; + this.policies = policies; } - @GetMapping("/service") - public ResponseEntity getService( // - @RequestParam(name = "name", required = true) String name) { - try { - Service s = services.getService(name); - String res = gson.toJson(toServiceStatus(s)); - return new ResponseEntity(res, HttpStatus.OK); + @GetMapping("/services") + @ApiOperation(value = "Returns service information", response = ServiceStatus.class) + @ApiResponses(value = {@ApiResponse(code = 200, message = "OK")}) + public ResponseEntity getServices( // + @RequestParam(name = "name", required = false) String name) { - } catch (ServiceException e) { - return new ResponseEntity(e.getMessage(), HttpStatus.NO_CONTENT); + Collection servicesStatus = new Vector<>(); + synchronized (this.services) { + for (Service s : this.services.getAll()) { + if (name == null || name.equals(s.name())) { + servicesStatus.add(toServiceStatus(s)); + } + } } + + String res = gson.toJson(servicesStatus); + return new ResponseEntity(res, HttpStatus.OK); } private ServiceStatus toServiceStatus(Service s) { return ImmutableServiceStatus.builder() // - .name(s.getName()) // + .name(s.name()) // .keepAliveInterval(s.getKeepAliveInterval().toSeconds()) // .timeSincePing(s.timeSinceLastPing().toSeconds()) // .build(); @@ -85,31 +101,39 @@ public class ServiceController { } } - private Service toService(ServiceRegistrationInfo s) { - return new Service(s.name(), Duration.ofSeconds(s.keepAliveInterval()), s.callbackUrl()); + @DeleteMapping("/services") + public ResponseEntity deleteService( // + @RequestParam(name = "name", required = true) String name) { + try { + Service service = removeService(name); + // Remove the policies from the repo and let the consistency monitoring + // do the rest. + removePolicies(service); + return new ResponseEntity("OK", HttpStatus.NO_CONTENT); + } catch (Exception e) { + return new ResponseEntity(e.getMessage(), HttpStatus.NO_CONTENT); + } } - @GetMapping("/services") - public ResponseEntity getServices() { - Collection result = new Vector<>(); + private Service removeService(String name) throws ServiceException { synchronized (this.services) { - for (Service s : this.services.getAll()) { - result.add(toServiceStatus(s)); - } + Service service = this.services.getService(name); + this.services.remove(service.name()); + return service; } - return new ResponseEntity<>(gson.toJson(result), HttpStatus.OK); } - @PutMapping("/service/ping") - public ResponseEntity ping( // - @RequestBody String name) { - try { - Service s = services.getService(name); - s.ping(); - return new ResponseEntity("OK", HttpStatus.OK); - } catch (ServiceException e) { - return new ResponseEntity(e.getMessage(), HttpStatus.NO_CONTENT); + private void removePolicies(Service service) { + synchronized (this.policies) { + Vector policyList = new Vector<>(this.policies.getForService(service.name())); + for (Policy policy : policyList) { + this.policies.remove(policy); + } } } + private Service toService(ServiceRegistrationInfo s) { + return new Service(s.name(), Duration.ofSeconds(s.keepAliveInterval()), s.callbackUrl()); + } + } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java index fd421048..889dfafa 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java @@ -21,8 +21,6 @@ package org.oransc.policyagent.dmaap; -import java.util.Properties; - /** * The Dmaap consumer which has the base methods to be implemented by any class which implements this interface * @@ -34,7 +32,7 @@ public interface DmaapMessageConsumer { * * @param properties */ - public void init(Properties properties); + public void init(); /** * This method process the message and call the respective Controller diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java index 74cfe0da..2ae5e5ee 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java @@ -22,6 +22,7 @@ package org.oransc.policyagent.dmaap; import java.io.IOException; import java.util.Properties; +import javax.annotation.PostConstruct; import org.onap.dmaap.mr.client.MRClientFactory; import org.onap.dmaap.mr.client.MRConsumer; import org.onap.dmaap.mr.client.response.MRConsumerResponse; @@ -40,21 +41,27 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer { private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumerImpl.class); private boolean alive = false; + private final ApplicationConfig applicationConfig; protected MRConsumer consumer; private MRConsumerResponse response = null; + @Autowired + private DmaapMessageHandler dmaapMessageHandler; @Autowired public DmaapMessageConsumerImpl(ApplicationConfig applicationConfig) { - Properties dmaapConsumerConfig = applicationConfig.getDmaapConsumerConfig(); - init(dmaapConsumerConfig); + this.applicationConfig = applicationConfig; } - @Scheduled(fixedRate = 1000 * 60) + @Scheduled(fixedRate = 1000 * 10) // , initialDelay=60000) @Override public void run() { - while (this.alive) { + /* + * if (!alive) { init(); } + */ + if (this.alive) { try { Iterable dmaapMsgs = fetchAllMessages(); + logger.debug("Fetched all the messages from DMAAP and will start to process the messages"); for (String msg : dmaapMsgs) { processMsg(msg); } @@ -78,24 +85,20 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer { return response.getActualMessages(); } + @PostConstruct @Override - public void init(Properties properties) { - // Initialize the DMAAP with the properties - // Do we need to do any validation of properties before calling the factory? - Properties prop = new Properties(); - prop.setProperty("ServiceName", "localhost:6845/events"); - prop.setProperty("topic", "A1-P"); - prop.setProperty("host", "localhost:6845"); - prop.setProperty("contenttype", "application/json"); - prop.setProperty("username", "admin"); - prop.setProperty("password", "admin"); - prop.setProperty("group", "users"); - prop.setProperty("id", "policy-agent"); - prop.setProperty("TransportType", "HTTPNOAUTH"); - prop.setProperty("timeout", "15000"); - prop.setProperty("limit", "1000"); + public void init() { + Properties dmaapConsumerProperties = applicationConfig.getDmaapConsumerConfig(); + Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig(); + // No need to start if there is no configuration. + if (dmaapConsumerProperties == null || dmaapPublisherProperties == null || dmaapConsumerProperties.size() == 0 + || dmaapPublisherProperties.size() == 0) { + logger.error("DMaaP properties Failed to Load"); + return; + } try { - consumer = MRClientFactory.createConsumer(prop); + logger.debug("Creating DMAAP Client"); + consumer = MRClientFactory.createConsumer(dmaapConsumerProperties); this.alive = true; } catch (IOException e) { logger.error("Exception occurred while creating Dmaap Consumer", e); @@ -104,11 +107,9 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer { @Override public void processMsg(String msg) throws Exception { - System.out.println("sysout" + msg); + logger.debug("Message Reveived from DMAAP : {}", msg); // Call the concurrent Task executor to handle the incoming request - // Validate the Input & if its valid, post the ACCEPTED Response back to DMAAP - // through REST CLIENT - // Call the Controller with the extracted payload + dmaapMessageHandler.handleDmaapMsg(msg); } @Override diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java new file mode 100644 index 00000000..bf9f06c1 --- /dev/null +++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java @@ -0,0 +1,175 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2019 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oransc.policyagent.dmaap; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.util.Properties; +import org.apache.commons.lang3.StringUtils; +import org.json.JSONException; +import org.json.JSONObject; +import org.json.JSONTokener; +import org.oransc.policyagent.clients.AsyncRestClient; +import org.oransc.policyagent.configuration.ApplicationConfig; +import org.oransc.policyagent.controllers.PolicyController; +import org.oransc.policyagent.model.DmaapMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Component; + +@Component +public class DmaapMessageHandler { + + private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandler.class); + + @Autowired + private ObjectMapper mapper; + @Autowired + private PolicyController policyController; + private AsyncRestClient restClient; + private ApplicationConfig applicationConfig; + private String topic = ""; + + @Autowired + public DmaapMessageHandler(ApplicationConfig applicationConfig) { + this.applicationConfig = applicationConfig; + } + + // The publish properties is corrupted. It contains the subscribe property values. + @Async("threadPoolTaskExecutor") + public void handleDmaapMsg(String msg) { + init(); + DmaapMessage dmaapMessage = null; + ResponseEntity response = null; + // Process the message + /** + * Sample Request Message from DMAAP { "type": "request", "correlationId": + * "c09ac7d1-de62-0016-2000-e63701125557-201", "target": "policy-agent", "timestamp": "2019-05-14T11:44:51.36Z", + * "apiVersion": "1.0", "originatorId": "849e6c6b420", "requestId": "23343221", "operation": "getPolicySchemas", + * "payload": "{\"ric\":\"ric1\"}" } + * -------------------------------------------------------------------------------------------------------------- + * Sample Response Message to DMAAP { "type": "response", "correlation-id": + * "c09ac7d1-de62-0016-2000-e63701125557-201", "timestamp": "2019-05-14T11:44:51.36Z", "originator-id": + * "849e6c6b420", "request-id": "23343221", "status" : "ACCEPTED", "message" : "" } + * ------------------------------------------------------------------------------------------------------------- + * Sample Response Message to DMAAP { "type": "response", "correlation-id": + * "c09ac7d1-de62-0016-2000-e63701125557-201", "timestamp": "2019-05-14T11:44:51.36Z", "originator-id": + * "849e6c6b420", "request-id": "23343221", "status" : "SUCCESS" "message" : "" } + */ + try { + dmaapMessage = mapper.readValue(msg, DmaapMessage.class); + // Post the accepted message to the DMAAP bus + logger.debug("DMAAP Message- {}", dmaapMessage); + logger.debug("Post Accepted Message to Client"); + restClient + .post("A1-POLICY-AGENT-WRITE", buildDmaapResponse(dmaapMessage.getCorrelationId(), + dmaapMessage.getOriginatorId(), dmaapMessage.getRequestId(), "ACCEPTED", StringUtils.EMPTY)) + .block(); // + // Call the Controller + logger.debug("Invoke the Policy Agent Controller"); + response = invokeController(dmaapMessage); + // Post the Response message to the DMAAP bus + logger.debug("DMAAP Response Message to Client- {}", response); + restClient + .post("A1-POLICY-AGENT-WRITE", buildDmaapResponse(dmaapMessage.getCorrelationId(), + dmaapMessage.getOriginatorId(), dmaapMessage.getRequestId(), "SUCCESS", response.getBody())) + .block(); // + } catch (IOException e) { + logger.error("Exception occured during message processing", e); + } + } + + private ResponseEntity invokeController(DmaapMessage dmaapMessage) { + String formattedString = ""; + String ricName; + String instance; + logger.debug("Payload from the Message - {}", dmaapMessage.getPayload()); + try { + formattedString = new JSONTokener(dmaapMessage.getPayload()).nextValue().toString(); + logger.debug("Removed the Escape charater in payload- {}", formattedString); + } catch (JSONException e) { + logger.error("Exception occurred during formating Payload- {}", dmaapMessage.getPayload()); + } + JSONObject jsonObject = new JSONObject(formattedString); + switch (dmaapMessage.getOperation()) { + case "getPolicySchemas": + ricName = (String) jsonObject.get("ricName"); + logger.debug("Received the request for getPolicySchemas with Ric Name- {}", ricName); + return policyController.getPolicySchemas(ricName); + case "getPolicySchema": + String policyTypeId = (String) jsonObject.get("id"); + logger.debug("Received the request for getPolicySchema with Policy Type Id- {}", policyTypeId); + System.out.println("policyTypeId" + policyTypeId); + return policyController.getPolicySchema(policyTypeId); + case "getPolicyTypes": + ricName = (String) jsonObject.get("ricName"); + logger.debug("Received the request for getPolicyTypes with Ric Name- {}", ricName); + return policyController.getPolicyTypes(ricName); + case "getPolicy": + instance = (String) jsonObject.get("instance"); + logger.debug("Received the request for getPolicy with Instance- {}", instance); + return policyController.getPolicy(instance); + case "deletePolicy": + instance = (String) jsonObject.get("instance"); + logger.debug("Received the request for deletePolicy with Instance- {}", instance); + return null;// policyController.deletePolicy(deleteInstance); + case "putPolicy": + String type = (String) jsonObject.get("type"); + String putPolicyInstance = (String) jsonObject.get("instance"); + String putPolicyRic = (String) jsonObject.get("ric"); + String service = (String) jsonObject.get("service"); + String jsonBody = (String) jsonObject.get("jsonBody"); + return null;// policyController.putPolicy(type, putPolicyInstance, putPolicyRic, service, jsonBody); + case "getPolicies": + String getPolicyType = (String) jsonObject.get("type"); + String getPolicyRic = (String) jsonObject.get("ric"); + String getPolicyService = (String) jsonObject.get("service"); + return policyController.getPolicies(getPolicyType, getPolicyRic, getPolicyService); + default: + break; + } + return null; + } + + private String buildDmaapResponse(String correlationId, String originatorId, String requestId, String status, + String message) { + System.out.println("buildResponse "); + return new JSONObject().put("type", "response").put(correlationId, correlationId).put("timestamp", "") + .put("originatorId", originatorId).put("requestId", requestId).put("status", status) + .put("message", message).toString(); + } + + // @PostConstruct + // The application properties value is always NULL for the first time + // Need to fix this + public void init() { + logger.debug("Reading DMAAP Publisher bus details from Application Config"); + Properties dmaapPublisherConfig = applicationConfig.getDmaapPublisherConfig(); + String host = (String) dmaapPublisherConfig.get("ServiceName"); + topic = dmaapPublisherConfig.getProperty("topic"); + logger.debug("Read the topic & Service Name - {} , {}", host, topic); + this.restClient = new AsyncRestClient("http://" + host + "/"); // get this value from application config + + } +} diff --git a/policy-agent/src/main/java/org/oransc/policyagent/model/DmaapMessage.java b/policy-agent/src/main/java/org/oransc/policyagent/model/DmaapMessage.java new file mode 100644 index 00000000..e56f4b4a --- /dev/null +++ b/policy-agent/src/main/java/org/oransc/policyagent/model/DmaapMessage.java @@ -0,0 +1,47 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2019 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oransc.policyagent.model; + +import java.sql.Timestamp; +import javax.validation.constraints.NotNull; +import lombok.Getter; +import lombok.Setter; + +@Getter +@Setter +public class DmaapMessage { + + @NotNull + private String type; + @NotNull + private String correlationId; + @NotNull + private String target; + @NotNull + private Timestamp timestamp; + private String apiVersion; + @NotNull + private String originatorId; + private String requestId; + @NotNull + private String operation; + private String payload; +} diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Service.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Service.java index 6458dbba..18a85a91 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/repository/Service.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/Service.java @@ -36,7 +36,7 @@ public class Service { ping(); } - public synchronized String getName() { + public synchronized String name() { return this.name; } @@ -44,7 +44,7 @@ public class Service { return this.keepAliveInterval; } - public synchronized void ping() { + private synchronized void ping() { this.lastPing = Instant.now(); } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Services.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Services.java index 2e1d8da7..01d6a7a8 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/repository/Services.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/Services.java @@ -48,12 +48,20 @@ public class Services { } public synchronized void put(Service service) { - logger.debug("Put service: " + service.getName()); - services.put(service.getName(), service); + logger.debug("Put service: " + service.name()); + services.put(service.name(), service); } public synchronized Iterable getAll() { return services.values(); } + public synchronized void remove(String name) { + services.remove(name); + } + + public synchronized int size() { + return services.size(); + } + } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java index bc43edab..1ab5fc9c 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java @@ -125,7 +125,8 @@ public class RefreshConfigTask { try { ApplicationConfigParser parser = new ApplicationConfigParser(); parser.parse(jsonObject); - this.appConfig.setConfiguration(parser.getRicConfigs(), parser.getDmaapConsumerConfig()); + this.appConfig.setConfiguration(parser.getRicConfigs(), parser.getDmaapPublisherConfig(), + parser.getDmaapConsumerConfig()); } catch (ServiceException e) { logger.error("Could not parse configuration {}", e.toString(), e); } @@ -152,7 +153,8 @@ public class RefreshConfigTask { } ApplicationConfigParser appParser = new ApplicationConfigParser(); appParser.parse(rootObject); - appConfig.setConfiguration(appParser.getRicConfigs(), appParser.getDmaapConsumerConfig()); + appConfig.setConfiguration(appParser.getRicConfigs(), appParser.getDmaapPublisherConfig(), + appParser.getDmaapConsumerConfig()); logger.info("Local configuration file loaded: {}", filepath); } catch (JsonSyntaxException | ServiceException | IOException e) { logger.trace("Local configuration file not loaded: {}", filepath, e); diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/ServiceSupervision.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/ServiceSupervision.java index d4b32e02..c10e0047 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/ServiceSupervision.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/ServiceSupervision.java @@ -72,7 +72,7 @@ public class ServiceSupervision { synchronized (services) { return Flux.fromIterable(services.getAll()) // .filter(service -> service.isExpired()) // - .doOnNext(service -> logger.info("Service is expired:" + service.getName())) // + .doOnNext(service -> logger.info("Service is expired:" + service.name())) // .flatMap(service -> getAllPolicies(service)) // .doOnNext(policy -> this.policies.remove(policy)) // .flatMap(policy -> deletePolicyInRic(policy)); @@ -81,7 +81,7 @@ public class ServiceSupervision { private Flux getAllPolicies(Service service) { synchronized (policies) { - return Flux.fromIterable(policies.getForService(service.getName())); + return Flux.fromIterable(policies.getForService(service.name())); } } diff --git a/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java b/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java index 4270ded6..5b1f3ca4 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java @@ -26,8 +26,11 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import com.google.gson.Gson; import com.google.gson.GsonBuilder; -import com.google.gson.reflect.TypeToken; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonParser; +import java.util.ArrayList; import java.util.List; import java.util.Vector; @@ -49,6 +52,7 @@ import org.oransc.policyagent.repository.PolicyType; import org.oransc.policyagent.repository.PolicyTypes; import org.oransc.policyagent.repository.Ric; import org.oransc.policyagent.repository.Rics; +import org.oransc.policyagent.repository.Services; import org.oransc.policyagent.tasks.RepositorySupervision; import org.oransc.policyagent.utils.MockA1Client; import org.oransc.policyagent.utils.MockA1ClientFactory; @@ -85,6 +89,9 @@ public class ApplicationTest { @Autowired RepositorySupervision supervision; + @Autowired + Services services; + private static Gson gson = new GsonBuilder() // .serializeNulls() // .create(); // @@ -284,14 +291,6 @@ public class ApplicationTest { assertThat(policies.size()).isEqualTo(0); } - private static List parseList(String json, Class clazz) { - if (null == json) { - return null; - } - return gson.fromJson(json, new TypeToken() {}.getType()); - - } - @Test public void testGetPolicySchemas() throws Exception { reset(); @@ -305,13 +304,13 @@ public class ApplicationTest { assertThat(rsp).contains("type2"); assertThat(rsp).contains("title"); - List info = parseList(rsp, String.class); + List info = parseSchemas(rsp); assertEquals(2, info.size()); url = baseUrl() + "/policy_schemas?ric=ric1"; rsp = this.restTemplate.getForObject(url, String.class); assertThat(rsp).contains("type1"); - info = parseList(rsp, String.class); + info = parseSchemas(rsp); assertEquals(1, info.size()); } @@ -393,21 +392,48 @@ public class ApplicationTest { @Test public void testPutAndGetService() throws Exception { + // PUT putService("name"); - String url = baseUrl() + "/service?name=name"; + // GET + String url = baseUrl() + "/services?name=name"; String rsp = this.restTemplate.getForObject(url, String.class); - ServiceStatus status = gson.fromJson(rsp, ImmutableServiceStatus.class); + List info = parseList(rsp, ImmutableServiceStatus.class); + assertThat(info.size() == 1); + ServiceStatus status = info.iterator().next(); assertThat(status.keepAliveInterval() == 1); assertThat(status.name().equals("name")); + // GET (all) url = baseUrl() + "/services"; rsp = this.restTemplate.getForObject(url, String.class); assertThat(rsp.contains("name")); System.out.println(rsp); - url = baseUrl() + "/service/ping"; - this.restTemplate.put(url, "name"); + // DELETE + assertThat(services.size() == 1); + url = baseUrl() + "/services?name=name"; + this.restTemplate.delete(url); + assertThat(services.size() == 0); + } + + private static List parseList(String jsonString, Class clazz) { + List result = new ArrayList<>(); + JsonArray jsonArr = new JsonParser().parse(jsonString).getAsJsonArray(); + for (JsonElement jsonElement : jsonArr) { + T o = gson.fromJson(jsonElement.toString(), clazz); + result.add(o); + } + return result; + } + + private static List parseSchemas(String jsonString) { + JsonArray arrayOfSchema = new JsonParser().parse(jsonString).getAsJsonArray(); + List result = new ArrayList<>(); + for (JsonElement schemaObject : arrayOfSchema) { + result.add(schemaObject.toString()); + } + return result; } } diff --git a/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java b/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java index bbbc06d2..b4c41295 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java @@ -121,7 +121,6 @@ public class MockPolicyAgent { } } } - } @LocalServerPort diff --git a/policy-agent/src/test/resources/test_application_configuration.json b/policy-agent/src/test/resources/test_application_configuration.json index c63b7104..446c0611 100644 --- a/policy-agent/src/test/resources/test_application_configuration.json +++ b/policy-agent/src/test/resources/test_application_configuration.json @@ -19,13 +19,5 @@ ] } ] - }, - "streams_subscribes": { - "dmaap_subscriber": { - "dmaap_info": { - "topic_url": "http://dradmin:dradmin@localhost:2222/events/A1-P/users/sdnc1" - }, - "type": "message_router" - } } } \ No newline at end of file diff --git a/policy-agent/src/test/resources/test_application_configuration_with_dmaap_config.json b/policy-agent/src/test/resources/test_application_configuration_with_dmaap_config.json new file mode 100644 index 00000000..c9453603 --- /dev/null +++ b/policy-agent/src/test/resources/test_application_configuration_with_dmaap_config.json @@ -0,0 +1,39 @@ +{ + "config": { + "//description": "Application configuration", + "ric": [ + { + "name": "ric1", + "baseUrl": "http://localhost:8080/", + "managedElementIds": [ + "kista_1", + "kista_2" + ] + }, + { + "name": "ric2", + "baseUrl": "http://localhost:8081/", + "managedElementIds": [ + "kista_3", + "kista_4" + ] + } + ] + }, + "streams_publishes": { + "dmaap_publisher": { + "type": "message_router", + "dmaap_info": { + "topic_url": "http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-WRITE" + } + } + }, + "streams_subscribes": { + "dmaap_subscriber": { + "dmaap_info": { + "topic_url": "http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-READ/users/policy-agent" + }, + "type": "message_router" + } + } +} \ No newline at end of file