Initial Working DMAAP Listener 31/2331/2
authorLathish <lathishbabu.ganesan@est.tech>
Fri, 24 Jan 2020 14:50:32 +0000 (14:50 +0000)
committerLathish <lathishbabu.ganesan@est.tech>
Fri, 24 Jan 2020 15:42:04 +0000 (15:42 +0000)
Issue-ID: NONRTRIC-107
Change-Id: I6ebf378d651e19aa236ea86b33855ceac4114587
Signed-off-by: Lathish <lathishbabu.ganesan@est.tech>
policy-agent/README.md
policy-agent/pom.xml
policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java
policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java

index be04907..3ddbbd5 100644 (file)
@@ -10,6 +10,10 @@ policices. It provides support for
 -Consistency monitoring of the SMO view of policies and the actual situation in the RICs
 -Consistency monitoring of RIC capabilities (policy types)
 
+To Run Policy Agent in Local:
+Create a symbolic link with below command,
+ln -s <path to test_application_configuration.json> application_configuration.json
+
 The agent can be run stand alone in a simulated test mode. Then it 
 simulates RICs. 
 The REST API is published on port 8081 and it is started by command:
index 52761cc..43d3aef 100644 (file)
@@ -48,7 +48,7 @@
                <spotless-maven-plugin.version>1.18.0</spotless-maven-plugin.version>
                <docker-maven-plugin>0.30.0</docker-maven-plugin>
                <version.dmaap>1.1.9</version.dmaap>
-               <version.lombok>1.18.4</version.lombok>
+        <javax.ws.rs-api.version>2.1.1</javax.ws.rs-api.version>
        </properties>
        <dependencies>
                <dependency>
                        <artifactId>mockito-core</artifactId>
                        <scope>test</scope>
                </dependency>
-               <dependency>
-                       <groupId>org.projectlombok</groupId>
-                       <artifactId>lombok</artifactId>
-                       <scope>provided</scope>
-               </dependency>
                <dependency>
                        <groupId>org.onap.dmaap.messagerouter.dmaapclient</groupId>
                        <artifactId>dmaapClient</artifactId>
                        <version>${version.dmaap}</version>
                </dependency>
