Merge "Adapt A1 controller to latest A1 spec"
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / dmaap / DmaapMessageConsumerImpl.java
index 30444e2..2ae5e5e 100644 (file)
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2019 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
 package org.oransc.policyagent.dmaap;
 
+import java.io.IOException;
 import java.util.Properties;
-
-import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
+import javax.annotation.PostConstruct;
+import org.onap.dmaap.mr.client.MRClientFactory;
+import org.onap.dmaap.mr.client.MRConsumer;
+import org.onap.dmaap.mr.client.response.MRConsumerResponse;
 import org.oransc.policyagent.configuration.ApplicationConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
 @Component
+@EnableScheduling
 public class DmaapMessageConsumerImpl implements DmaapMessageConsumer {
 
-    private final ApplicationConfig applicationConfig;
-
-       protected MRConsumerImpl consumer;
-
-       @Autowired
-       public DmaapMessageConsumerImpl(ApplicationConfig applicationConfig) {
-               this.applicationConfig = applicationConfig;
-       }
-
-       @Override
-       public void run() {
-               // TODO Auto-generated method stub
-
-       }
-
-       @Override
-       public void init(Properties baseProperties) {
-           Properties dmaapConsumerConfig = applicationConfig.getDmaapConsumerConfig();
-               // Initialize the DMAAP with the properties
-               // TODO Auto-generated method stub
-
-       }
-
-       @Override
-       public void processMsg(String msg) throws Exception {
-               // Call the Controller once you get the message from DMAAP
-               // Call the concurrent Task executor to handle the incoming request
-               // TODO Auto-generated method stub
-
-       }
-
-       @Override
-       public boolean isReady() {
-               // TODO Auto-generated method stub
-               return false;
-       }
-
-       @Override
-       public boolean isRunning() {
-               // TODO Auto-generated method stub
-               return false;
-       }
-
-       @Override
-       public void stopConsumer() {
-               // TODO Auto-generated method stub
-
-       }
+    private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumerImpl.class);
 
+    private boolean alive = false;
+    private final ApplicationConfig applicationConfig;
+    protected MRConsumer consumer;
+    private MRConsumerResponse response = null;
+    @Autowired
+    private DmaapMessageHandler dmaapMessageHandler;
+
+    @Autowired
+    public DmaapMessageConsumerImpl(ApplicationConfig applicationConfig) {
+        this.applicationConfig = applicationConfig;
+    }
+
+    @Scheduled(fixedRate = 1000 * 10) // , initialDelay=60000)
+    @Override
+    public void run() {
+        /*
+         * if (!alive) { init(); }
+         */
+        if (this.alive) {
+            try {
+                Iterable<String> dmaapMsgs = fetchAllMessages();
+                logger.debug("Fetched all the messages from DMAAP and will start to process the messages");
+                for (String msg : dmaapMsgs) {
+                    processMsg(msg);
+                }
+            } catch (Exception e) {
+                logger.error("{}: cannot fetch because of ", this, e.getMessage(), e);
+            }
+        }
+    }
+
+    private Iterable<String> fetchAllMessages() {
+        response = consumer.fetchWithReturnConsumerResponse();
+        if (response == null) {
+            logger.warn("{}: DMaaP NULL response received", this);
+        } else {
+            logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(), response.getResponseMessage());
+            if (!"200".equals(response.getResponseCode())) {
+                logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
+                        response.getResponseMessage());
+            }
+        }
+        return response.getActualMessages();
+    }
+
+    @PostConstruct
+    @Override
+    public void init() {
+        Properties dmaapConsumerProperties = applicationConfig.getDmaapConsumerConfig();
+        Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig();
+        // No need to start if there is no configuration.
+        if (dmaapConsumerProperties == null || dmaapPublisherProperties == null || dmaapConsumerProperties.size() == 0
+                || dmaapPublisherProperties.size() == 0) {
+            logger.error("DMaaP properties Failed to Load");
+            return;
+        }
+        try {
+            logger.debug("Creating DMAAP Client");
+            consumer = MRClientFactory.createConsumer(dmaapConsumerProperties);
+            this.alive = true;
+        } catch (IOException e) {
+            logger.error("Exception occurred while creating Dmaap Consumer", e);
+        }
+    }
+
+    @Override
+    public void processMsg(String msg) throws Exception {
+        logger.debug("Message Reveived from DMAAP : {}", msg);
+        // Call the concurrent Task executor to handle the incoming request
+        dmaapMessageHandler.handleDmaapMsg(msg);
+    }
+
+    @Override
+    public boolean isAlive() {
+        return alive;
+    }
+
+    @Override
+    public void stopConsumer() {
+        alive = false;
+    }
 }