X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=policy-agent%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fpolicyagent%2Fdmaap%2FDmaapMessageConsumerImpl.java;h=2ae5e5ee6728438556d0e677414d0a12757ca616;hb=d1b7bb42fe20c8c0131959f3e96668d8e6669d04;hp=191a13b22216c9c0ae66cc50ee1f73de566ed4df;hpb=c3ed1a5199e3f5f539cc813b9a383dd156fed2e2;p=nonrtric.git diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java index 191a13b2..2ae5e5ee 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java @@ -22,7 +22,7 @@ package org.oransc.policyagent.dmaap; import java.io.IOException; import java.util.Properties; - +import javax.annotation.PostConstruct; import org.onap.dmaap.mr.client.MRClientFactory; import org.onap.dmaap.mr.client.MRConsumer; import org.onap.dmaap.mr.client.response.MRConsumerResponse; @@ -44,21 +44,24 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer { private final ApplicationConfig applicationConfig; protected MRConsumer consumer; private MRConsumerResponse response = null; + @Autowired + private DmaapMessageHandler dmaapMessageHandler; @Autowired public DmaapMessageConsumerImpl(ApplicationConfig applicationConfig) { this.applicationConfig = applicationConfig; } - @Scheduled(fixedRate = 1000 * 60) + @Scheduled(fixedRate = 1000 * 10) // , initialDelay=60000) @Override public void run() { - if (!alive) { - init(); - } + /* + * if (!alive) { init(); } + */ if (this.alive) { try { Iterable dmaapMsgs = fetchAllMessages(); + logger.debug("Fetched all the messages from DMAAP and will start to process the messages"); for (String msg : dmaapMsgs) { processMsg(msg); } @@ -76,23 +79,25 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer { logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(), response.getResponseMessage()); if (!"200".equals(response.getResponseCode())) { logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(), - response.getResponseMessage()); + response.getResponseMessage()); } } return response.getActualMessages(); } + @PostConstruct @Override public void init() { Properties dmaapConsumerProperties = applicationConfig.getDmaapConsumerConfig(); Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig(); // No need to start if there is no configuration. if (dmaapConsumerProperties == null || dmaapPublisherProperties == null || dmaapConsumerProperties.size() == 0 - || dmaapPublisherProperties.size() == 0) { + || dmaapPublisherProperties.size() == 0) { + logger.error("DMaaP properties Failed to Load"); return; } - // Do we need to do any validation of properties before calling the factory? try { + logger.debug("Creating DMAAP Client"); consumer = MRClientFactory.createConsumer(dmaapConsumerProperties); this.alive = true; } catch (IOException e) { @@ -102,11 +107,9 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer { @Override public void processMsg(String msg) throws Exception { - System.out.println("sysout" + msg); + logger.debug("Message Reveived from DMAAP : {}", msg); // Call the concurrent Task executor to handle the incoming request - // Validate the Input & if its valid, post the ACCEPTED Response back to DMAAP - // through REST CLIENT - // Call the Controller with the extracted payload + dmaapMessageHandler.handleDmaapMsg(msg); } @Override