+        <dependency>
+            <groupId>javax.ws.rs</groupId>
+            <artifactId>javax.ws.rs-api</artifactId>
+            <version>${javax.ws.rs-api.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.glassfish.jersey.inject</groupId>
+            <artifactId>jersey-hk2</artifactId>
+        </dependency>
        </dependencies>
        <build>
                <plugins>
index 7e77528..fd42104 100644 (file)
@@ -1,22 +1,65 @@
+
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2019 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
 package org.oransc.policyagent.dmaap;
 
 import java.util.Properties;
 
 /**
- * The Dmaap consumer which has the base methods to be implemented by any class
- * which implements this interface
+ * The Dmaap consumer which has the base methods to be implemented by any class which implements this interface
  *
  */
-public interface DmaapMessageConsumer extends Runnable {
+public interface DmaapMessageConsumer {
 
-    public void init(Properties baseProperties);
+    /**
+     * The init method creates the MRConsumer with the properties passed from the Application Config
+     *
+     * @param properties
+     */
+    public void init(Properties properties);
 
+    /**
+     * This method process the message and call the respective Controller
+     *
+     * @param msg
+     * @throws Exception
+     */
     public abstract void processMsg(String msg) throws Exception;
 
-    public boolean isReady();
-
-    public boolean isRunning();
+    /**
+     * To check whether the DMAAP Listner is alive
+     *
+     * @return boolean
+     */
+    public boolean isAlive();
 
+    /**
+     * To Stop the DMAAP Listener
+     */
     public void stopConsumer();
 
+    /**
+     * It's a infinite loop run every configured seconds to fetch the message from DMAAP. This method can be stop by
+     * setting the alive flag to false
+     */
+    public void run();
+
 }
index 63de197..74cfe0d 100644 (file)
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2019 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
 package org.oransc.policyagent.dmaap;
 
+import java.io.IOException;
 import java.util.Properties;
-
-import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
+import org.onap.dmaap.mr.client.MRClientFactory;
+import org.onap.dmaap.mr.client.MRConsumer;
+import org.onap.dmaap.mr.client.response.MRConsumerResponse;
 import org.oransc.policyagent.configuration.ApplicationConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
 @Component
+@EnableScheduling
 public class DmaapMessageConsumerImpl implements DmaapMessageConsumer {
 
-    private final ApplicationConfig applicationConfig;
+    private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumerImpl.class);
 
-    protected MRConsumerImpl consumer;
+    private boolean alive = false;
+    protected MRConsumer consumer;
+    private MRConsumerResponse response = null;
 
     @Autowired
     public DmaapMessageConsumerImpl(ApplicationConfig applicationConfig) {
-        this.applicationConfig = applicationConfig;
+        Properties dmaapConsumerConfig = applicationConfig.getDmaapConsumerConfig();
+        init(dmaapConsumerConfig);
     }
 
+    @Scheduled(fixedRate = 1000 * 60)
     @Override
     public void run() {
-        // TODO Auto-generated method stub
+        while (this.alive) {
+            try {
+                Iterable<String> dmaapMsgs = fetchAllMessages();
+                for (String msg : dmaapMsgs) {
+                    processMsg(msg);
+                }
+            } catch (Exception e) {
+                logger.error("{}: cannot fetch because of ", this, e.getMessage(), e);
+            }
+        }
+    }
 
+    private Iterable<String> fetchAllMessages() {
+        response = consumer.fetchWithReturnConsumerResponse();
+        if (response == null) {
+            logger.warn("{}: DMaaP NULL response received", this);
+        } else {
+            logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(), response.getResponseMessage());
+            if (!"200".equals(response.getResponseCode())) {
+                logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
+                        response.getResponseMessage());
+            }
+        }
+        return response.getActualMessages();
     }
 
     @Override
-    public void init(Properties baseProperties) {
-        Properties dmaapConsumerConfig = applicationConfig.getDmaapConsumerConfig();
+    public void init(Properties properties) {
         // Initialize the DMAAP with the properties
-        // TODO Auto-generated method stub
-
+        // Do we need to do any validation of properties before calling the factory?
+        Properties prop = new Properties();
+        prop.setProperty("ServiceName", "localhost:6845/events");
+        prop.setProperty("topic", "A1-P");
+        prop.setProperty("host", "localhost:6845");
+        prop.setProperty("contenttype", "application/json");
+        prop.setProperty("username", "admin");
+        prop.setProperty("password", "admin");
+        prop.setProperty("group", "users");
+        prop.setProperty("id", "policy-agent");
+        prop.setProperty("TransportType", "HTTPNOAUTH");
+        prop.setProperty("timeout", "15000");
+        prop.setProperty("limit", "1000");
+        try {
+            consumer = MRClientFactory.createConsumer(prop);
+            this.alive = true;
+        } catch (IOException e) {
+            logger.error("Exception occurred while creating Dmaap Consumer", e);
+        }
     }
 
     @Override
     public void processMsg(String msg) throws Exception {
-        // Call the Controller once you get the message from DMAAP
+        System.out.println("sysout" + msg);
         // Call the concurrent Task executor to handle the incoming request
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    public boolean isReady() {
-        // TODO Auto-generated method stub
-        return false;
+        // Validate the Input & if its valid, post the ACCEPTED Response back to DMAAP
+        // through REST CLIENT
+        // Call the Controller with the extracted payload
     }
 
     @Override
-    public boolean isRunning() {
-        // TODO Auto-generated method stub
-        return false;
+    public boolean isAlive() {
+        return alive;
     }
 
     @Override
     public void stopConsumer() {
-        // TODO Auto-generated method stub
-
+        alive = false;
     }
-
 }