Process message with Producer logic
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / dmaap / DmaapMessageConsumerImpl.java
index 191a13b..2ae5e5e 100644 (file)
@@ -22,7 +22,7 @@ package org.oransc.policyagent.dmaap;
 
 import java.io.IOException;
 import java.util.Properties;
-
+import javax.annotation.PostConstruct;
 import org.onap.dmaap.mr.client.MRClientFactory;
 import org.onap.dmaap.mr.client.MRConsumer;
 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
@@ -44,21 +44,24 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer {
     private final ApplicationConfig applicationConfig;
     protected MRConsumer consumer;
     private MRConsumerResponse response = null;
+    @Autowired
+    private DmaapMessageHandler dmaapMessageHandler;
 
     @Autowired
     public DmaapMessageConsumerImpl(ApplicationConfig applicationConfig) {
         this.applicationConfig = applicationConfig;
     }
 
-    @Scheduled(fixedRate = 1000 * 60)
+    @Scheduled(fixedRate = 1000 * 10) // , initialDelay=60000)
     @Override
     public void run() {
-        if (!alive) {
-            init();
-        }
+        /*
+         * if (!alive) { init(); }
+         */
         if (this.alive) {
             try {
                 Iterable<String> dmaapMsgs = fetchAllMessages();
+                logger.debug("Fetched all the messages from DMAAP and will start to process the messages");
                 for (String msg : dmaapMsgs) {
                     processMsg(msg);
                 }
@@ -76,23 +79,25 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer {
             logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(), response.getResponseMessage());
             if (!"200".equals(response.getResponseCode())) {
                 logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
-                    response.getResponseMessage());
+                        response.getResponseMessage());
             }
         }
         return response.getActualMessages();
     }
 
+    @PostConstruct
     @Override
     public void init() {
         Properties dmaapConsumerProperties = applicationConfig.getDmaapConsumerConfig();
         Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig();
         // No need to start if there is no configuration.
         if (dmaapConsumerProperties == null || dmaapPublisherProperties == null || dmaapConsumerProperties.size() == 0
-            || dmaapPublisherProperties.size() == 0) {
+                || dmaapPublisherProperties.size() == 0) {
+            logger.error("DMaaP properties Failed to Load");
             return;
         }
-        // Do we need to do any validation of properties before calling the factory?
         try {
+            logger.debug("Creating DMAAP Client");
             consumer = MRClientFactory.createConsumer(dmaapConsumerProperties);
             this.alive = true;
         } catch (IOException e) {
@@ -102,11 +107,9 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer {
 
     @Override
     public void processMsg(String msg) throws Exception {
-        System.out.println("sysout" + msg);
+        logger.debug("Message Reveived from DMAAP : {}", msg);
         // Call the concurrent Task executor to handle the incoming request
-        // Validate the Input & if its valid, post the ACCEPTED Response back to DMAAP
-        // through REST CLIENT
-        // Call the Controller with the extracted payload
+        dmaapMessageHandler.handleDmaapMsg(msg);
     }
 
     @Override