Removed the DMAAP Accepted/Rejected Call
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / dmaap / DmaapMessageConsumerImpl.java
index 74cfe0d..503ddab 100644 (file)
@@ -40,21 +40,27 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer {
     private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumerImpl.class);
 
     private boolean alive = false;
+    private final ApplicationConfig applicationConfig;
     protected MRConsumer consumer;
     private MRConsumerResponse response = null;
+    @Autowired
+    private DmaapMessageHandler dmaapMessageHandler;
 
     @Autowired
     public DmaapMessageConsumerImpl(ApplicationConfig applicationConfig) {
-        Properties dmaapConsumerConfig = applicationConfig.getDmaapConsumerConfig();
-        init(dmaapConsumerConfig);
+        this.applicationConfig = applicationConfig;
     }
 
-    @Scheduled(fixedRate = 1000 * 60)
+    @Scheduled(fixedRate = 1000 * 10) // , initialDelay=60000)
     @Override
     public void run() {
-        while (this.alive) {
+        if (!alive) {
+            init();
+        }
+        if (this.alive) {
             try {
                 Iterable<String> dmaapMsgs = fetchAllMessages();
+                logger.debug("Fetched all the messages from DMAAP and will start to process the messages");
                 for (String msg : dmaapMsgs) {
                     processMsg(msg);
                 }
@@ -78,24 +84,23 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer {
         return response.getActualMessages();
     }
 
+    // Properties are not loaded in first atempt. Need to fix this and then uncomment the post construct annotation
+    // @PostConstruct
     @Override
-    public void init(Properties properties) {
-        // Initialize the DMAAP with the properties
-        // Do we need to do any validation of properties before calling the factory?
-        Properties prop = new Properties();
-        prop.setProperty("ServiceName", "localhost:6845/events");
-        prop.setProperty("topic", "A1-P");
-        prop.setProperty("host", "localhost:6845");
-        prop.setProperty("contenttype", "application/json");
-        prop.setProperty("username", "admin");
-        prop.setProperty("password", "admin");
-        prop.setProperty("group", "users");
-        prop.setProperty("id", "policy-agent");
-        prop.setProperty("TransportType", "HTTPNOAUTH");
-        prop.setProperty("timeout", "15000");
-        prop.setProperty("limit", "1000");
+    public void init() {
+        Properties dmaapConsumerProperties = applicationConfig.getDmaapConsumerConfig();
+        Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig();
+        // No need to start if there is no configuration.
+        if (dmaapConsumerProperties == null || dmaapPublisherProperties == null || dmaapConsumerProperties.size() == 0
+                || dmaapPublisherProperties.size() == 0) {
+            logger.error("DMaaP properties Failed to Load");
+            return;
+        }
         try {
-            consumer = MRClientFactory.createConsumer(prop);
+            logger.debug("Creating DMAAP Client");
+            System.out.println("dmaapConsumerProperties--->"+dmaapConsumerProperties.getProperty("topic"));
+            System.out.println("dmaapPublisherProperties--->"+dmaapPublisherProperties.getProperty("topic"));
+            consumer = MRClientFactory.createConsumer(dmaapConsumerProperties);
             this.alive = true;
         } catch (IOException e) {
             logger.error("Exception occurred while creating Dmaap Consumer", e);
@@ -104,11 +109,9 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer {
 
     @Override
     public void processMsg(String msg) throws Exception {
-        System.out.println("sysout" + msg);
+        logger.debug("Message Reveived from DMAAP : {}", msg);
         // Call the concurrent Task executor to handle the incoming request
-        // Validate the Input & if its valid, post the ACCEPTED Response back to DMAAP
-        // through REST CLIENT
-        // Call the Controller with the extracted payload
+        dmaapMessageHandler.handleDmaapMsg(msg);
     }
 
     @Override