Merge "Adapted to latest STD A1 API spec"
authorHenrik Andersson <henrik.b.andersson@est.tech>
Tue, 28 Jan 2020 08:46:37 +0000 (08:46 +0000)
committerGerrit Code Review <gerrit@o-ran-sc.org>
Tue, 28 Jan 2020 08:46:37 +0000 (08:46 +0000)
20 files changed:
dashboard/webapp-backend/src/main/java/org/oransc/ric/portal/dashboard/policyagentapi/PolicyAgentApiImpl.java
policy-agent/README.md
policy-agent/pom.xml
policy-agent/src/main/java/org/oransc/policyagent/BeanFactory.java
policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java
policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java
policy-agent/src/main/java/org/oransc/policyagent/configuration/AsyncConfiguration.java [new file with mode: 0644]
policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceController.java
policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java
policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java
policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java [new file with mode: 0644]
policy-agent/src/main/java/org/oransc/policyagent/model/DmaapMessage.java [new file with mode: 0644]
policy-agent/src/main/java/org/oransc/policyagent/repository/Service.java
policy-agent/src/main/java/org/oransc/policyagent/repository/Services.java
policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java
policy-agent/src/main/java/org/oransc/policyagent/tasks/ServiceSupervision.java
policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java
policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java
policy-agent/src/test/resources/test_application_configuration.json
policy-agent/src/test/resources/test_application_configuration_with_dmaap_config.json [new file with mode: 0644]

