+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2019 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
package org.oransc.policyagent.dmaap;
+import java.io.IOException;
import java.util.Properties;
-import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
+import org.onap.dmaap.mr.client.MRClientFactory;
+import org.onap.dmaap.mr.client.MRConsumer;
+import org.onap.dmaap.mr.client.response.MRConsumerResponse;
import org.oransc.policyagent.configuration.ApplicationConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
+@EnableScheduling
public class DmaapMessageConsumerImpl implements DmaapMessageConsumer {
- private final ApplicationConfig applicationConfig;
+ private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumerImpl.class);
- protected MRConsumerImpl consumer;
+ private boolean alive = false;
+ private final ApplicationConfig applicationConfig;
+ protected MRConsumer consumer;
+ private MRConsumerResponse response = null;
@Autowired
public DmaapMessageConsumerImpl(ApplicationConfig applicationConfig) {
this.applicationConfig = applicationConfig;
}
+ @Scheduled(fixedRate = 1000 * 60)
@Override
public void run() {
- // TODO Auto-generated method stub
+ if (!alive) {
+ init();
+ }
+ if (this.alive) {
+ try {
+ Iterable<String> dmaapMsgs = fetchAllMessages();
+ for (String msg : dmaapMsgs) {
+ processMsg(msg);
+ }
+ } catch (Exception e) {
+ logger.error("{}: cannot fetch because of ", this, e.getMessage(), e);
+ }
+ }
+ }
+ private Iterable<String> fetchAllMessages() {
+ response = consumer.fetchWithReturnConsumerResponse();
+ if (response == null) {
+ logger.warn("{}: DMaaP NULL response received", this);
+ } else {
+ logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(), response.getResponseMessage());
+ if (!"200".equals(response.getResponseCode())) {
+ logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
+ response.getResponseMessage());
+ }
+ }
+ return response.getActualMessages();
}
@Override
- public void init(Properties baseProperties) {
- Properties dmaapConsumerConfig = applicationConfig.getDmaapConsumerConfig();
- // Initialize the DMAAP with the properties
- // TODO Auto-generated method stub
-
+ public void init() {
+ Properties dmaapConsumerProperties = applicationConfig.getDmaapConsumerConfig();
+ Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig();
+ // No need to start if there is no configuration.
+ if (dmaapConsumerProperties == null || dmaapPublisherProperties == null || dmaapConsumerProperties.size() == 0
+ || dmaapPublisherProperties.size() == 0) {
+ return;
+ }
+ // Do we need to do any validation of properties before calling the factory?
+ try {
+ consumer = MRClientFactory.createConsumer(dmaapConsumerProperties);
+ this.alive = true;
+ } catch (IOException e) {
+ logger.error("Exception occurred while creating Dmaap Consumer", e);
+ }
}
@Override
public void processMsg(String msg) throws Exception {
- // Call the Controller once you get the message from DMAAP
+ System.out.println("sysout" + msg);
// Call the concurrent Task executor to handle the incoming request
- // TODO Auto-generated method stub
-
+ // Validate the Input & if its valid, post the ACCEPTED Response back to DMAAP
+ // through REST CLIENT
+ // Call the Controller with the extracted payload
}
@Override
- public boolean isReady() {
- // TODO Auto-generated method stub
- return false;
- }
-
- @Override
- public boolean isRunning() {
- // TODO Auto-generated method stub
- return false;
+ public boolean isAlive() {
+ return alive;
}
@Override
public void stopConsumer() {
- // TODO Auto-generated method stub
-
+ alive = false;
}
-
}