Updates of the policy agent NBI 49/2449/2
authorPatrikBuhr <patrik.buhr@est.tech>
Fri, 7 Feb 2020 11:59:09 +0000 (12:59 +0100)
committerPatrikBuhr <patrik.buhr@est.tech>
Mon, 10 Feb 2020 13:49:17 +0000 (14:49 +0100)
Minor changes.

Removed an unused interface.

Change-Id: I30166c27546dc584d8ee4675af3d807e1175282f
Issue-ID: NONRTRIC-107
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
17 files changed:
dashboard/webapp-backend/src/main/java/org/oransc/ric/portal/dashboard/controller/PolicyController.java
dashboard/webapp-backend/src/main/java/org/oransc/ric/portal/dashboard/model/PolicyInfo.java
dashboard/webapp-backend/src/main/java/org/oransc/ric/portal/dashboard/model/PolicyType.java
dashboard/webapp-backend/src/main/java/org/oransc/ric/portal/dashboard/policyagentapi/PolicyAgentApi.java
dashboard/webapp-backend/src/main/java/org/oransc/ric/portal/dashboard/policyagentapi/PolicyAgentApiImpl.java
dashboard/webapp-backend/src/test/java/org/oransc/ric/portal/dashboard/config/PolicyControllerMockConfiguration.java
policy-agent/docs/api.yaml
policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java
policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyInfo.java
policy-agent/src/main/java/org/oransc/policyagent/controllers/RicRepositoryController.java
policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceController.java
policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceRegistrationInfo.java
policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceStatus.java
policy-agent/src/main/java/org/oransc/policyagent/controllers/StatusController.java
policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java
policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java [deleted file]
policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java