index c661e66..19865e1 100644 (file)
@@ -55,14 +55,14 @@ public class PolicyAgentApiImpl implements PolicyAgentApi {
     RestTemplate restTemplate = new RestTemplate();
 
     private static com.google.gson.Gson gson = new GsonBuilder() //
-        .serializeNulls() //
-        .create(); //
+            .serializeNulls() //
+            .create(); //
 
     private final String urlPrefix;
 
     @Autowired
     public PolicyAgentApiImpl(
-        @org.springframework.beans.factory.annotation.Value("${policycontroller.url.prefix}") final String urlPrefix) {
+            @org.springframework.beans.factory.annotation.Value("${policycontroller.url.prefix}") final String urlPrefix) {
         logger.debug("ctor prefix '{}'", urlPrefix);
         this.urlPrefix = urlPrefix;
     }
@@ -115,7 +115,8 @@ public class PolicyAgentApiImpl implements PolicyAgentApi {
         }
 
         try {
-            Type listType = new TypeToken<List<ImmutablePolicyInfo>>() {}.getType();
+            Type listType = new TypeToken<List<ImmutablePolicyInfo>>() {
+            }.getType();
             List<PolicyInfo> rspParsed = gson.fromJson(rsp.getBody(), listType);
             PolicyInstances result = new PolicyInstances();
             for (PolicyInfo p : rspParsed) {
@@ -137,17 +138,17 @@ public class PolicyAgentApiImpl implements PolicyAgentApi {
 
     @Override
     public ResponseEntity<String> putPolicy(String policyTypeIdString, String policyInstanceId, String json,
-        String ric) {
+            String ric) {
         String url = baseUrl() + "/policy?type={type}&instance={instance}&ric={ric}&service={service}";
         Map<String, ?> uriVariables = Map.of( //
-            "type", policyTypeIdString, //
-            "instance", policyInstanceId, //
-            "ric", ric, //
-            "service", "dashboard");
+                "type", policyTypeIdString, //
+                "instance", policyInstanceId, //
+                "ric", ric, //
+                "service", "dashboard");
 
         try {
             this.restTemplate.put(url, json, uriVariables);
-            return new ResponseEntity<>("Policy was put successfully", HttpStatus.OK);
+            return new ResponseEntity<>(HttpStatus.OK);
         } catch (Exception e) {
             return new ResponseEntity<>(e.getMessage(), HttpStatus.INTERNAL_SERVER_ERROR);
         }
@@ -159,7 +160,7 @@ public class PolicyAgentApiImpl implements PolicyAgentApi {
         Map<String, ?> uriVariables = Map.of("instance", policyInstanceId);
         try {
             this.restTemplate.delete(url, uriVariables);
-            return new ResponseEntity<>("Policy was deleted successfully", HttpStatus.NO_CONTENT);
+            return new ResponseEntity<>(HttpStatus.OK);
         } catch (Exception e) {
             return new ResponseEntity<>(e.getMessage(), HttpStatus.NOT_FOUND);
         }
@@ -183,7 +184,8 @@ public class PolicyAgentApiImpl implements PolicyAgentApi {
         String rsp = this.restTemplate.getForObject(url, String.class, uriVariables);
 
         try {
-            Type listType = new TypeToken<List<ImmutableRicInfo>>() {}.getType();
+            Type listType = new TypeToken<List<ImmutableRicInfo>>() {
+            }.getType();
             List<RicInfo> rspParsed = gson.fromJson(rsp, listType);
             Collection<String> result = new Vector<>(rspParsed.size());
             for (RicInfo ric : rspParsed) {
index 3ddbbd5..6077a4b 100644 (file)
@@ -1,21 +1,25 @@
 # O-RAN-SC NonRT RIC Dashboard Web Application
 
-The O-RAN NonRT RIC PolicyAgent provides a REST API for management of 
-policices. It provides support for 
--Policy configuration. This includes
- -One REST API towards all RICs in the network
- -Query functions that can find all policies in a RIC, all policies owned by a service (R-APP), all policies of a type etc.
- -Maps O1 resources (ManagedElement) as defined in O1 to the controlling RIC 
--Supervision of clients (R-APPs) to eliminate stray policies in case of failure
--Consistency monitoring of the SMO view of policies and the actual situation in the RICs
--Consistency monitoring of RIC capabilities (policy types)
+The O-RAN NonRT RIC PolicyAgent provides a REST API for management of policices.
+It provides support for:
+ -Supervision of clients (R-APPs) to eliminate stray policies in case of failure
+ -Consistency monitoring of the SMO view of policies and the actual situation in the RICs
+ -Consistency monitoring of RIC capabilities (policy types)
+ -Policy configuration. This includes:
+  -One REST API towards all RICs in the network
+  -Query functions that can find all policies in a RIC, all policies owned by a service (R-APP),
+   all policies of a type etc.
+  -Maps O1 resources (ManagedElement) as defined in O1 to the controlling RIC
 
 To Run Policy Agent in Local:
-Create a symbolic link with below command,
+In the folder /opt/app/policy-agent/config/, create a soft link with below command,
 ln -s <path to test_application_configuration.json> application_configuration.json
 
-The agent can be run stand alone in a simulated test mode. Then it 
-simulates RICs. 
+To Run Policy Agent in Local with the DMaaP polling turned on:
+In the folder /opt/app/policy-agent/config/, create a soft link with below command,
+ln -s <path to test_application_configuration_with_dmaap_config.json> application_configuration.json
+
+The agent can be run stand alone in a simulated test mode. Then it simulates RICs.
 The REST API is published on port 8081 and it is started by command:
 mvn -Dtest=MockPolicyAgent test
 
index 43d3aef..f2a9411 100644 (file)
             <artifactId>dmaap-client</artifactId>
                        <version>${sdk.version}</version>
                </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+         <scope>provided</scope>
+        </dependency>
                <!--REQUIRED TO GENERATE DOCUMENTATION -->
                <dependency>
                        <groupId>io.springfox</groupId>
index f0826e9..e05eb95 100644 (file)
@@ -20,6 +20,7 @@
 
 package org.oransc.policyagent;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.oransc.policyagent.clients.A1ClientFactory;
 import org.oransc.policyagent.configuration.ApplicationConfig;
 import org.oransc.policyagent.repository.Policies;
@@ -61,4 +62,9 @@ class BeanFactory {
         return new A1ClientFactory();
     }
 
+    @Bean
+    public ObjectMapper mapper() {
+      return new ObjectMapper();
+    }
+
 }
index 673fe1d..1ed3fdb 100644 (file)
@@ -42,6 +42,7 @@ public class ApplicationConfig {
 
     private Collection<Observer> observers = new Vector<>();
     private Map<String, RicConfig> ricConfigs = new HashMap<>();
+    private Properties dmaapPublisherConfig;
     private Properties dmaapConsumerConfig;
 
     @Autowired
@@ -72,6 +73,10 @@ public class ApplicationConfig {
         throw new ServiceException("Could not find ric: " + ricName);
     }
 
+    public Properties getDmaapPublisherConfig() {
+        return dmaapConsumerConfig;
+    }
+
     public Properties getDmaapConsumerConfig() {
         return dmaapConsumerConfig;
     }
@@ -98,7 +103,8 @@ public class ApplicationConfig {
         }
     }
 
-    public void setConfiguration(@NotNull Collection<RicConfig> ricConfigs, Properties dmaapConsumerConfig) {
+    public void setConfiguration(@NotNull Collection<RicConfig> ricConfigs, Properties dmaapPublisherConfig,
+        Properties dmaapConsumerConfig) {
         Collection<Notification> notifications = new Vector<>();
         synchronized (this) {
             Map<String, RicConfig> newRicConfigs = new HashMap<>();
@@ -123,6 +129,7 @@ public class ApplicationConfig {
         }
         notifyObservers(notifications);
 
+        this.dmaapPublisherConfig = dmaapPublisherConfig;
         this.dmaapConsumerConfig = dmaapConsumerConfig;
     }
 
index 530ac98..34d1d97 100644 (file)
@@ -25,17 +25,16 @@ import com.google.gson.GsonBuilder;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
-
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 import java.util.Vector;
-
 import javax.validation.constraints.NotNull;
-
+import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
 import org.oransc.policyagent.exceptions.ServiceException;
+import org.springframework.http.MediaType;
 
 public class ApplicationConfigParser {
 
@@ -46,6 +45,7 @@ public class ApplicationConfigParser {
         .create(); //
 
     private Vector<RicConfig> ricConfig;
+    private Properties dmaapPublisherConfig;
     private Properties dmaapConsumerConfig;
 
     public ApplicationConfigParser() {
@@ -54,14 +54,28 @@ public class ApplicationConfigParser {
     public void parse(JsonObject root) throws ServiceException {
         JsonObject ricConfigJson = root.getAsJsonObject(CONFIG);
         ricConfig = parseRics(ricConfigJson);
-        JsonObject dmaapConfigJson = root.getAsJsonObject("streams_subscribes");
-        dmaapConsumerConfig = parseDmaapConsumerConfig(dmaapConfigJson);
+        JsonObject dmaapPublisherConfigJson = root.getAsJsonObject("streams_publishes");
+        if (dmaapPublisherConfigJson == null) {
+            dmaapPublisherConfig = new Properties();
+        } else {
+            dmaapPublisherConfig = parseDmaapConfig(dmaapPublisherConfigJson);
+        }
+        JsonObject dmaapConsumerConfigJson = root.getAsJsonObject("streams_subscribes");
+        if (dmaapConsumerConfigJson == null) {
+            dmaapConsumerConfig = new Properties();
+        } else {
+            dmaapConsumerConfig = parseDmaapConfig(dmaapConsumerConfigJson);
+        }
     }
 
     public Vector<RicConfig> getRicConfigs() {
         return this.ricConfig;
     }
 
+    public Properties getDmaapPublisherConfig() {
+        return dmaapPublisherConfig;
+    }
+
     public Properties getDmaapConsumerConfig() {
         return dmaapConsumerConfig;
     }
@@ -86,7 +100,7 @@ public class ApplicationConfigParser {
         return get(obj, memberName).getAsJsonArray();
     }
 
-    private Properties parseDmaapConsumerConfig(JsonObject consumerCfg) throws ServiceException {
+    private Properties parseDmaapConfig(JsonObject consumerCfg) throws ServiceException {
         Set<Entry<String, JsonElement>> topics = consumerCfg.entrySet();
         if (topics.size() != 1) {
             throw new ServiceException("Invalid configuration, number of topic must be one, config: " + topics);
@@ -106,17 +120,19 @@ public class ApplicationConfigParser {
                 passwd = userInfo[1];
             }
             String urlPath = url.getPath();
-            DmaapConsumerUrlPath path = parseDmaapUrlPath(urlPath);
+            DmaapUrlPath path = parseDmaapUrlPath(urlPath);
 
-            dmaapProps.put("port", url.getPort());
-            dmaapProps.put("server", url.getHost());
+            dmaapProps.put("ServiceName", url.getHost()+":"+url.getPort()+"/events");
             dmaapProps.put("topic", path.dmaapTopicName);
-            dmaapProps.put("consumerGroup", path.consumerGroup);
-            dmaapProps.put("consumerInstance", path.consumerId);
-            dmaapProps.put("fetchTimeout", 15000);
-            dmaapProps.put("fetchLimit", 1000);
+            dmaapProps.put("host", url.getHost()+":"+url.getPort());
+            dmaapProps.put("contenttype", MediaType.APPLICATION_JSON.toString());
             dmaapProps.put("userName", userName);
             dmaapProps.put("password", passwd);
+            dmaapProps.put("group", path.consumerGroup);
+            dmaapProps.put("id", path.consumerId);
+            dmaapProps.put("TransportType", ProtocolTypeConstants.HTTPNOAUTH.toString());
+            dmaapProps.put("timeout", 15000);
+            dmaapProps.put("limit", 1000);
         } catch (MalformedURLException e) {
             throw new ServiceException("Could not parse the URL", e);
         }
@@ -128,27 +144,31 @@ public class ApplicationConfigParser {
         return get(obj, memberName).getAsString();
     }
 
-    private class DmaapConsumerUrlPath {
+    private class DmaapUrlPath {
         final String dmaapTopicName;
         final String consumerGroup;
         final String consumerId;
 
-        DmaapConsumerUrlPath(String dmaapTopicName, String consumerGroup, String consumerId) {
+        DmaapUrlPath(String dmaapTopicName, String consumerGroup, String consumerId) {
             this.dmaapTopicName = dmaapTopicName;
             this.consumerGroup = consumerGroup;
             this.consumerId = consumerId;
         }
     }
 
-    private DmaapConsumerUrlPath parseDmaapUrlPath(String urlPath) throws ServiceException {
+    private DmaapUrlPath parseDmaapUrlPath(String urlPath) throws ServiceException {
         String[] tokens = urlPath.split("/"); // /events/A1-P/users/sdnc1
-        if (tokens.length != 5) {
+        if (!(tokens.length == 3 ^ tokens.length == 5)) {
             throw new ServiceException("The path has incorrect syntax: " + urlPath);
         }
 
-        final String dmaapTopicName = tokens[1] + "/" + tokens[2]; // /events/A1-P
-        final String consumerGroup = tokens[3]; // users
-        final String consumerId = tokens[4]; // sdnc1
-        return new DmaapConsumerUrlPath(dmaapTopicName, consumerGroup, consumerId);
+        final String dmaapTopicName =  tokens[2]; // /events/A1-P
+        String consumerGroup = ""; // users
+        String consumerId = ""; // sdnc1
+        if (tokens.length == 5) {
+            consumerGroup = tokens[3];
+            consumerId = tokens[4];
+        }
+        return new DmaapUrlPath(dmaapTopicName, consumerGroup, consumerId);
     }
 }
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/configuration/AsyncConfiguration.java b/policy-agent/src/main/java/org/oransc/policyagent/configuration/AsyncConfiguration.java
new file mode 100644 (file)
index 0000000..085320c
--- /dev/null
@@ -0,0 +1,44 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2019 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.policyagent.configuration;
+
+import java.util.concurrent.Executor;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.annotation.AsyncConfigurer;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+@Configuration
+@EnableAsync
+public class AsyncConfiguration implements AsyncConfigurer {
+
+    @Override
+    @Bean(name = "threadPoolTaskExecutor")
+    public Executor getAsyncExecutor() {
+        //Set this configuration value from common properties file
+        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+        executor.setCorePoolSize(5);
+        executor.setMaxPoolSize(10);
+        executor.setQueueCapacity(25);
+        return executor;
+    }
+}
index bda5e09..cc1d711 100644 (file)
@@ -23,16 +23,23 @@ package org.oransc.policyagent.controllers;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+
 import java.time.Duration;
 import java.util.Collection;
 import java.util.Vector;
 
 import org.oransc.policyagent.exceptions.ServiceException;
+import org.oransc.policyagent.repository.Policies;
+import org.oransc.policyagent.repository.Policy;
 import org.oransc.policyagent.repository.Service;
 import org.oransc.policyagent.repository.Services;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.DeleteMapping;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PutMapping;
 import org.springframework.web.bind.annotation.RequestBody;
@@ -43,31 +50,40 @@ import org.springframework.web.bind.annotation.RestController;
 public class ServiceController {
 
     private final Services services;
+    private final Policies policies;
+
     private static Gson gson = new GsonBuilder() //
         .serializeNulls() //
         .create(); //
 
     @Autowired
-    ServiceController(Services services) {
+    ServiceController(Services services, Policies policies) {
         this.services = services;
+        this.policies = policies;
     }
 
-    @GetMapping("/service")
-    public ResponseEntity<String> getService( //
-        @RequestParam(name = "name", required = true) String name) {
-        try {
-            Service s = services.getService(name);
-            String res = gson.toJson(toServiceStatus(s));
-            return new ResponseEntity<String>(res, HttpStatus.OK);
+    @GetMapping("/services")
+    @ApiOperation(value = "Returns service information", response = ServiceStatus.class)
+    @ApiResponses(value = {@ApiResponse(code = 200, message = "OK")})
+    public ResponseEntity<String> getServices( //
+        @RequestParam(name = "name", required = false) String name) {
 
-        } catch (ServiceException e) {
-            return new ResponseEntity<String>(e.getMessage(), HttpStatus.NO_CONTENT);
+        Collection<ServiceStatus> servicesStatus = new Vector<>();
+        synchronized (this.services) {
+            for (Service s : this.services.getAll()) {
+                if (name == null || name.equals(s.name())) {
+                    servicesStatus.add(toServiceStatus(s));
+                }
+            }
         }
+
+        String res = gson.toJson(servicesStatus);
+        return new ResponseEntity<String>(res, HttpStatus.OK);
     }
 
     private ServiceStatus toServiceStatus(Service s) {
         return ImmutableServiceStatus.builder() //
-            .name(s.getName()) //
+            .name(s.name()) //
             .keepAliveInterval(s.getKeepAliveInterval().toSeconds()) //
             .timeSincePing(s.timeSinceLastPing().toSeconds()) //
             .build();
@@ -85,31 +101,39 @@ public class ServiceController {
         }
     }
 
-    private Service toService(ServiceRegistrationInfo s) {
-        return new Service(s.name(), Duration.ofSeconds(s.keepAliveInterval()), s.callbackUrl());
+    @DeleteMapping("/services")
+    public ResponseEntity<String> deleteService( //
+        @RequestParam(name = "name", required = true) String name) {
+        try {
+            Service service = removeService(name);
+            // Remove the policies from the repo and let the consistency monitoring
+            // do the rest.
+            removePolicies(service);
+            return new ResponseEntity<String>("OK", HttpStatus.NO_CONTENT);
+        } catch (Exception e) {
+            return new ResponseEntity<String>(e.getMessage(), HttpStatus.NO_CONTENT);
+        }
     }
 
-    @GetMapping("/services")
-    public ResponseEntity<?> getServices() {
-        Collection<ServiceStatus> result = new Vector<>();
+    private Service removeService(String name) throws ServiceException {
         synchronized (this.services) {
-            for (Service s : this.services.getAll()) {
-                result.add(toServiceStatus(s));
-            }
+            Service service = this.services.getService(name);
+            this.services.remove(service.name());
+            return service;
         }
-        return new ResponseEntity<>(gson.toJson(result), HttpStatus.OK);
     }
 
-    @PutMapping("/service/ping")
-    public ResponseEntity<String> ping( //
-        @RequestBody String name) {
-        try {
-            Service s = services.getService(name);
-            s.ping();
-            return new ResponseEntity<String>("OK", HttpStatus.OK);
-        } catch (ServiceException e) {
-            return new ResponseEntity<String>(e.getMessage(), HttpStatus.NO_CONTENT);
+    private void removePolicies(Service service) {
+        synchronized (this.policies) {
+            Vector<Policy> policyList = new Vector<>(this.policies.getForService(service.name()));
+            for (Policy policy : policyList) {
+                this.policies.remove(policy);
+            }
         }
     }
 
+    private Service toService(ServiceRegistrationInfo s) {
+        return new Service(s.name(), Duration.ofSeconds(s.keepAliveInterval()), s.callbackUrl());
+    }
+
 }
index fd42104..889dfaf 100644 (file)
@@ -21,8 +21,6 @@
 
 package org.oransc.policyagent.dmaap;
 
-import java.util.Properties;
-
 /**
  * The Dmaap consumer which has the base methods to be implemented by any class which implements this interface
  *
@@ -34,7 +32,7 @@ public interface DmaapMessageConsumer {
      *
      * @param properties
      */
-    public void init(Properties properties);
+    public void init();
 
     /**
      * This method process the message and call the respective Controller
index 74cfe0d..2ae5e5e 100644 (file)
@@ -22,6 +22,7 @@ package org.oransc.policyagent.dmaap;
 
 import java.io.IOException;
 import java.util.Properties;
+import javax.annotation.PostConstruct;
 import org.onap.dmaap.mr.client.MRClientFactory;
 import org.onap.dmaap.mr.client.MRConsumer;
 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
@@ -40,21 +41,27 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer {
     private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumerImpl.class);
 
     private boolean alive = false;
+    private final ApplicationConfig applicationConfig;
     protected MRConsumer consumer;
     private MRConsumerResponse response = null;
+    @Autowired
+    private DmaapMessageHandler dmaapMessageHandler;
 
     @Autowired
     public DmaapMessageConsumerImpl(ApplicationConfig applicationConfig) {
-        Properties dmaapConsumerConfig = applicationConfig.getDmaapConsumerConfig();
-        init(dmaapConsumerConfig);
+        this.applicationConfig = applicationConfig;
     }
 
-    @Scheduled(fixedRate = 1000 * 60)
+    @Scheduled(fixedRate = 1000 * 10) // , initialDelay=60000)
     @Override
     public void run() {
-        while (this.alive) {
+        /*
+         * if (!alive) { init(); }
+         */
+        if (this.alive) {
             try {
                 Iterable<String> dmaapMsgs = fetchAllMessages();
+                logger.debug("Fetched all the messages from DMAAP and will start to process the messages");
                 for (String msg : dmaapMsgs) {
                     processMsg(msg);
                 }
@@ -78,24 +85,20 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer {
         return response.getActualMessages();
     }
 
+    @PostConstruct
     @Override
-    public void init(Properties properties) {
-        // Initialize the DMAAP with the properties
-        // Do we need to do any validation of properties before calling the factory?
-        Properties prop = new Properties();
-        prop.setProperty("ServiceName", "localhost:6845/events");
-        prop.setProperty("topic", "A1-P");
-        prop.setProperty("host", "localhost:6845");
-        prop.setProperty("contenttype", "application/json");
-        prop.setProperty("username", "admin");
-        prop.setProperty("password", "admin");
-        prop.setProperty("group", "users");
-        prop.setProperty("id", "policy-agent");
-        prop.setProperty("TransportType", "HTTPNOAUTH");
-        prop.setProperty("timeout", "15000");
-        prop.setProperty("limit", "1000");
+    public void init() {
+        Properties dmaapConsumerProperties = applicationConfig.getDmaapConsumerConfig();
+        Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig();
+        // No need to start if there is no configuration.
+        if (dmaapConsumerProperties == null || dmaapPublisherProperties == null || dmaapConsumerProperties.size() == 0
+                || dmaapPublisherProperties.size() == 0) {
+            logger.error("DMaaP properties Failed to Load");
+            return;
+        }
         try {
-            consumer = MRClientFactory.createConsumer(prop);
+            logger.debug("Creating DMAAP Client");
+            consumer = MRClientFactory.createConsumer(dmaapConsumerProperties);
             this.alive = true;
         } catch (IOException e) {
             logger.error("Exception occurred while creating Dmaap Consumer", e);
@@ -104,11 +107,9 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer {
 
     @Override
     public void processMsg(String msg) throws Exception {
-        System.out.println("sysout" + msg);
+        logger.debug("Message Reveived from DMAAP : {}", msg);
         // Call the concurrent Task executor to handle the incoming request
-        // Validate the Input & if its valid, post the ACCEPTED Response back to DMAAP
-        // through REST CLIENT
-        // Call the Controller with the extracted payload
+        dmaapMessageHandler.handleDmaapMsg(msg);
     }
 
     @Override
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java
new file mode 100644 (file)
index 0000000..bf9f06c
--- /dev/null
@@ -0,0 +1,175 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2019 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.policyagent.dmaap;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Properties;
+import org.apache.commons.lang3.StringUtils;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.json.JSONTokener;
+import org.oransc.policyagent.clients.AsyncRestClient;
+import org.oransc.policyagent.configuration.ApplicationConfig;
+import org.oransc.policyagent.controllers.PolicyController;
+import org.oransc.policyagent.model.DmaapMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.ResponseEntity;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Component;
+
+@Component
+public class DmaapMessageHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandler.class);
+
+    @Autowired
+    private ObjectMapper mapper;
+    @Autowired
+    private PolicyController policyController;
+    private AsyncRestClient restClient;
+    private ApplicationConfig applicationConfig;
+    private String topic = "";
+
+    @Autowired
+    public DmaapMessageHandler(ApplicationConfig applicationConfig) {
+        this.applicationConfig = applicationConfig;
+    }
+
+    // The publish properties is corrupted. It contains the subscribe property values.
+    @Async("threadPoolTaskExecutor")
+    public void handleDmaapMsg(String msg) {
+        init();
+        DmaapMessage dmaapMessage = null;
+        ResponseEntity<String> response = null;
+        // Process the message
+        /**
+         * Sample Request Message from DMAAP { "type": "request", "correlationId":
+         * "c09ac7d1-de62-0016-2000-e63701125557-201", "target": "policy-agent", "timestamp": "2019-05-14T11:44:51.36Z",
+         * "apiVersion": "1.0", "originatorId": "849e6c6b420", "requestId": "23343221", "operation": "getPolicySchemas",
+         * "payload": "{\"ric\":\"ric1\"}" }
+         * --------------------------------------------------------------------------------------------------------------
+         * Sample Response Message to DMAAP { "type": "response", "correlation-id":
+         * "c09ac7d1-de62-0016-2000-e63701125557-201", "timestamp": "2019-05-14T11:44:51.36Z", "originator-id":
+         * "849e6c6b420", "request-id": "23343221", "status" : "ACCEPTED", "message" : "" }
+         * -------------------------------------------------------------------------------------------------------------
+         * Sample Response Message to DMAAP { "type": "response", "correlation-id":
+         * "c09ac7d1-de62-0016-2000-e63701125557-201", "timestamp": "2019-05-14T11:44:51.36Z", "originator-id":
+         * "849e6c6b420", "request-id": "23343221", "status" : "SUCCESS" "message" : "" }
+         */
+        try {
+            dmaapMessage = mapper.readValue(msg, DmaapMessage.class);
+            // Post the accepted message to the DMAAP bus
+            logger.debug("DMAAP Message- {}", dmaapMessage);
+            logger.debug("Post Accepted Message to Client");
+            restClient
+                    .post("A1-POLICY-AGENT-WRITE", buildDmaapResponse(dmaapMessage.getCorrelationId(),
+                            dmaapMessage.getOriginatorId(), dmaapMessage.getRequestId(), "ACCEPTED", StringUtils.EMPTY))
+                    .block(); //
+            // Call the Controller
+            logger.debug("Invoke the Policy Agent Controller");
+            response = invokeController(dmaapMessage);
+            // Post the Response message to the DMAAP bus
+            logger.debug("DMAAP Response Message to Client- {}", response);
+            restClient
+                    .post("A1-POLICY-AGENT-WRITE", buildDmaapResponse(dmaapMessage.getCorrelationId(),
+                            dmaapMessage.getOriginatorId(), dmaapMessage.getRequestId(), "SUCCESS", response.getBody()))
+                    .block(); //
+        } catch (IOException e) {
+            logger.error("Exception occured during message processing", e);
+        }
+    }
+
+    private ResponseEntity<String> invokeController(DmaapMessage dmaapMessage) {
+        String formattedString = "";
+        String ricName;
+        String instance;
+        logger.debug("Payload from the Message - {}", dmaapMessage.getPayload());
+        try {
+            formattedString = new JSONTokener(dmaapMessage.getPayload()).nextValue().toString();
+            logger.debug("Removed the Escape charater in payload- {}", formattedString);
+        } catch (JSONException e) {
+            logger.error("Exception occurred during formating Payload- {}", dmaapMessage.getPayload());
+        }
+        JSONObject jsonObject = new JSONObject(formattedString);
+        switch (dmaapMessage.getOperation()) {
+            case "getPolicySchemas":
+                ricName = (String) jsonObject.get("ricName");
+                logger.debug("Received the request for getPolicySchemas with Ric Name- {}", ricName);
+                return policyController.getPolicySchemas(ricName);
+            case "getPolicySchema":
+                String policyTypeId = (String) jsonObject.get("id");
+                logger.debug("Received the request for getPolicySchema with Policy Type Id- {}", policyTypeId);
+                System.out.println("policyTypeId" + policyTypeId);
+                return policyController.getPolicySchema(policyTypeId);
+            case "getPolicyTypes":
+                ricName = (String) jsonObject.get("ricName");
+                logger.debug("Received the request for getPolicyTypes with Ric Name- {}", ricName);
+                return policyController.getPolicyTypes(ricName);
+            case "getPolicy":
+                instance = (String) jsonObject.get("instance");
+                logger.debug("Received the request for getPolicy with Instance- {}", instance);
+                return policyController.getPolicy(instance);
+            case "deletePolicy":
+                instance = (String) jsonObject.get("instance");
+                logger.debug("Received the request for deletePolicy with Instance- {}", instance);
+                return null;// policyController.deletePolicy(deleteInstance);
+            case "putPolicy":
+                String type = (String) jsonObject.get("type");
+                String putPolicyInstance = (String) jsonObject.get("instance");
+                String putPolicyRic = (String) jsonObject.get("ric");
+                String service = (String) jsonObject.get("service");
+                String jsonBody = (String) jsonObject.get("jsonBody");
+                return null;// policyController.putPolicy(type, putPolicyInstance, putPolicyRic, service, jsonBody);
+            case "getPolicies":
+                String getPolicyType = (String) jsonObject.get("type");
+                String getPolicyRic = (String) jsonObject.get("ric");
+                String getPolicyService = (String) jsonObject.get("service");
+                return policyController.getPolicies(getPolicyType, getPolicyRic, getPolicyService);
+            default:
+                break;
+        }
+        return null;
+    }
+
+    private String buildDmaapResponse(String correlationId, String originatorId, String requestId, String status,
+            String message) {
+        System.out.println("buildResponse ");
+        return new JSONObject().put("type", "response").put(correlationId, correlationId).put("timestamp", "")
+                .put("originatorId", originatorId).put("requestId", requestId).put("status", status)
+                .put("message", message).toString();
+    }
+
+    // @PostConstruct
+    // The application properties value is always NULL for the first time
+    // Need to fix this
+    public void init() {
+        logger.debug("Reading DMAAP Publisher bus details from Application Config");
+        Properties dmaapPublisherConfig = applicationConfig.getDmaapPublisherConfig();
+        String host = (String) dmaapPublisherConfig.get("ServiceName");
+        topic = dmaapPublisherConfig.getProperty("topic");
+        logger.debug("Read the topic & Service Name - {} , {}", host, topic);
+        this.restClient = new AsyncRestClient("http://" + host + "/"); // get this value from application config
+
+    }
+}
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/model/DmaapMessage.java b/policy-agent/src/main/java/org/oransc/policyagent/model/DmaapMessage.java
new file mode 100644 (file)
index 0000000..e56f4b4
--- /dev/null
@@ -0,0 +1,47 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2019 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.policyagent.model;
+
+import java.sql.Timestamp;
+import javax.validation.constraints.NotNull;
+import lombok.Getter;
+import lombok.Setter;
+
+@Getter
+@Setter
+public class DmaapMessage {
+
+    @NotNull
+    private String type;
+    @NotNull
+    private String correlationId;
+    @NotNull
+    private String target;
+    @NotNull
+    private Timestamp timestamp;
+    private String apiVersion;
+    @NotNull
+    private String originatorId;
+    private String requestId;
+    @NotNull
+    private String operation;
+    private String payload;
+}
index 6458dbb..18a85a9 100644 (file)
@@ -36,7 +36,7 @@ public class Service {
         ping();
     }
 
-    public synchronized String getName() {
+    public synchronized String name() {
         return this.name;
     }
 
@@ -44,7 +44,7 @@ public class Service {
         return this.keepAliveInterval;
     }
 
-    public synchronized void ping() {
+    private synchronized void ping() {
         this.lastPing = Instant.now();
     }
 
index 2e1d8da..01d6a7a 100644 (file)
@@ -48,12 +48,20 @@ public class Services {
     }
 
     public synchronized void put(Service service) {
-        logger.debug("Put service: " + service.getName());
-        services.put(service.getName(), service);
+        logger.debug("Put service: " + service.name());
+        services.put(service.name(), service);
     }
 
     public synchronized Iterable<Service> getAll() {
         return services.values();
     }
 
+    public synchronized void remove(String name) {
+        services.remove(name);
+    }
+
+    public synchronized int size() {
+        return services.size();
+    }
+
 }
index bc43eda..1ab5fc9 100644 (file)
@@ -125,7 +125,8 @@ public class RefreshConfigTask {
         try {
             ApplicationConfigParser parser = new ApplicationConfigParser();
             parser.parse(jsonObject);
-            this.appConfig.setConfiguration(parser.getRicConfigs(), parser.getDmaapConsumerConfig());
+            this.appConfig.setConfiguration(parser.getRicConfigs(), parser.getDmaapPublisherConfig(),
+                parser.getDmaapConsumerConfig());
         } catch (ServiceException e) {
             logger.error("Could not parse configuration {}", e.toString(), e);
         }
@@ -152,7 +153,8 @@ public class RefreshConfigTask {
             }
             ApplicationConfigParser appParser = new ApplicationConfigParser();
             appParser.parse(rootObject);
-            appConfig.setConfiguration(appParser.getRicConfigs(), appParser.getDmaapConsumerConfig());
+            appConfig.setConfiguration(appParser.getRicConfigs(), appParser.getDmaapPublisherConfig(),
+                appParser.getDmaapConsumerConfig());
             logger.info("Local configuration file loaded: {}", filepath);
         } catch (JsonSyntaxException | ServiceException | IOException e) {
             logger.trace("Local configuration file not loaded: {}", filepath, e);
index d4b32e0..c10e004 100644 (file)
@@ -72,7 +72,7 @@ public class ServiceSupervision {
         synchronized (services) {
             return Flux.fromIterable(services.getAll()) //
                 .filter(service -> service.isExpired()) //
-                .doOnNext(service -> logger.info("Service is expired:" + service.getName())) //
+                .doOnNext(service -> logger.info("Service is expired:" + service.name())) //
                 .flatMap(service -> getAllPolicies(service)) //
                 .doOnNext(policy -> this.policies.remove(policy)) //
                 .flatMap(policy -> deletePolicyInRic(policy));
@@ -81,7 +81,7 @@ public class ServiceSupervision {
 
     private Flux<Policy> getAllPolicies(Service service) {
         synchronized (policies) {
-            return Flux.fromIterable(policies.getForService(service.getName()));
+            return Flux.fromIterable(policies.getForService(service.name()));
         }
     }
 
index 4270ded..5b1f3ca 100644 (file)
@@ -26,8 +26,11 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
-import com.google.gson.reflect.TypeToken;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Vector;
 
@@ -49,6 +52,7 @@ import org.oransc.policyagent.repository.PolicyType;
 import org.oransc.policyagent.repository.PolicyTypes;
 import org.oransc.policyagent.repository.Ric;
 import org.oransc.policyagent.repository.Rics;
+import org.oransc.policyagent.repository.Services;
 import org.oransc.policyagent.tasks.RepositorySupervision;
 import org.oransc.policyagent.utils.MockA1Client;
 import org.oransc.policyagent.utils.MockA1ClientFactory;
@@ -85,6 +89,9 @@ public class ApplicationTest {
     @Autowired
     RepositorySupervision supervision;
 
+    @Autowired
+    Services services;
+
     private static Gson gson = new GsonBuilder() //
         .serializeNulls() //
         .create(); //
@@ -284,14 +291,6 @@ public class ApplicationTest {
         assertThat(policies.size()).isEqualTo(0);
     }
 
-    private static <T> List<T> parseList(String json, Class<T> clazz) {
-        if (null == json) {
-            return null;
-        }
-        return gson.fromJson(json, new TypeToken<T>() {}.getType());
-
-    }
-
     @Test
     public void testGetPolicySchemas() throws Exception {
         reset();
@@ -305,13 +304,13 @@ public class ApplicationTest {
         assertThat(rsp).contains("type2");
         assertThat(rsp).contains("title");
 
-        List<String> info = parseList(rsp, String.class);
+        List<String> info = parseSchemas(rsp);
         assertEquals(2, info.size());
 
         url = baseUrl() + "/policy_schemas?ric=ric1";
         rsp = this.restTemplate.getForObject(url, String.class);
         assertThat(rsp).contains("type1");
-        info = parseList(rsp, String.class);
+        info = parseSchemas(rsp);
         assertEquals(1, info.size());
     }
 
@@ -393,21 +392,48 @@ public class ApplicationTest {
 
     @Test
     public void testPutAndGetService() throws Exception {
+        // PUT
         putService("name");
 
-        String url = baseUrl() + "/service?name=name";
+        // GET
+        String url = baseUrl() + "/services?name=name";
         String rsp = this.restTemplate.getForObject(url, String.class);
-        ServiceStatus status = gson.fromJson(rsp, ImmutableServiceStatus.class);
+        List<ImmutableServiceStatus> info = parseList(rsp, ImmutableServiceStatus.class);
+        assertThat(info.size() == 1);
+        ServiceStatus status = info.iterator().next();
         assertThat(status.keepAliveInterval() == 1);
         assertThat(status.name().equals("name"));
 
+        // GET (all)
         url = baseUrl() + "/services";
         rsp = this.restTemplate.getForObject(url, String.class);
         assertThat(rsp.contains("name"));
         System.out.println(rsp);
 
-        url = baseUrl() + "/service/ping";
-        this.restTemplate.put(url, "name");
+        // DELETE
+        assertThat(services.size() == 1);
+        url = baseUrl() + "/services?name=name";
+        this.restTemplate.delete(url);
+        assertThat(services.size() == 0);
+    }
+
+    private static <T> List<T> parseList(String jsonString, Class<T> clazz) {
+        List<T> result = new ArrayList<>();
+        JsonArray jsonArr = new JsonParser().parse(jsonString).getAsJsonArray();
+        for (JsonElement jsonElement : jsonArr) {
+            T o = gson.fromJson(jsonElement.toString(), clazz);
+            result.add(o);
+        }
+        return result;
+    }
+
+    private static List<String> parseSchemas(String jsonString) {
+        JsonArray arrayOfSchema = new JsonParser().parse(jsonString).getAsJsonArray();
+        List<String> result = new ArrayList<>();
+        for (JsonElement schemaObject : arrayOfSchema) {
+            result.add(schemaObject.toString());
+        }
+        return result;
     }
 
 }
index bbbc06d..b4c4129 100644 (file)
@@ -121,7 +121,6 @@ public class MockPolicyAgent {
                 }
             }
         }
-
     }
 
     @LocalServerPort
index c63b710..446c061 100644 (file)
             ]
          }
       ]
-   },
-   "streams_subscribes": {
-      "dmaap_subscriber": {
-         "dmaap_info": {
-            "topic_url": "http://dradmin:dradmin@localhost:2222/events/A1-P/users/sdnc1"
-         },
-         "type": "message_router"
-      }
    }
 }
\ No newline at end of file
diff --git a/policy-agent/src/test/resources/test_application_configuration_with_dmaap_config.json b/policy-agent/src/test/resources/test_application_configuration_with_dmaap_config.json
new file mode 100644 (file)
index 0000000..c945360
--- /dev/null
@@ -0,0 +1,39 @@
+{
+   "config": {
+      "//description": "Application configuration",
+      "ric": [
+         {
+            "name": "ric1",
+            "baseUrl": "http://localhost:8080/",
+            "managedElementIds": [
+               "kista_1",
+               "kista_2"
+            ]
+         },
+         {
+            "name": "ric2",
+            "baseUrl": "http://localhost:8081/",
+            "managedElementIds": [
+               "kista_3",
+               "kista_4"
+            ]
+         }
+      ]
+   },
+   "streams_publishes": {
+      "dmaap_publisher": {
+         "type": "message_router",
+         "dmaap_info": {
+            "topic_url": "http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-WRITE"
+         }
+      }
+   },
+   "streams_subscribes": {
+      "dmaap_subscriber": {
+         "dmaap_info": {
+            "topic_url": "http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-READ/users/policy-agent"
+         },
+         "type": "message_router"
+      }
+   }
+}
\ No newline at end of file