Fixed the Dmaap publisher issue
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / dmaap / DmaapMessageConsumerImpl.java
index a6a912d..db4956a 100644 (file)
 package org.oransc.policyagent.dmaap;
 
 import com.google.common.collect.Iterables;
-
 import java.io.IOException;
 import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
 import org.onap.dmaap.mr.client.MRBatchingPublisher;
 import org.onap.dmaap.mr.client.MRClientFactory;
 import org.onap.dmaap.mr.client.MRConsumer;
@@ -51,9 +47,6 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer {
     private final ApplicationConfig applicationConfig;
     protected MRConsumer consumer;
     private MRBatchingPublisher producer;
-    private final long FETCHTIMEOUT = 30000;
-
-    private CountDownLatch sleep = new CountDownLatch(1);
 
     @Value("${server.port}")
     private int localServerPort;
@@ -63,7 +56,7 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer {
         this.applicationConfig = applicationConfig;
     }
 
-    @Scheduled(fixedRate = 1000 * 10)
+    @Scheduled(fixedRate = 1000 * 40)
     @Override
     public void run() {
         if (!alive) {
@@ -93,7 +86,6 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer {
         }
         if (response == null || !"200".equals(response.getResponseCode())) {
             logger.warn("{}: DMaaP NULL response received", this);
-            sleepAfterFailure();
         } else {
             logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(), response.getResponseMessage());
         }
@@ -106,7 +98,7 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer {
         Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig();
         // No need to start if there is no configuration.
         if (dmaapConsumerProperties == null || dmaapPublisherProperties == null || dmaapConsumerProperties.size() == 0
-            || dmaapPublisherProperties.size() == 0) {
+                || dmaapPublisherProperties.size() == 0) {
             logger.error("DMaaP properties Failed to Load");
             return;
         }
@@ -115,7 +107,7 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer {
             logger.debug("dmaapConsumerProperties---> {}", dmaapConsumerProperties.getProperty("topic"));
             logger.debug("dmaapPublisherProperties---> {}", dmaapPublisherProperties.getProperty("topic"));
             consumer = MRClientFactory.createConsumer(dmaapConsumerProperties);
-            producer = MRClientFactory.createBatchingPublisher(dmaapConsumerProperties);
+            producer = MRClientFactory.createBatchingPublisher(dmaapPublisherProperties);
             this.alive = true;
         } catch (IOException e) {
             logger.error("Exception occurred while creating Dmaap Consumer", e);
@@ -138,13 +130,4 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer {
     public boolean isAlive() {
         return alive;
     }
-
-    private void sleepAfterFailure() {
-        logger.warn("DMAAP message Consumer is put to Sleep for {} milliseconds", FETCHTIMEOUT);
-        try {
-            sleep.await(FETCHTIMEOUT, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException e) {
-            logger.error("Failed to put the thread to sleep: {}", e);
-        }
-    }
 }