X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=policy-agent%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fpolicyagent%2Fdmaap%2FDmaapMessageConsumerImpl.java;h=bd3ba69f9c43eee75cad948aaea5d26c5b044de4;hb=refs%2Fchanges%2F69%2F2369%2F3;hp=30444e201ef8cf85e12d7c34b1096d3222016667;hpb=777b07b0c5ee62ebee9526e634bee7ae3f82640c;p=nonrtric.git diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java index 30444e20..bd3ba69f 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java @@ -1,62 +1,137 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2019 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + package org.oransc.policyagent.dmaap; +import com.google.common.collect.Iterables; +import java.io.IOException; import java.util.Properties; - -import org.onap.dmaap.mr.client.impl.MRConsumerImpl; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.onap.dmaap.mr.client.MRClientFactory; +import org.onap.dmaap.mr.client.MRConsumer; +import org.onap.dmaap.mr.client.response.MRConsumerResponse; import org.oransc.policyagent.configuration.ApplicationConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @Component +@EnableScheduling public class DmaapMessageConsumerImpl implements DmaapMessageConsumer { - private final ApplicationConfig applicationConfig; - - protected MRConsumerImpl consumer; - - @Autowired - public DmaapMessageConsumerImpl(ApplicationConfig applicationConfig) { - this.applicationConfig = applicationConfig; - } - - @Override - public void run() { - // TODO Auto-generated method stub - - } - - @Override - public void init(Properties baseProperties) { - Properties dmaapConsumerConfig = applicationConfig.getDmaapConsumerConfig(); - // Initialize the DMAAP with the properties - // TODO Auto-generated method stub - - } - - @Override - public void processMsg(String msg) throws Exception { - // Call the Controller once you get the message from DMAAP - // Call the concurrent Task executor to handle the incoming request - // TODO Auto-generated method stub - - } - - @Override - public boolean isReady() { - // TODO Auto-generated method stub - return false; - } - - @Override - public boolean isRunning() { - // TODO Auto-generated method stub - return false; - } - - @Override - public void stopConsumer() { - // TODO Auto-generated method stub - - } + private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumerImpl.class); + private boolean alive = false; + private final ApplicationConfig applicationConfig; + protected MRConsumer consumer; + @Autowired + private DmaapMessageHandler dmaapMessageHandler; + private final long FETCHTIMEOUT = 30000; + + private CountDownLatch sleep = new CountDownLatch(1); + + @Autowired + public DmaapMessageConsumerImpl(ApplicationConfig applicationConfig) { + this.applicationConfig = applicationConfig; + } + + @Scheduled(fixedRate = 1000 * 10) + @Override + public void run() { + if (!alive) { + init(); + } + if (this.alive) { + try { + Iterable dmaapMsgs = fetchAllMessages(); + if (dmaapMsgs != null && Iterables.size(dmaapMsgs) > 0) { + logger.debug("Fetched all the messages from DMAAP and will start to process the messages"); + for (String msg : dmaapMsgs) { + processMsg(msg); + } + } + } catch (Exception e) { + logger.error("{}: cannot fetch because of ", this, e.getMessage(), e); + } + } + } + + private Iterable fetchAllMessages() { + MRConsumerResponse response = null; + try { + response = consumer.fetchWithReturnConsumerResponse(); + } catch (Exception e) { + logger.error("Failed to get message from DMAAP", e); + } + if (response == null || !"200".equals(response.getResponseCode())) { + logger.warn("{}: DMaaP NULL response received", this); + sleepAfterFailure(); + } else { + logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(), response.getResponseMessage()); + } + return response.getActualMessages(); + } + + @Override + public void init() { + Properties dmaapConsumerProperties = applicationConfig.getDmaapConsumerConfig(); + Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig(); + // No need to start if there is no configuration. + if (dmaapConsumerProperties == null || dmaapPublisherProperties == null || dmaapConsumerProperties.size() == 0 + || dmaapPublisherProperties.size() == 0) { + logger.error("DMaaP properties Failed to Load"); + return; + } + try { + logger.debug("Creating DMAAP Client"); + logger.debug("dmaapConsumerProperties---> {}", dmaapConsumerProperties.getProperty("topic")); + logger.debug("dmaapPublisherProperties---> {}", dmaapPublisherProperties.getProperty("topic")); + consumer = MRClientFactory.createConsumer(dmaapConsumerProperties); + this.alive = true; + } catch (IOException e) { + logger.error("Exception occurred while creating Dmaap Consumer", e); + } + } + + @Override + public void processMsg(String msg) throws Exception { + logger.debug("Message Reveived from DMAAP : {}", msg); + // Call the concurrent Task executor to handle the incoming request + dmaapMessageHandler.handleDmaapMsg(msg); + } + + @Override + public boolean isAlive() { + return alive; + } + + private void sleepAfterFailure() { + logger.warn("DMAAP message Consumer is put to Sleep for {} milliseconds", FETCHTIMEOUT); + try { + sleep.await(FETCHTIMEOUT, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + logger.error("Failed to put the thread to sleep: {}", e); + } + } }