From 7ff95e760df22a5b6e6a77b8f7b906ab0e2a55b6 Mon Sep 17 00:00:00 2001 From: elinuxhenrik Date: Wed, 29 Jan 2020 10:54:37 +0100 Subject: [PATCH] Fix small things in DMaaP listener area Change-Id: I3aa61203e3c18953f0761590ca4957424f6844f1 Signed-off-by: elinuxhenrik --- .../policyagent/configuration/ApplicationConfigParser.java | 13 +++++++------ .../oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java | 6 ++---- .../org/oransc/policyagent/dmaap/DmaapMessageHandler.java | 13 +++---------- .../java/org/oransc/policyagent/tasks/StartupService.java | 2 +- 4 files changed, 13 insertions(+), 21 deletions(-) diff --git a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java index 1352cb51..ae83bc05 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java @@ -91,13 +91,14 @@ public class ApplicationConfigParser { return get(obj, memberName).getAsJsonArray(); } - private Properties parseDmaapConfig(JsonObject consumerCfg) throws ServiceException { - Set> topics = consumerCfg.entrySet(); - if (topics.size() != 1) { - throw new ServiceException("Invalid configuration, number of topic must be one, config: " + topics); + private Properties parseDmaapConfig(JsonObject streamCfg) throws ServiceException { + Set> streamConfigEntries = streamCfg.entrySet(); + if (streamConfigEntries.size() != 1) { + throw new ServiceException( + "Invalid configuration. Number of streams must be one, config: " + streamConfigEntries); } - JsonObject topic = topics.iterator().next().getValue().getAsJsonObject(); - JsonObject dmaapInfo = get(topic, "dmaap_info").getAsJsonObject(); + JsonObject streamConfigEntry = streamConfigEntries.iterator().next().getValue().getAsJsonObject(); + JsonObject dmaapInfo = get(streamConfigEntry, "dmaap_info").getAsJsonObject(); String topicUrl = getAsString(dmaapInfo, "topic_url"); Properties dmaapProps = new Properties(); diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java index 503ddabe..c7dab584 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java @@ -84,8 +84,6 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer { return response.getActualMessages(); } - // Properties are not loaded in first atempt. Need to fix this and then uncomment the post construct annotation - // @PostConstruct @Override public void init() { Properties dmaapConsumerProperties = applicationConfig.getDmaapConsumerConfig(); @@ -98,8 +96,8 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer { } try { logger.debug("Creating DMAAP Client"); - System.out.println("dmaapConsumerProperties--->"+dmaapConsumerProperties.getProperty("topic")); - System.out.println("dmaapPublisherProperties--->"+dmaapPublisherProperties.getProperty("topic")); + logger.debug("dmaapConsumerProperties---> {}", dmaapConsumerProperties.getProperty("topic")); + logger.debug("dmaapPublisherProperties---> {}", dmaapPublisherProperties.getProperty("topic")); consumer = MRClientFactory.createConsumer(dmaapConsumerProperties); this.alive = true; } catch (IOException e) { diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java index 713d4834..172fe989 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java @@ -50,17 +50,14 @@ public class DmaapMessageHandler { @Autowired private PolicyController policyController; private AsyncRestClient restClient; + @Autowired private ApplicationConfig applicationConfig; private String topic = ""; - @Autowired - public DmaapMessageHandler(ApplicationConfig applicationConfig) { - this.applicationConfig = applicationConfig; - } - // The publish properties is corrupted. It contains the subscribe property values. @Async("threadPoolTaskExecutor") public void handleDmaapMsg(String msg) { + logger.debug("Message ---------->{}", msg); init(); DmaapRequestMessage dmaapRequestMessage = null; Optional dmaapResponse = null; @@ -71,7 +68,7 @@ public class DmaapMessageHandler { * "apiVersion": "1.0", "originatorId": "849e6c6b420", "requestId": "23343221", "operation": "getPolicySchemas", * "payload": "{\"ricName\":\"ric1\"}" } * - * -------------------------------------------------------------------------------------------------------------- + * ------------------------------------------------------------------------------------------------------------- * Sample Response Message to DMAAP {type=response, correlationId=c09ac7d1-de62-0016-2000-e63701125557-201, * timestamp=null, originatorId=849e6c6b420, requestId=23343221, status=200 OK, message=[]} * ------------------------------------------------------------------------------------------------------------- @@ -162,15 +159,11 @@ public class DmaapMessageHandler { return Optional.empty(); } - // @PostConstruct - // The application properties value is always NULL for the first time - // Need to fix this public void init() { logger.debug("Reading DMAAP Publisher bus details from Application Config"); Properties dmaapPublisherConfig = applicationConfig.getDmaapPublisherConfig(); String host = (String) dmaapPublisherConfig.get("ServiceName"); topic = dmaapPublisherConfig.getProperty("topic"); - System.out.println("\"Read the topic ---------->" + topic); logger.debug("Read the topic & Service Name - {} , {}", host, topic); this.restClient = new AsyncRestClient("http://" + host + "/"); // get this value from application config diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/StartupService.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/StartupService.java index c0ffcbdf..54fd79f4 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/StartupService.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/StartupService.java @@ -65,7 +65,7 @@ public class StartupService implements ApplicationConfig.Observer { @Autowired private Services services; - // Only for unittesting + // Only for unit testing StartupService(ApplicationConfig appConfig, RefreshConfigTask refreshTask, Rics rics, PolicyTypes policyTypes, A1ClientFactory a1ClientFactory, Policies policies, Services services) { this.applicationConfig = appConfig; -- 2.16.6