index c01c7c6..20a0c3f 100644 (file)
@@ -81,7 +81,7 @@ public class PolicyController {
      */
     @ApiOperation(value = "Gets the policy types from Near Realtime-RIC")
     @GetMapping(POLICY_TYPES_METHOD)
-    @Secured({DashboardConstants.ROLE_ADMIN, DashboardConstants.ROLE_STANDARD})
+    @Secured({ DashboardConstants.ROLE_ADMIN, DashboardConstants.ROLE_STANDARD })
     public ResponseEntity<String> getAllPolicyTypes(HttpServletResponse response) {
         logger.debug("getAllPolicyTypes");
         return this.policyAgentApi.getAllPolicyTypes();
@@ -89,7 +89,7 @@ public class PolicyController {
 
     @ApiOperation(value = "Returns the policy instances for the given policy type.")
     @GetMapping(POLICY_TYPES_METHOD + "/{" + POLICY_TYPE_ID_NAME + "}/" + POLICIES_NAME)
-    @Secured({DashboardConstants.ROLE_ADMIN, DashboardConstants.ROLE_STANDARD})
+    @Secured({ DashboardConstants.ROLE_ADMIN, DashboardConstants.ROLE_STANDARD })
     public ResponseEntity<String> getPolicyInstances(@PathVariable(POLICY_TYPE_ID_NAME) String policyTypeIdString) {
         logger.debug("getPolicyInstances {}", policyTypeIdString);
         return this.policyAgentApi.getPolicyInstancesForType(policyTypeIdString);
@@ -97,41 +97,41 @@ public class PolicyController {
 
     @ApiOperation(value = "Returns a policy instance of a type")
     @GetMapping(POLICY_TYPES_METHOD + "/{" + POLICY_TYPE_ID_NAME + "}/" + POLICIES_NAME + "/{" + POLICY_INSTANCE_ID_NAME
-        + "}")
-    @Secured({DashboardConstants.ROLE_ADMIN, DashboardConstants.ROLE_STANDARD})
-    public ResponseEntity<String> getPolicyInstance(@PathVariable(POLICY_TYPE_ID_NAME) String policyTypeIdString,
-        @PathVariable(POLICY_INSTANCE_ID_NAME) String policyInstanceId) {
+            + "}")
+    @Secured({ DashboardConstants.ROLE_ADMIN, DashboardConstants.ROLE_STANDARD })
+    public ResponseEntity<Object> getPolicyInstance(@PathVariable(POLICY_TYPE_ID_NAME) String policyTypeIdString,
+            @PathVariable(POLICY_INSTANCE_ID_NAME) String policyInstanceId) {
         logger.debug("getPolicyInstance {}:{}", policyTypeIdString, policyInstanceId);
         return this.policyAgentApi.getPolicyInstance(policyInstanceId);
     }
 
     @ApiOperation(value = "Creates the policy instances for the given policy type.")
     @PutMapping(POLICY_TYPES_METHOD + "/{" + POLICY_TYPE_ID_NAME + "}/" + POLICIES_NAME + "/{" + POLICY_INSTANCE_ID_NAME
-        + "}")
-    @Secured({DashboardConstants.ROLE_ADMIN})
+            + "}")
+    @Secured({ DashboardConstants.ROLE_ADMIN })
     public ResponseEntity<String> putPolicyInstance(@PathVariable(POLICY_TYPE_ID_NAME) String policyTypeIdString,
-        @RequestParam(name = "ric", required = true) String ric,
-        @PathVariable(POLICY_INSTANCE_ID_NAME) String policyInstanceId, @RequestBody String instance) {
+            @RequestParam(name = "ric", required = true) String ric,
+            @PathVariable(POLICY_INSTANCE_ID_NAME) String policyInstanceId, @RequestBody String instance) {
         logger.debug("putPolicyInstance typeId: {}, instanceId: {}, instance: {}", policyTypeIdString, policyInstanceId,
-            instance);
+                instance);
         return this.policyAgentApi.putPolicy(policyTypeIdString, policyInstanceId, instance, ric);
     }
 
     @ApiOperation(value = "Deletes the policy instances for the given policy type.")
     @DeleteMapping(POLICY_TYPES_METHOD + "/{" + POLICY_TYPE_ID_NAME + "}/" + POLICIES_NAME + "/{"
-        + POLICY_INSTANCE_ID_NAME + "}")
-    @Secured({DashboardConstants.ROLE_ADMIN})
+            + POLICY_INSTANCE_ID_NAME + "}")
+    @Secured({ DashboardConstants.ROLE_ADMIN })
     public ResponseEntity<String> deletePolicyInstance(@PathVariable(POLICY_TYPE_ID_NAME) String policyTypeIdString,
-        @PathVariable(POLICY_INSTANCE_ID_NAME) String policyInstanceId) {
+            @PathVariable(POLICY_INSTANCE_ID_NAME) String policyInstanceId) {
         logger.debug("deletePolicyInstance typeId: {}, instanceId: {}", policyTypeIdString, policyInstanceId);
         return this.policyAgentApi.deletePolicy(policyInstanceId);
     }
 
     @ApiOperation(value = "Returns the rics supporting the given policy type.")
     @GetMapping("/rics")
-    @Secured({DashboardConstants.ROLE_ADMIN, DashboardConstants.ROLE_STANDARD})
+    @Secured({ DashboardConstants.ROLE_ADMIN, DashboardConstants.ROLE_STANDARD })
     public ResponseEntity<String> getRicsSupportingType(
-        @RequestParam(name = "policyType", required = true) String supportingPolicyType) {
+            @RequestParam(name = "policyType", required = true) String supportingPolicyType) {
         logger.debug("getRicsSupportingType {}", supportingPolicyType);
 
         return this.policyAgentApi.getRicsSupportingType(supportingPolicyType);
index f0ca285..035428b 100644 (file)
@@ -27,9 +27,9 @@ public class PolicyType {
     String name;
 
     @JsonProperty("schema")
-    String schema;
+    Object schema;
 
-    public PolicyType(String name, String schema) {
+    public PolicyType(String name, Object schema) {
         this.name = name;
         this.schema = schema;
     }
@@ -42,16 +42,16 @@ public class PolicyType {
         this.name = name;
     }
 
-    public String getSchema() {
+    public Object getSchema() {
         return schema;
     }
 
-    public void setSchema(String schema) {
+    public void setSchema(Object schema) {
         this.schema = schema;
     }
 
     @Override
     public String toString() {
-        return "[name:" + name + ", schema:" + schema + "]";
+        return "[name:" + name + ", schema:" + schema.toString() + "]";
     }
 }
index ff254d2..6b7fdd2 100644 (file)
@@ -27,10 +27,10 @@ public interface PolicyAgentApi {
 
     public ResponseEntity<String> getPolicyInstancesForType(String type);
 
-    public ResponseEntity<String> getPolicyInstance(String id);
+    public ResponseEntity<Object> getPolicyInstance(String id);
 
-    public ResponseEntity<String> putPolicy(String policyTypeIdString, String policyInstanceId, String json,
-        String ric);
+    public ResponseEntity<String> putPolicy(String policyTypeIdString, String policyInstanceId, Object json,
+            String ric);
 
     public ResponseEntity<String> deletePolicy(String policyInstanceId);
 
index c0dde9b..fe9c26c 100644 (file)
@@ -58,14 +58,14 @@ public class PolicyAgentApiImpl implements PolicyAgentApi {
     RestTemplate restTemplate = new RestTemplate();
 
     private static com.google.gson.Gson gson = new GsonBuilder() //
-        .serializeNulls() //
-        .create(); //
+            .serializeNulls() //
+            .create(); //
 
     private final String urlPrefix;
 
     @Autowired
     public PolicyAgentApiImpl(
-        @org.springframework.beans.factory.annotation.Value("${policycontroller.url.prefix}") final String urlPrefix) {
+            @org.springframework.beans.factory.annotation.Value("${policycontroller.url.prefix}") final String urlPrefix) {
         logger.debug("ctor prefix '{}'", urlPrefix);
         this.urlPrefix = urlPrefix;
     }
@@ -119,7 +119,8 @@ public class PolicyAgentApiImpl implements PolicyAgentApi {
         }
 
         try {
-            Type listType = new TypeToken<List<ImmutablePolicyInfo>>() {}.getType();
+            Type listType = new TypeToken<List<ImmutablePolicyInfo>>() {
+            }.getType();
             List<PolicyInfo> rspParsed = gson.fromJson(rsp.getBody(), listType);
             PolicyInstances result = new PolicyInstances();
             for (PolicyInfo p : rspParsed) {
@@ -132,22 +133,22 @@ public class PolicyAgentApiImpl implements PolicyAgentApi {
     }
 
     @Override
-    public ResponseEntity<String> getPolicyInstance(String id) {
+    public ResponseEntity<Object> getPolicyInstance(String id) {
         String url = baseUrl() + "/policy?instance={id}";
         Map<String, ?> uriVariables = Map.of("id", id);
 
-        return this.restTemplate.getForEntity(url, String.class, uriVariables);
+        return this.restTemplate.getForEntity(url, Object.class, uriVariables);
     }
 
     @Override
-    public ResponseEntity<String> putPolicy(String policyTypeIdString, String policyInstanceId, String json,
-        String ric) {
+    public ResponseEntity<String> putPolicy(String policyTypeIdString, String policyInstanceId, Object json,
+            String ric) {
         String url = baseUrl() + "/policy?type={type}&instance={instance}&ric={ric}&service={service}";
         Map<String, ?> uriVariables = Map.of( //
-            "type", policyTypeIdString, //
-            "instance", policyInstanceId, //
-            "ric", ric, //
-            "service", "dashboard");
+                "type", policyTypeIdString, //
+                "instance", policyInstanceId, //
+                "ric", ric, //
+                "service", "dashboard");
 
         try {
             this.restTemplate.put(url, createJsonHttpEntity(json), uriVariables);
@@ -187,7 +188,8 @@ public class PolicyAgentApiImpl implements PolicyAgentApi {
         String rsp = this.restTemplate.getForObject(url, String.class, uriVariables);
 
         try {
-            Type listType = new TypeToken<List<ImmutableRicInfo>>() {}.getType();
+            Type listType = new TypeToken<List<ImmutableRicInfo>>() {
+            }.getType();
             List<RicInfo> rspParsed = gson.fromJson(rsp, listType);
             Collection<String> result = new Vector<>(rspParsed.size());
             for (RicInfo ric : rspParsed) {
@@ -199,10 +201,10 @@ public class PolicyAgentApiImpl implements PolicyAgentApi {
         }
     }
 
-    private HttpEntity<String> createJsonHttpEntity(String content) {
+    private HttpEntity<Object> createJsonHttpEntity(Object content) {
         HttpHeaders headers = new HttpHeaders();
         headers.setContentType(MediaType.APPLICATION_JSON);
-        return new HttpEntity<String>(content, headers);
+        return new HttpEntity<Object>(content, headers);
     }
 
 }
index 2268fe0..76ce40c 100644 (file)
@@ -70,12 +70,12 @@ public class PolicyControllerMockConfiguration {
         private final Database database = new Database();
 
         @Override
-        public ResponseEntity<String> getPolicyInstance(String id) {
+        public ResponseEntity<Object> getPolicyInstance(String id) {
             return new ResponseEntity<>(database.getInstance(id), HttpStatus.OK);
         }
 
         @Override
-        public ResponseEntity<String> putPolicy(String policyTypeIdString, String policyInstanceId, String json,
+        public ResponseEntity<String> putPolicy(String policyTypeIdString, String policyInstanceId, Object json,
                 String ric) {
             database.putInstance(policyTypeIdString, policyInstanceId, json, ric);
             return new ResponseEntity<>("Policy was put successfully", HttpStatus.OK);
@@ -147,7 +147,7 @@ public class PolicyControllerMockConfiguration {
             return java.time.Instant.now().toString();
         }
 
-        void putInstance(String typeId, String instanceId, String instanceData, String ric) {
+        void putInstance(String typeId, String instanceId, Object instanceData, String ric) {
             PolicyInfo i = ImmutablePolicyInfo.builder().json(instanceData).lastModified(getTimeStampUTC())
                     .id(instanceId).ric(ric).service("service").type(typeId).build();
             instances.put(instanceId, i);
@@ -157,7 +157,7 @@ public class PolicyControllerMockConfiguration {
             instances.remove(instanceId);
         }
 
-        String getInstance(String id) throws RestClientException {
+        Object getInstance(String id) throws RestClientException {
             PolicyInfo i = instances.get(id);
             if (i == null) {
                 throw new RestClientException("Type not found: " + id);
index 1416298..1c41d42 100644 (file)
@@ -516,7 +516,7 @@ paths:
           schema:
             type: array
             items:
-              type: string
+              type: object
         '401':
           description: Unauthorized
         '403':
@@ -705,9 +705,9 @@ paths:
       produces:
         - '*/*'
       parameters:
-        - name: name
+        - name: serviceName
           in: query
-          description: name
+          description: serviceName
           required: true
           type: string
       responses:
@@ -733,9 +733,9 @@ paths:
       produces:
         - '*/*'
       parameters:
-        - name: name
+        - name: serviceName
           in: query
-          description: name
+          description: serviceName
           required: true
           type: string
       responses:
@@ -756,7 +756,7 @@ paths:
     get:
       tags:
         - status-controller
-      summary: Returns status and statistics of the service
+      summary: Returns status and statistics of this service
       operationId: getStatusUsingGET
       produces:
         - '*/*'
@@ -799,7 +799,7 @@ definitions:
         type: string
         description: identity of the policy
       json:
-        type: string
+        type: object
         description: the configuration of the policy
       lastModified:
         type: string
@@ -841,7 +841,7 @@ definitions:
         type: integer
         format: int64
         description: keep alive interval for policies owned by the service. 0 means no timeout supervision. Polcies that are not refreshed within this time are removed
-      name:
+      serviceName:
         type: string
         description: identity of the service
     title: ServiceRegistrationInfo
@@ -852,7 +852,7 @@ definitions:
         type: integer
         format: int64
         description: policy keep alive timeout
-      name:
+      serviceName:
         type: string
         description: identity of the service
       timeSincePingSeconds:
index e0971c5..965b42f 100644 (file)
@@ -78,7 +78,7 @@ public class PolicyController {
     @ApiOperation(value = "Returns policy type schema definitions")
     @ApiResponses(
         value = {
-            @ApiResponse(code = 200, message = "Policy schemas", response = String.class, responseContainer = "List")})
+            @ApiResponse(code = 200, message = "Policy schemas", response = Object.class, responseContainer = "List")})
     public ResponseEntity<String> getPolicySchemas(@RequestParam(name = "ric", required = false) String ricName) {
         synchronized (this.policyTypes) {
             if (ricName == null) {
@@ -272,7 +272,7 @@ public class PolicyController {
         for (Policy p : policies) {
             PolicyInfo policyInfo = new PolicyInfo();
             policyInfo.id = p.id();
-            policyInfo.json = p.json();
+            policyInfo.json = fromJson(p.json());
             policyInfo.ric = p.ric().name();
             policyInfo.type = p.type().name();
             policyInfo.service = p.ownerServiceName();
@@ -285,6 +285,10 @@ public class PolicyController {
         return gson.toJson(v);
     }
 
+    private Object fromJson(String jsonStr) {
+        return gson.fromJson(jsonStr, Object.class);
+    }
+
     private String toPolicyTypeSchemasJson(Collection<PolicyType> types) {
         StringBuilder result = new StringBuilder();
         result.append("[");
index bd13062..1736c18 100644 (file)
@@ -39,7 +39,7 @@ public class PolicyInfo {
     public String ric;
 
     @ApiModelProperty(value = "the configuration of the policy")
-    public String json;
+    public Object json;
 
     @ApiModelProperty(value = "the name of the service owning the policy")
     public String service;
index bb3b735..38369a8 100644 (file)
@@ -57,7 +57,7 @@ public class RicRepositoryController {
     }
 
     /**
-     * Example: http://localhost:8080/rics?managedElementId=kista_1
+     * Example: http://localhost:8081/rics?managedElementId=kista_1
      */
     @GetMapping("/ric")
     @ApiOperation(value = "Returns the name of a RIC managing one Mananged Element")
@@ -80,7 +80,7 @@ public class RicRepositoryController {
 
     /**
      * @return a Json array of all RIC data
-     *         Example: http://localhost:8080/ric
+     *         Example: http://localhost:8081/ric
      */
     @GetMapping("/rics")
     @ApiOperation(value = "Query NearRT RIC information")
index 012d40a..e68b82c 100644 (file)
@@ -104,9 +104,9 @@ public class ServiceController {
     @ApiResponses(value = {@ApiResponse(code = 200, message = "OK")})
     @DeleteMapping("/services")
     public ResponseEntity<String> deleteService( //
-        @RequestParam(name = "name", required = true) String name) {
+        @RequestParam(name = "serviceName", required = true) String serviceName) {
         try {
-            Service service = removeService(name);
+            Service service = removeService(serviceName);
             // Remove the policies from the repo and let the consistency monitoring
             // do the rest.
             removePolicies(service);
@@ -122,9 +122,9 @@ public class ServiceController {
             @ApiResponse(code = 404, message = "The service is not found, needs re-registration")})
     @PostMapping("/services/keepalive")
     public ResponseEntity<String> keepAliveService( //
-        @RequestParam(name = "name", required = true) String name) {
+        @RequestParam(name = "serviceName", required = true) String serviceName) {
         try {
-            services.getService(name).ping();
+            services.getService(serviceName).ping();
             return new ResponseEntity<String>("OK", HttpStatus.OK);
         } catch (Exception e) {
             return new ResponseEntity<String>(e.getMessage(), HttpStatus.NOT_FOUND);
@@ -149,7 +149,7 @@ public class ServiceController {
     }
 
     private Service toService(ServiceRegistrationInfo s) {
-        return new Service(s.name, Duration.ofSeconds(s.keepAliveIntervalSeconds), s.callbackUrl);
+        return new Service(s.serviceName, Duration.ofSeconds(s.keepAliveIntervalSeconds), s.callbackUrl);
     }
 
 }
index 145fdb0..907fa1c 100644 (file)
@@ -30,7 +30,7 @@ import org.immutables.gson.Gson;
 public class ServiceRegistrationInfo {
 
     @ApiModelProperty(value = "identity of the service")
-    public String name;
+    public String serviceName;
 
     @ApiModelProperty(
         value = "keep alive interval for policies owned by the service. 0 means no timeout supervision."
@@ -44,7 +44,7 @@ public class ServiceRegistrationInfo {
     }
 
     public ServiceRegistrationInfo(String name, long keepAliveIntervalSeconds, String callbackUrl) {
-        this.name = name;
+        this.serviceName = name;
         this.keepAliveIntervalSeconds = keepAliveIntervalSeconds;
         this.callbackUrl = callbackUrl;
     }
index c30cce1..fc80834 100644 (file)
@@ -30,7 +30,7 @@ import org.immutables.gson.Gson;
 public class ServiceStatus {
 
     @ApiModelProperty(value = "identity of the service")
-    public final String name;
+    public final String serviceName;
 
     @ApiModelProperty(value = "policy keep alive timeout")
     public final long keepAliveIntervalSeconds;
@@ -39,7 +39,7 @@ public class ServiceStatus {
     public final long timeSincePingSeconds;
 
     ServiceStatus(String name, long keepAliveIntervalSeconds, long timeSincePingSeconds) {
-        this.name = name;
+        this.serviceName = name;
         this.keepAliveIntervalSeconds = keepAliveIntervalSeconds;
         this.timeSincePingSeconds = timeSincePingSeconds;
     }
index 1219a0d..48d9a57 100644 (file)
@@ -34,7 +34,7 @@ import reactor.core.publisher.Mono;
 public class StatusController {
 
     @GetMapping("/status")
-    @ApiOperation(value = "Returns status and statistics of the service")
+    @ApiOperation(value = "Returns status and statistics of this service")
     @ApiResponses(
         value = { //
             @ApiResponse(code = 200, message = "Service is living", response = String.class) //
index c132fa2..91f9ff2 100644 (file)
@@ -1,4 +1,3 @@
-
 /*-
  * ========================LICENSE_START=================================
  * O-RAN-SC
 
 package org.oransc.policyagent.dmaap;
 
-/**
- * The Dmaap consumer which has the base methods to be implemented by any class which implements this interface
- *
- */
-public interface DmaapMessageConsumer {
-
-    /**
-     * The init method creates the MRConsumer with the properties passed from the Application Config
-     *
-     * @param properties
-     */
-    public void init();
-
-    /**
-     * This method process the message and call the respective Controller
-     *
-     * @param msg
-     * @throws Exception
-     */
-    public abstract void processMsg(String msg) throws Exception;
-
-    /**
-     * To check whether the DMAAP Listner is alive
-     *
-     * @return boolean
-     */
-    public boolean isAlive();
-
-    /**
-     * It's a infinite loop run every configured seconds to fetch the message from DMAAP. This method can be stop by
-     * setting the alive flag to false
-     */
-    public void run();
+import com.google.common.collect.Iterables;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Properties;
+
+import org.onap.dmaap.mr.client.MRBatchingPublisher;
+import org.onap.dmaap.mr.client.MRClientFactory;
+import org.onap.dmaap.mr.client.MRConsumer;
+import org.onap.dmaap.mr.client.response.MRConsumerResponse;
+import org.oransc.policyagent.clients.AsyncRestClient;
+import org.oransc.policyagent.configuration.ApplicationConfig;
+import org.oransc.policyagent.exceptions.ServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+@Component
+public class DmaapMessageConsumer implements Runnable {
+
+    private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumer.class);
+
+    final Duration TIME_BETWEEN_DMAAP_POLLS = Duration.ofSeconds(10);
+    private final ApplicationConfig applicationConfig;
+
+    @Value("${server.port}")
+    private int localServerPort;
+
+    @Autowired
+    public DmaapMessageConsumer(ApplicationConfig applicationConfig) {
+        this.applicationConfig = applicationConfig;
+
+        Thread thread = new Thread(this);
+        thread.start();
+    }
+
+    private boolean isDmaapConfigured() {
+        Properties consumerCfg = applicationConfig.getDmaapConsumerConfig();
+        Properties producerCfg = applicationConfig.getDmaapPublisherConfig();
+        return (consumerCfg != null && consumerCfg.size() > 0 && producerCfg != null && producerCfg.size() > 0);
+    }
+
+    @Override
+    public void run() {
+        while (sleep(TIME_BETWEEN_DMAAP_POLLS) && isDmaapConfigured()) {
+            try {
+                Iterable<String> dmaapMsgs = fetchAllMessages();
+                if (dmaapMsgs != null && Iterables.size(dmaapMsgs) > 0) {
+                    logger.debug("Fetched all the messages from DMAAP and will start to process the messages");
+                    for (String msg : dmaapMsgs) {
+                        processMsg(msg);
+                    }
+                }
+            } catch (Exception e) {
+                logger.warn("{}: cannot fetch because of ", this, e.getMessage(), e);
+                sleep(TIME_BETWEEN_DMAAP_POLLS);
+            }
+        }
+    }
+
+    private Iterable<String> fetchAllMessages() throws ServiceException, FileNotFoundException, IOException {
+        Properties dmaapConsumerProperties = this.applicationConfig.getDmaapConsumerConfig();
+        MRConsumer consumer = MRClientFactory.createConsumer(dmaapConsumerProperties);
+        MRConsumerResponse response = consumer.fetchWithReturnConsumerResponse();
+        if (response == null || !"200".equals(response.getResponseCode())) {
+            throw new ServiceException("DMaaP NULL response received");
+        } else {
+            logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(), response.getResponseMessage());
+            return response.getActualMessages();
+        }
+    }
+
+    private void processMsg(String msg) throws Exception {
+        logger.debug("Message Reveived from DMAAP : {}", msg);
+        createDmaapMessageHandler().handleDmaapMsg(msg);
+    }
+
+    private DmaapMessageHandler createDmaapMessageHandler() throws FileNotFoundException, IOException {
+        String agentBaseUrl = "http://localhost:" + this.localServerPort;
+        AsyncRestClient agentClient = new AsyncRestClient(agentBaseUrl);
+        Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig();
+        MRBatchingPublisher producer = MRClientFactory.createBatchingPublisher(dmaapPublisherProperties);
+
+        return new DmaapMessageHandler(producer, this.applicationConfig, agentClient);
+    }
 
+    private boolean sleep(Duration duration) {
+        try {
+            Thread.sleep(duration.toMillis());
+            return true;
+        } catch (Exception e) {
+            logger.error("Failed to put the thread to sleep", e);
+            return false;
+        }
+    }
 }
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java
deleted file mode 100644 (file)
index 4a2605b..0000000
+++ /dev/null
@@ -1,125 +0,0 @@
-/*-
- * ========================LICENSE_START=================================
- * O-RAN-SC
- * %%
- * Copyright (C) 2019 Nordix Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================LICENSE_END===================================
- */
-
-package org.oransc.policyagent.dmaap;
-
-import com.google.common.collect.Iterables;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.time.Duration;
-import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.onap.dmaap.mr.client.MRBatchingPublisher;
-import org.onap.dmaap.mr.client.MRClientFactory;
-import org.onap.dmaap.mr.client.MRConsumer;
-import org.onap.dmaap.mr.client.response.MRConsumerResponse;
-import org.oransc.policyagent.clients.AsyncRestClient;
-import org.oransc.policyagent.configuration.ApplicationConfig;
-import org.oransc.policyagent.exceptions.ServiceException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
-
-@Component
-public class DmaapMessageConsumerImpl implements Runnable {
-
-    private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumerImpl.class);
-
-    final Duration ERROR_TIMEOUT = Duration.ofSeconds(30);
-    final Duration TIME_BETWEEN_DMAAP_POLLS = Duration.ofSeconds(10);
-    private final ApplicationConfig applicationConfig;
-
-    @Value("${server.port}")
-    private int localServerPort;
-
-    @Autowired
-    public DmaapMessageConsumerImpl(ApplicationConfig applicationConfig) {
-        this.applicationConfig = applicationConfig;
-
-        Thread thread = new Thread(this);
-        thread.start();
-    }
-
-    private boolean isDmaapConfigured() {
-        Properties consumerCfg = applicationConfig.getDmaapConsumerConfig();
-        Properties producerCfg = applicationConfig.getDmaapPublisherConfig();
-        return (consumerCfg != null && consumerCfg.size() > 0 && producerCfg != null && producerCfg.size() > 0);
-    }
-
-    @Override
-    public void run() {
-        while (sleep(TIME_BETWEEN_DMAAP_POLLS) && isDmaapConfigured()) {
-            try {
-                Iterable<String> dmaapMsgs = fetchAllMessages();
-                if (dmaapMsgs != null && Iterables.size(dmaapMsgs) > 0) {
-                    logger.debug("Fetched all the messages from DMAAP and will start to process the messages");
-                    for (String msg : dmaapMsgs) {
-                        processMsg(msg);
-                    }
-                }
-            } catch (Exception e) {
-                logger.error("{}: cannot fetch because of ", this, e.getMessage(), e);
-                sleep(ERROR_TIMEOUT);
-            }
-        }
-    }
-
-    private Iterable<String> fetchAllMessages() throws ServiceException, FileNotFoundException, IOException {
-        Properties dmaapConsumerProperties = this.applicationConfig.getDmaapConsumerConfig();
-        MRConsumer consumer = MRClientFactory.createConsumer(dmaapConsumerProperties);
-        MRConsumerResponse response = consumer.fetchWithReturnConsumerResponse();
-        if (response == null || !"200".equals(response.getResponseCode())) {
-            throw new ServiceException("DMaaP NULL response received");
-        } else {
-            logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(), response.getResponseMessage());
-            return response.getActualMessages();
-        }
-    }
-
-    private void processMsg(String msg) throws Exception {
-        logger.debug("Message Reveived from DMAAP : {}", msg);
-        createDmaapMessageHandler().handleDmaapMsg(msg);
-    }
-
-    private DmaapMessageHandler createDmaapMessageHandler() throws FileNotFoundException, IOException {
-        String agentBaseUrl = "http://localhost:" + this.localServerPort;
-        AsyncRestClient agentClient = new AsyncRestClient(agentBaseUrl);
-        Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig();
-        MRBatchingPublisher producer = MRClientFactory.createBatchingPublisher(dmaapPublisherProperties);
-
-        return new DmaapMessageHandler(producer, this.applicationConfig, agentClient);
-    }
-
-    private boolean sleep(Duration duration) {
-        CountDownLatch sleep = new CountDownLatch(1);
-        try {
-            sleep.await(duration.toMillis(), TimeUnit.MILLISECONDS);
-            return true;
-        } catch (Exception e) {
-            logger.error("msg", e);
-            return false;
-        }
-    }
-}
index aeeb19a..38de021 100644 (file)
@@ -349,8 +349,7 @@ public class ApplicationTest {
         String rsp = this.restTemplate.getForObject(url, String.class);
         System.out.println("*** " + rsp);
         assertThat(rsp).contains("type1");
-        assertThat(rsp).contains("type2");
-        assertThat(rsp).contains("title");
+        assertThat(rsp).contains("[{\"title\":\"type2\"}");
 
         List<String> info = parseSchemas(rsp);
         assertEquals(2, info.size());
@@ -440,7 +439,7 @@ public class ApplicationTest {
         assertThat(info.size() == 1);
         ServiceStatus status = info.iterator().next();
         assertThat(status.keepAliveIntervalSeconds == 1);
-        assertThat(status.name.equals("name"));
+        assertThat(status.serviceName.equals("name"));
 
         // GET (all)
         url = baseUrl() + "/services";