RestTemplate restTemplate = new RestTemplate();
private static com.google.gson.Gson gson = new GsonBuilder() //
- .serializeNulls() //
- .create(); //
+ .serializeNulls() //
+ .create(); //
private final String urlPrefix;
@Autowired
public PolicyAgentApiImpl(
- @org.springframework.beans.factory.annotation.Value("${policycontroller.url.prefix}") final String urlPrefix) {
+ @org.springframework.beans.factory.annotation.Value("${policycontroller.url.prefix}") final String urlPrefix) {
logger.debug("ctor prefix '{}'", urlPrefix);
this.urlPrefix = urlPrefix;
}
}
try {
- Type listType = new TypeToken<List<ImmutablePolicyInfo>>() {}.getType();
+ Type listType = new TypeToken<List<ImmutablePolicyInfo>>() {
+ }.getType();
List<PolicyInfo> rspParsed = gson.fromJson(rsp.getBody(), listType);
PolicyInstances result = new PolicyInstances();
for (PolicyInfo p : rspParsed) {
@Override
public ResponseEntity<String> putPolicy(String policyTypeIdString, String policyInstanceId, String json,
- String ric) {
+ String ric) {
String url = baseUrl() + "/policy?type={type}&instance={instance}&ric={ric}&service={service}";
Map<String, ?> uriVariables = Map.of( //
- "type", policyTypeIdString, //
- "instance", policyInstanceId, //
- "ric", ric, //
- "service", "dashboard");
+ "type", policyTypeIdString, //
+ "instance", policyInstanceId, //
+ "ric", ric, //
+ "service", "dashboard");
try {
this.restTemplate.put(url, json, uriVariables);
- return new ResponseEntity<>("Policy was put successfully", HttpStatus.OK);
+ return new ResponseEntity<>(HttpStatus.OK);
} catch (Exception e) {
return new ResponseEntity<>(e.getMessage(), HttpStatus.INTERNAL_SERVER_ERROR);
}
Map<String, ?> uriVariables = Map.of("instance", policyInstanceId);
try {
this.restTemplate.delete(url, uriVariables);
- return new ResponseEntity<>("Policy was deleted successfully", HttpStatus.NO_CONTENT);
+ return new ResponseEntity<>(HttpStatus.OK);
} catch (Exception e) {
return new ResponseEntity<>(e.getMessage(), HttpStatus.NOT_FOUND);
}
String rsp = this.restTemplate.getForObject(url, String.class, uriVariables);
try {
- Type listType = new TypeToken<List<ImmutableRicInfo>>() {}.getType();
+ Type listType = new TypeToken<List<ImmutableRicInfo>>() {
+ }.getType();
List<RicInfo> rspParsed = gson.fromJson(rsp, listType);
Collection<String> result = new Vector<>(rspParsed.size());
for (RicInfo ric : rspParsed) {
# O-RAN-SC NonRT RIC Dashboard Web Application
-The O-RAN NonRT RIC PolicyAgent provides a REST API for management of
-policices. It provides support for
--Policy configuration. This includes
- -One REST API towards all RICs in the network
- -Query functions that can find all policies in a RIC, all policies owned by a service (R-APP), all policies of a type etc.
- -Maps O1 resources (ManagedElement) as defined in O1 to the controlling RIC
--Supervision of clients (R-APPs) to eliminate stray policies in case of failure
--Consistency monitoring of the SMO view of policies and the actual situation in the RICs
--Consistency monitoring of RIC capabilities (policy types)
+The O-RAN NonRT RIC PolicyAgent provides a REST API for management of policices.
+It provides support for:
+ -Supervision of clients (R-APPs) to eliminate stray policies in case of failure
+ -Consistency monitoring of the SMO view of policies and the actual situation in the RICs
+ -Consistency monitoring of RIC capabilities (policy types)
+ -Policy configuration. This includes:
+ -One REST API towards all RICs in the network
+ -Query functions that can find all policies in a RIC, all policies owned by a service (R-APP),
+ all policies of a type etc.
+ -Maps O1 resources (ManagedElement) as defined in O1 to the controlling RIC
To Run Policy Agent in Local:
-Create a symbolic link with below command,
+In the folder /opt/app/policy-agent/config/, create a soft link with below command,
ln -s <path to test_application_configuration.json> application_configuration.json
-The agent can be run stand alone in a simulated test mode. Then it
-simulates RICs.
+To Run Policy Agent in Local with the DMaaP polling turned on:
+In the folder /opt/app/policy-agent/config/, create a soft link with below command,
+ln -s <path to test_application_configuration_with_dmaap_config.json> application_configuration.json
+
+The agent can be run stand alone in a simulated test mode. Then it simulates RICs.
The REST API is published on port 8081 and it is started by command:
mvn -Dtest=MockPolicyAgent test
<artifactId>dmaap-client</artifactId>
<version>${sdk.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ <scope>provided</scope>
+ </dependency>
<!--REQUIRED TO GENERATE DOCUMENTATION -->
<dependency>
<groupId>io.springfox</groupId>
package org.oransc.policyagent;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.oransc.policyagent.clients.A1ClientFactory;
import org.oransc.policyagent.configuration.ApplicationConfig;
import org.oransc.policyagent.repository.Policies;
return new A1ClientFactory();
}
+ @Bean
+ public ObjectMapper mapper() {
+ return new ObjectMapper();
+ }
+
}
private Collection<Observer> observers = new Vector<>();
private Map<String, RicConfig> ricConfigs = new HashMap<>();
+ private Properties dmaapPublisherConfig;
private Properties dmaapConsumerConfig;
@Autowired
throw new ServiceException("Could not find ric: " + ricName);
}
+ public Properties getDmaapPublisherConfig() {
+ return dmaapConsumerConfig;
+ }
+
public Properties getDmaapConsumerConfig() {
return dmaapConsumerConfig;
}
}
}
- public void setConfiguration(@NotNull Collection<RicConfig> ricConfigs, Properties dmaapConsumerConfig) {
+ public void setConfiguration(@NotNull Collection<RicConfig> ricConfigs, Properties dmaapPublisherConfig,
+ Properties dmaapConsumerConfig) {
Collection<Notification> notifications = new Vector<>();
synchronized (this) {
Map<String, RicConfig> newRicConfigs = new HashMap<>();
}
notifyObservers(notifications);
+ this.dmaapPublisherConfig = dmaapPublisherConfig;
this.dmaapConsumerConfig = dmaapConsumerConfig;
}
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
-
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.Vector;
-
import javax.validation.constraints.NotNull;
-
+import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
import org.oransc.policyagent.exceptions.ServiceException;
+import org.springframework.http.MediaType;
public class ApplicationConfigParser {
.create(); //
private Vector<RicConfig> ricConfig;
+ private Properties dmaapPublisherConfig;
private Properties dmaapConsumerConfig;
public ApplicationConfigParser() {
public void parse(JsonObject root) throws ServiceException {
JsonObject ricConfigJson = root.getAsJsonObject(CONFIG);
ricConfig = parseRics(ricConfigJson);
- JsonObject dmaapConfigJson = root.getAsJsonObject("streams_subscribes");
- dmaapConsumerConfig = parseDmaapConsumerConfig(dmaapConfigJson);
+ JsonObject dmaapPublisherConfigJson = root.getAsJsonObject("streams_publishes");
+ if (dmaapPublisherConfigJson == null) {
+ dmaapPublisherConfig = new Properties();
+ } else {
+ dmaapPublisherConfig = parseDmaapConfig(dmaapPublisherConfigJson);
+ }
+ JsonObject dmaapConsumerConfigJson = root.getAsJsonObject("streams_subscribes");
+ if (dmaapConsumerConfigJson == null) {
+ dmaapConsumerConfig = new Properties();
+ } else {
+ dmaapConsumerConfig = parseDmaapConfig(dmaapConsumerConfigJson);
+ }
}
public Vector<RicConfig> getRicConfigs() {
return this.ricConfig;
}
+ public Properties getDmaapPublisherConfig() {
+ return dmaapPublisherConfig;
+ }
+
public Properties getDmaapConsumerConfig() {
return dmaapConsumerConfig;
}
return get(obj, memberName).getAsJsonArray();
}
- private Properties parseDmaapConsumerConfig(JsonObject consumerCfg) throws ServiceException {
+ private Properties parseDmaapConfig(JsonObject consumerCfg) throws ServiceException {
Set<Entry<String, JsonElement>> topics = consumerCfg.entrySet();
if (topics.size() != 1) {
throw new ServiceException("Invalid configuration, number of topic must be one, config: " + topics);
passwd = userInfo[1];
}
String urlPath = url.getPath();
- DmaapConsumerUrlPath path = parseDmaapUrlPath(urlPath);
+ DmaapUrlPath path = parseDmaapUrlPath(urlPath);
- dmaapProps.put("port", url.getPort());
- dmaapProps.put("server", url.getHost());
+ dmaapProps.put("ServiceName", url.getHost()+":"+url.getPort()+"/events");
dmaapProps.put("topic", path.dmaapTopicName);
- dmaapProps.put("consumerGroup", path.consumerGroup);
- dmaapProps.put("consumerInstance", path.consumerId);
- dmaapProps.put("fetchTimeout", 15000);
- dmaapProps.put("fetchLimit", 1000);
+ dmaapProps.put("host", url.getHost()+":"+url.getPort());
+ dmaapProps.put("contenttype", MediaType.APPLICATION_JSON.toString());
dmaapProps.put("userName", userName);
dmaapProps.put("password", passwd);
+ dmaapProps.put("group", path.consumerGroup);
+ dmaapProps.put("id", path.consumerId);
+ dmaapProps.put("TransportType", ProtocolTypeConstants.HTTPNOAUTH.toString());
+ dmaapProps.put("timeout", 15000);
+ dmaapProps.put("limit", 1000);
} catch (MalformedURLException e) {
throw new ServiceException("Could not parse the URL", e);
}
return get(obj, memberName).getAsString();
}
- private class DmaapConsumerUrlPath {
+ private class DmaapUrlPath {
final String dmaapTopicName;
final String consumerGroup;
final String consumerId;
- DmaapConsumerUrlPath(String dmaapTopicName, String consumerGroup, String consumerId) {
+ DmaapUrlPath(String dmaapTopicName, String consumerGroup, String consumerId) {
this.dmaapTopicName = dmaapTopicName;
this.consumerGroup = consumerGroup;
this.consumerId = consumerId;
}
}
- private DmaapConsumerUrlPath parseDmaapUrlPath(String urlPath) throws ServiceException {
+ private DmaapUrlPath parseDmaapUrlPath(String urlPath) throws ServiceException {
String[] tokens = urlPath.split("/"); // /events/A1-P/users/sdnc1
- if (tokens.length != 5) {
+ if (!(tokens.length == 3 ^ tokens.length == 5)) {
throw new ServiceException("The path has incorrect syntax: " + urlPath);
}
- final String dmaapTopicName = tokens[1] + "/" + tokens[2]; // /events/A1-P
- final String consumerGroup = tokens[3]; // users
- final String consumerId = tokens[4]; // sdnc1
- return new DmaapConsumerUrlPath(dmaapTopicName, consumerGroup, consumerId);
+ final String dmaapTopicName = tokens[2]; // /events/A1-P
+ String consumerGroup = ""; // users
+ String consumerId = ""; // sdnc1
+ if (tokens.length == 5) {
+ consumerGroup = tokens[3];
+ consumerId = tokens[4];
+ }
+ return new DmaapUrlPath(dmaapTopicName, consumerGroup, consumerId);
}
}
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2019 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.policyagent.configuration;
+
+import java.util.concurrent.Executor;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.annotation.AsyncConfigurer;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+@Configuration
+@EnableAsync
+public class AsyncConfiguration implements AsyncConfigurer {
+
+ @Override
+ @Bean(name = "threadPoolTaskExecutor")
+ public Executor getAsyncExecutor() {
+ //Set this configuration value from common properties file
+ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+ executor.setCorePoolSize(5);
+ executor.setMaxPoolSize(10);
+ executor.setQueueCapacity(25);
+ return executor;
+ }
+}
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+
import java.time.Duration;
import java.util.Collection;
import java.util.Vector;
import org.oransc.policyagent.exceptions.ServiceException;
+import org.oransc.policyagent.repository.Policies;
+import org.oransc.policyagent.repository.Policy;
import org.oransc.policyagent.repository.Service;
import org.oransc.policyagent.repository.Services;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
public class ServiceController {
private final Services services;
+ private final Policies policies;
+
private static Gson gson = new GsonBuilder() //
.serializeNulls() //
.create(); //
@Autowired
- ServiceController(Services services) {
+ ServiceController(Services services, Policies policies) {
this.services = services;
+ this.policies = policies;
}
- @GetMapping("/service")
- public ResponseEntity<String> getService( //
- @RequestParam(name = "name", required = true) String name) {
- try {
- Service s = services.getService(name);
- String res = gson.toJson(toServiceStatus(s));
- return new ResponseEntity<String>(res, HttpStatus.OK);
+ @GetMapping("/services")
+ @ApiOperation(value = "Returns service information", response = ServiceStatus.class)
+ @ApiResponses(value = {@ApiResponse(code = 200, message = "OK")})
+ public ResponseEntity<String> getServices( //
+ @RequestParam(name = "name", required = false) String name) {
- } catch (ServiceException e) {
- return new ResponseEntity<String>(e.getMessage(), HttpStatus.NO_CONTENT);
+ Collection<ServiceStatus> servicesStatus = new Vector<>();
+ synchronized (this.services) {
+ for (Service s : this.services.getAll()) {
+ if (name == null || name.equals(s.name())) {
+ servicesStatus.add(toServiceStatus(s));
+ }
+ }
}
+
+ String res = gson.toJson(servicesStatus);
+ return new ResponseEntity<String>(res, HttpStatus.OK);
}
private ServiceStatus toServiceStatus(Service s) {
return ImmutableServiceStatus.builder() //
- .name(s.getName()) //
+ .name(s.name()) //
.keepAliveInterval(s.getKeepAliveInterval().toSeconds()) //
.timeSincePing(s.timeSinceLastPing().toSeconds()) //
.build();
}
}
- private Service toService(ServiceRegistrationInfo s) {
- return new Service(s.name(), Duration.ofSeconds(s.keepAliveInterval()), s.callbackUrl());
+ @DeleteMapping("/services")
+ public ResponseEntity<String> deleteService( //
+ @RequestParam(name = "name", required = true) String name) {
+ try {
+ Service service = removeService(name);
+ // Remove the policies from the repo and let the consistency monitoring
+ // do the rest.
+ removePolicies(service);
+ return new ResponseEntity<String>("OK", HttpStatus.NO_CONTENT);
+ } catch (Exception e) {
+ return new ResponseEntity<String>(e.getMessage(), HttpStatus.NO_CONTENT);
+ }
}
- @GetMapping("/services")
- public ResponseEntity<?> getServices() {
- Collection<ServiceStatus> result = new Vector<>();
+ private Service removeService(String name) throws ServiceException {
synchronized (this.services) {
- for (Service s : this.services.getAll()) {
- result.add(toServiceStatus(s));
- }
+ Service service = this.services.getService(name);
+ this.services.remove(service.name());
+ return service;
}
- return new ResponseEntity<>(gson.toJson(result), HttpStatus.OK);
}
- @PutMapping("/service/ping")
- public ResponseEntity<String> ping( //
- @RequestBody String name) {
- try {
- Service s = services.getService(name);
- s.ping();
- return new ResponseEntity<String>("OK", HttpStatus.OK);
- } catch (ServiceException e) {
- return new ResponseEntity<String>(e.getMessage(), HttpStatus.NO_CONTENT);
+ private void removePolicies(Service service) {
+ synchronized (this.policies) {
+ Vector<Policy> policyList = new Vector<>(this.policies.getForService(service.name()));
+ for (Policy policy : policyList) {
+ this.policies.remove(policy);
+ }
}
}
+ private Service toService(ServiceRegistrationInfo s) {
+ return new Service(s.name(), Duration.ofSeconds(s.keepAliveInterval()), s.callbackUrl());
+ }
+
}
package org.oransc.policyagent.dmaap;
-import java.util.Properties;
-
/**
* The Dmaap consumer which has the base methods to be implemented by any class which implements this interface
*
*
* @param properties
*/
- public void init(Properties properties);
+ public void init();
/**
* This method process the message and call the respective Controller
import java.io.IOException;
import java.util.Properties;
+import javax.annotation.PostConstruct;
import org.onap.dmaap.mr.client.MRClientFactory;
import org.onap.dmaap.mr.client.MRConsumer;
import org.onap.dmaap.mr.client.response.MRConsumerResponse;
private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumerImpl.class);
private boolean alive = false;
+ private final ApplicationConfig applicationConfig;
protected MRConsumer consumer;
private MRConsumerResponse response = null;
+ @Autowired
+ private DmaapMessageHandler dmaapMessageHandler;
@Autowired
public DmaapMessageConsumerImpl(ApplicationConfig applicationConfig) {
- Properties dmaapConsumerConfig = applicationConfig.getDmaapConsumerConfig();
- init(dmaapConsumerConfig);
+ this.applicationConfig = applicationConfig;
}
- @Scheduled(fixedRate = 1000 * 60)
+ @Scheduled(fixedRate = 1000 * 10) // , initialDelay=60000)
@Override
public void run() {
- while (this.alive) {
+ /*
+ * if (!alive) { init(); }
+ */
+ if (this.alive) {
try {
Iterable<String> dmaapMsgs = fetchAllMessages();
+ logger.debug("Fetched all the messages from DMAAP and will start to process the messages");
for (String msg : dmaapMsgs) {
processMsg(msg);
}
return response.getActualMessages();
}
+ @PostConstruct
@Override
- public void init(Properties properties) {
- // Initialize the DMAAP with the properties
- // Do we need to do any validation of properties before calling the factory?
- Properties prop = new Properties();
- prop.setProperty("ServiceName", "localhost:6845/events");
- prop.setProperty("topic", "A1-P");
- prop.setProperty("host", "localhost:6845");
- prop.setProperty("contenttype", "application/json");
- prop.setProperty("username", "admin");
- prop.setProperty("password", "admin");
- prop.setProperty("group", "users");
- prop.setProperty("id", "policy-agent");
- prop.setProperty("TransportType", "HTTPNOAUTH");
- prop.setProperty("timeout", "15000");
- prop.setProperty("limit", "1000");
+ public void init() {
+ Properties dmaapConsumerProperties = applicationConfig.getDmaapConsumerConfig();
+ Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig();
+ // No need to start if there is no configuration.
+ if (dmaapConsumerProperties == null || dmaapPublisherProperties == null || dmaapConsumerProperties.size() == 0
+ || dmaapPublisherProperties.size() == 0) {
+ logger.error("DMaaP properties Failed to Load");
+ return;
+ }
try {
- consumer = MRClientFactory.createConsumer(prop);
+ logger.debug("Creating DMAAP Client");
+ consumer = MRClientFactory.createConsumer(dmaapConsumerProperties);
this.alive = true;
} catch (IOException e) {
logger.error("Exception occurred while creating Dmaap Consumer", e);
@Override
public void processMsg(String msg) throws Exception {
- System.out.println("sysout" + msg);
+ logger.debug("Message Reveived from DMAAP : {}", msg);
// Call the concurrent Task executor to handle the incoming request
- // Validate the Input & if its valid, post the ACCEPTED Response back to DMAAP
- // through REST CLIENT
- // Call the Controller with the extracted payload
+ dmaapMessageHandler.handleDmaapMsg(msg);
}
@Override
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2019 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.policyagent.dmaap;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Properties;
+import org.apache.commons.lang3.StringUtils;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.json.JSONTokener;
+import org.oransc.policyagent.clients.AsyncRestClient;
+import org.oransc.policyagent.configuration.ApplicationConfig;
+import org.oransc.policyagent.controllers.PolicyController;
+import org.oransc.policyagent.model.DmaapMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.ResponseEntity;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Component;
+
+@Component
+public class DmaapMessageHandler {
+
+ private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandler.class);
+
+ @Autowired
+ private ObjectMapper mapper;
+ @Autowired
+ private PolicyController policyController;
+ private AsyncRestClient restClient;
+ private ApplicationConfig applicationConfig;
+ private String topic = "";
+
+ @Autowired
+ public DmaapMessageHandler(ApplicationConfig applicationConfig) {
+ this.applicationConfig = applicationConfig;
+ }
+
+ // The publish properties is corrupted. It contains the subscribe property values.
+ @Async("threadPoolTaskExecutor")
+ public void handleDmaapMsg(String msg) {
+ init();
+ DmaapMessage dmaapMessage = null;
+ ResponseEntity<String> response = null;
+ // Process the message
+ /**
+ * Sample Request Message from DMAAP { "type": "request", "correlationId":
+ * "c09ac7d1-de62-0016-2000-e63701125557-201", "target": "policy-agent", "timestamp": "2019-05-14T11:44:51.36Z",
+ * "apiVersion": "1.0", "originatorId": "849e6c6b420", "requestId": "23343221", "operation": "getPolicySchemas",
+ * "payload": "{\"ric\":\"ric1\"}" }
+ * --------------------------------------------------------------------------------------------------------------
+ * Sample Response Message to DMAAP { "type": "response", "correlation-id":
+ * "c09ac7d1-de62-0016-2000-e63701125557-201", "timestamp": "2019-05-14T11:44:51.36Z", "originator-id":
+ * "849e6c6b420", "request-id": "23343221", "status" : "ACCEPTED", "message" : "" }
+ * -------------------------------------------------------------------------------------------------------------
+ * Sample Response Message to DMAAP { "type": "response", "correlation-id":
+ * "c09ac7d1-de62-0016-2000-e63701125557-201", "timestamp": "2019-05-14T11:44:51.36Z", "originator-id":
+ * "849e6c6b420", "request-id": "23343221", "status" : "SUCCESS" "message" : "" }
+ */
+ try {
+ dmaapMessage = mapper.readValue(msg, DmaapMessage.class);
+ // Post the accepted message to the DMAAP bus
+ logger.debug("DMAAP Message- {}", dmaapMessage);
+ logger.debug("Post Accepted Message to Client");
+ restClient
+ .post("A1-POLICY-AGENT-WRITE", buildDmaapResponse(dmaapMessage.getCorrelationId(),
+ dmaapMessage.getOriginatorId(), dmaapMessage.getRequestId(), "ACCEPTED", StringUtils.EMPTY))
+ .block(); //
+ // Call the Controller
+ logger.debug("Invoke the Policy Agent Controller");
+ response = invokeController(dmaapMessage);
+ // Post the Response message to the DMAAP bus
+ logger.debug("DMAAP Response Message to Client- {}", response);
+ restClient
+ .post("A1-POLICY-AGENT-WRITE", buildDmaapResponse(dmaapMessage.getCorrelationId(),
+ dmaapMessage.getOriginatorId(), dmaapMessage.getRequestId(), "SUCCESS", response.getBody()))
+ .block(); //
+ } catch (IOException e) {
+ logger.error("Exception occured during message processing", e);
+ }
+ }
+
+ private ResponseEntity<String> invokeController(DmaapMessage dmaapMessage) {
+ String formattedString = "";
+ String ricName;
+ String instance;
+ logger.debug("Payload from the Message - {}", dmaapMessage.getPayload());
+ try {
+ formattedString = new JSONTokener(dmaapMessage.getPayload()).nextValue().toString();
+ logger.debug("Removed the Escape charater in payload- {}", formattedString);
+ } catch (JSONException e) {
+ logger.error("Exception occurred during formating Payload- {}", dmaapMessage.getPayload());
+ }
+ JSONObject jsonObject = new JSONObject(formattedString);
+ switch (dmaapMessage.getOperation()) {
+ case "getPolicySchemas":
+ ricName = (String) jsonObject.get("ricName");
+ logger.debug("Received the request for getPolicySchemas with Ric Name- {}", ricName);
+ return policyController.getPolicySchemas(ricName);
+ case "getPolicySchema":
+ String policyTypeId = (String) jsonObject.get("id");
+ logger.debug("Received the request for getPolicySchema with Policy Type Id- {}", policyTypeId);
+ System.out.println("policyTypeId" + policyTypeId);
+ return policyController.getPolicySchema(policyTypeId);
+ case "getPolicyTypes":
+ ricName = (String) jsonObject.get("ricName");
+ logger.debug("Received the request for getPolicyTypes with Ric Name- {}", ricName);
+ return policyController.getPolicyTypes(ricName);
+ case "getPolicy":
+ instance = (String) jsonObject.get("instance");
+ logger.debug("Received the request for getPolicy with Instance- {}", instance);
+ return policyController.getPolicy(instance);
+ case "deletePolicy":
+ instance = (String) jsonObject.get("instance");
+ logger.debug("Received the request for deletePolicy with Instance- {}", instance);
+ return null;// policyController.deletePolicy(deleteInstance);
+ case "putPolicy":
+ String type = (String) jsonObject.get("type");
+ String putPolicyInstance = (String) jsonObject.get("instance");
+ String putPolicyRic = (String) jsonObject.get("ric");
+ String service = (String) jsonObject.get("service");
+ String jsonBody = (String) jsonObject.get("jsonBody");
+ return null;// policyController.putPolicy(type, putPolicyInstance, putPolicyRic, service, jsonBody);
+ case "getPolicies":
+ String getPolicyType = (String) jsonObject.get("type");
+ String getPolicyRic = (String) jsonObject.get("ric");
+ String getPolicyService = (String) jsonObject.get("service");
+ return policyController.getPolicies(getPolicyType, getPolicyRic, getPolicyService);
+ default:
+ break;
+ }
+ return null;
+ }
+
+ private String buildDmaapResponse(String correlationId, String originatorId, String requestId, String status,
+ String message) {
+ System.out.println("buildResponse ");
+ return new JSONObject().put("type", "response").put(correlationId, correlationId).put("timestamp", "")
+ .put("originatorId", originatorId).put("requestId", requestId).put("status", status)
+ .put("message", message).toString();
+ }
+
+ // @PostConstruct
+ // The application properties value is always NULL for the first time
+ // Need to fix this
+ public void init() {
+ logger.debug("Reading DMAAP Publisher bus details from Application Config");
+ Properties dmaapPublisherConfig = applicationConfig.getDmaapPublisherConfig();
+ String host = (String) dmaapPublisherConfig.get("ServiceName");
+ topic = dmaapPublisherConfig.getProperty("topic");
+ logger.debug("Read the topic & Service Name - {} , {}", host, topic);
+ this.restClient = new AsyncRestClient("http://" + host + "/"); // get this value from application config
+
+ }
+}
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2019 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.policyagent.model;
+
+import java.sql.Timestamp;
+import javax.validation.constraints.NotNull;
+import lombok.Getter;
+import lombok.Setter;
+
+@Getter
+@Setter
+public class DmaapMessage {
+
+ @NotNull
+ private String type;
+ @NotNull
+ private String correlationId;
+ @NotNull
+ private String target;
+ @NotNull
+ private Timestamp timestamp;
+ private String apiVersion;
+ @NotNull
+ private String originatorId;
+ private String requestId;
+ @NotNull
+ private String operation;
+ private String payload;
+}
ping();
}
- public synchronized String getName() {
+ public synchronized String name() {
return this.name;
}
return this.keepAliveInterval;
}
- public synchronized void ping() {
+ private synchronized void ping() {
this.lastPing = Instant.now();
}
}
public synchronized void put(Service service) {
- logger.debug("Put service: " + service.getName());
- services.put(service.getName(), service);
+ logger.debug("Put service: " + service.name());
+ services.put(service.name(), service);
}
public synchronized Iterable<Service> getAll() {
return services.values();
}
+ public synchronized void remove(String name) {
+ services.remove(name);
+ }
+
+ public synchronized int size() {
+ return services.size();
+ }
+
}
try {
ApplicationConfigParser parser = new ApplicationConfigParser();
parser.parse(jsonObject);
- this.appConfig.setConfiguration(parser.getRicConfigs(), parser.getDmaapConsumerConfig());
+ this.appConfig.setConfiguration(parser.getRicConfigs(), parser.getDmaapPublisherConfig(),
+ parser.getDmaapConsumerConfig());
} catch (ServiceException e) {
logger.error("Could not parse configuration {}", e.toString(), e);
}
}
ApplicationConfigParser appParser = new ApplicationConfigParser();
appParser.parse(rootObject);
- appConfig.setConfiguration(appParser.getRicConfigs(), appParser.getDmaapConsumerConfig());
+ appConfig.setConfiguration(appParser.getRicConfigs(), appParser.getDmaapPublisherConfig(),
+ appParser.getDmaapConsumerConfig());
logger.info("Local configuration file loaded: {}", filepath);
} catch (JsonSyntaxException | ServiceException | IOException e) {
logger.trace("Local configuration file not loaded: {}", filepath, e);
synchronized (services) {
return Flux.fromIterable(services.getAll()) //
.filter(service -> service.isExpired()) //
- .doOnNext(service -> logger.info("Service is expired:" + service.getName())) //
+ .doOnNext(service -> logger.info("Service is expired:" + service.name())) //
.flatMap(service -> getAllPolicies(service)) //
.doOnNext(policy -> this.policies.remove(policy)) //
.flatMap(policy -> deletePolicyInRic(policy));
private Flux<Policy> getAllPolicies(Service service) {
synchronized (policies) {
- return Flux.fromIterable(policies.getForService(service.getName()));
+ return Flux.fromIterable(policies.getForService(service.name()));
}
}
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
-import com.google.gson.reflect.TypeToken;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
import org.oransc.policyagent.repository.PolicyTypes;
import org.oransc.policyagent.repository.Ric;
import org.oransc.policyagent.repository.Rics;
+import org.oransc.policyagent.repository.Services;
import org.oransc.policyagent.tasks.RepositorySupervision;
import org.oransc.policyagent.utils.MockA1Client;
import org.oransc.policyagent.utils.MockA1ClientFactory;
@Autowired
RepositorySupervision supervision;
+ @Autowired
+ Services services;
+
private static Gson gson = new GsonBuilder() //
.serializeNulls() //
.create(); //
assertThat(policies.size()).isEqualTo(0);
}
- private static <T> List<T> parseList(String json, Class<T> clazz) {
- if (null == json) {
- return null;
- }
- return gson.fromJson(json, new TypeToken<T>() {}.getType());
-
- }
-
@Test
public void testGetPolicySchemas() throws Exception {
reset();
assertThat(rsp).contains("type2");
assertThat(rsp).contains("title");
- List<String> info = parseList(rsp, String.class);
+ List<String> info = parseSchemas(rsp);
assertEquals(2, info.size());
url = baseUrl() + "/policy_schemas?ric=ric1";
rsp = this.restTemplate.getForObject(url, String.class);
assertThat(rsp).contains("type1");
- info = parseList(rsp, String.class);
+ info = parseSchemas(rsp);
assertEquals(1, info.size());
}
@Test
public void testPutAndGetService() throws Exception {
+ // PUT
putService("name");
- String url = baseUrl() + "/service?name=name";
+ // GET
+ String url = baseUrl() + "/services?name=name";
String rsp = this.restTemplate.getForObject(url, String.class);
- ServiceStatus status = gson.fromJson(rsp, ImmutableServiceStatus.class);
+ List<ImmutableServiceStatus> info = parseList(rsp, ImmutableServiceStatus.class);
+ assertThat(info.size() == 1);
+ ServiceStatus status = info.iterator().next();
assertThat(status.keepAliveInterval() == 1);
assertThat(status.name().equals("name"));
+ // GET (all)
url = baseUrl() + "/services";
rsp = this.restTemplate.getForObject(url, String.class);
assertThat(rsp.contains("name"));
System.out.println(rsp);
- url = baseUrl() + "/service/ping";
- this.restTemplate.put(url, "name");
+ // DELETE
+ assertThat(services.size() == 1);
+ url = baseUrl() + "/services?name=name";
+ this.restTemplate.delete(url);
+ assertThat(services.size() == 0);
+ }
+
+ private static <T> List<T> parseList(String jsonString, Class<T> clazz) {
+ List<T> result = new ArrayList<>();
+ JsonArray jsonArr = new JsonParser().parse(jsonString).getAsJsonArray();
+ for (JsonElement jsonElement : jsonArr) {
+ T o = gson.fromJson(jsonElement.toString(), clazz);
+ result.add(o);
+ }
+ return result;
+ }
+
+ private static List<String> parseSchemas(String jsonString) {
+ JsonArray arrayOfSchema = new JsonParser().parse(jsonString).getAsJsonArray();
+ List<String> result = new ArrayList<>();
+ for (JsonElement schemaObject : arrayOfSchema) {
+ result.add(schemaObject.toString());
+ }
+ return result;
}
}
}
}
}
-
}
@LocalServerPort
]
}
]
- },
- "streams_subscribes": {
- "dmaap_subscriber": {
- "dmaap_info": {
- "topic_url": "http://dradmin:dradmin@localhost:2222/events/A1-P/users/sdnc1"
- },
- "type": "message_router"
- }
}
}
\ No newline at end of file
--- /dev/null
+{
+ "config": {
+ "//description": "Application configuration",
+ "ric": [
+ {
+ "name": "ric1",
+ "baseUrl": "http://localhost:8080/",
+ "managedElementIds": [
+ "kista_1",
+ "kista_2"
+ ]
+ },
+ {
+ "name": "ric2",
+ "baseUrl": "http://localhost:8081/",
+ "managedElementIds": [
+ "kista_3",
+ "kista_4"
+ ]
+ }
+ ]
+ },
+ "streams_publishes": {
+ "dmaap_publisher": {
+ "type": "message_router",
+ "dmaap_info": {
+ "topic_url": "http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-WRITE"
+ }
+ }
+ },
+ "streams_subscribes": {
+ "dmaap_subscriber": {
+ "dmaap_info": {
+ "topic_url": "http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-READ/users/policy-agent"
+ },
+ "type": "message_router"
+ }
+ }
+}
\ No newline at end of file