X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=policy-agent%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fpolicyagent%2Fdmaap%2FDmaapMessageConsumer.java;h=011b9779bb5d626d4e0550c1e7e4b3f946b78ecb;hb=5056cc12ba520568fcfd28132d815cf3d2afc8b0;hp=9165af5434e172af6db01a97c64e5c78f8bf802f;hpb=083393d0affc7dca6a5cea89f4f9759801a91591;p=nonrtric.git diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java index 9165af54..011b9779 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java @@ -33,7 +33,6 @@ import org.onap.dmaap.mr.client.response.MRConsumerResponse; import org.oransc.policyagent.clients.AsyncRestClient; import org.oransc.policyagent.configuration.ApplicationConfig; import org.oransc.policyagent.exceptions.ServiceException; -import org.oransc.policyagent.tasks.RefreshConfigTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -65,8 +64,8 @@ public class DmaapMessageConsumer { private DmaapMessageHandler dmaapMessageHandler = null; private MRConsumer messageRouterConsumer = null; - @Value("${server.port}") - private int localServerPort; + @Value("${server.http-port}") + private int localServerHttpPort; @Autowired public DmaapMessageConsumer(ApplicationConfig applicationConfig) { @@ -74,36 +73,30 @@ public class DmaapMessageConsumer { } /** - * Starts the consumer. If there is a DMaaP configuration, it will start polling for messages. Otherwise it will - * check regularly for the configuration. + * Starts the consumer. If there is a DMaaP configuration, it will start polling + * for messages. Otherwise it will check regularly for the configuration. * * @return the running thread, for test purposes. */ public Thread start() { - Thread thread = new Thread(this::checkConfigLoop); + Thread thread = new Thread(this::messageHandlingLoop); thread.start(); return thread; } - private void checkConfigLoop() { - while (!isStopped()) { - if (isDmaapConfigured()) { - messageHandlingLoop(); - } else { - sleep(RefreshConfigTask.CONFIG_REFRESH_INTERVAL); - } - } - } - private void messageHandlingLoop() { - while (!isStopped() && isDmaapConfigured()) { + while (!isStopped()) { try { - Iterable dmaapMsgs = fetchAllMessages(); - if (dmaapMsgs != null && Iterables.size(dmaapMsgs) > 0) { - logger.debug("Fetched all the messages from DMAAP and will start to process the messages"); - for (String msg : dmaapMsgs) { - processMsg(msg); + if (isDmaapConfigured()) { + Iterable dmaapMsgs = fetchAllMessages(); + if (dmaapMsgs != null && Iterables.size(dmaapMsgs) > 0) { + logger.debug("Fetched all the messages from DMAAP and will start to process the messages"); + for (String msg : dmaapMsgs) { + processMsg(msg); + } } + } else { + sleep(TIME_BETWEEN_DMAAP_RETRIES); // wait for configuration } } catch (Exception e) { logger.warn("Cannot fetch because of {}", e.getMessage()); @@ -146,7 +139,7 @@ public class DmaapMessageConsumer { protected DmaapMessageHandler getDmaapMessageHandler() throws IOException { if (this.dmaapMessageHandler == null) { - String agentBaseUrl = "https://localhost:" + this.localServerPort; + String agentBaseUrl = "http://localhost:" + this.localServerHttpPort; AsyncRestClient agentClient = new AsyncRestClient(agentBaseUrl); Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig(); MRBatchingPublisher producer = MRClientFactory.createBatchingPublisher(dmaapPublisherProperties);