From 777b07b0c5ee62ebee9526e634bee7ae3f82640c Mon Sep 17 00:00:00 2001 From: elinuxhenrik Date: Thu, 23 Jan 2020 16:27:44 +0100 Subject: [PATCH] Add DMaaP publisher configuration to DmaapClient Change-Id: I59f366bb5045e344e026f8d51e8f8b611b631d73 Signed-off-by: elinuxhenrik --- policy-agent/pom.xml | 11 +++ .../configuration/ApplicationConfig.java | 17 ++++- .../configuration/ApplicationConfigParser.java | 85 +++++++++++++++++++++- .../oransc/policyagent/dmaap/BusTopicParams.java | 17 ----- .../dmaap/DmaapMessageConsumerImpl.java | 14 +++- .../policyagent/tasks/RefreshConfigTask.java | 4 +- .../resources/test_application_configuration.json | 8 ++ 7 files changed, 129 insertions(+), 27 deletions(-) delete mode 100644 policy-agent/src/main/java/org/oransc/policyagent/dmaap/BusTopicParams.java diff --git a/policy-agent/pom.xml b/policy-agent/pom.xml index 52c69d55..52761cc4 100644 --- a/policy-agent/pom.xml +++ b/policy-agent/pom.xml @@ -41,6 +41,7 @@ 1.1.6 2.0.0 20180130 + 3.3 4.0.1 3.8.0 2.8.1 @@ -101,6 +102,11 @@ json ${json.version} + + commons-net + commons-net + ${commons-net.version} + org.springframework.boot @@ -115,6 +121,11 @@ org.onap.dcaegen2.services.sdk.rest.services cbs-client + ${sdk.version} + + + org.onap.dcaegen2.services.sdk.rest.services + dmaap-client ${sdk.version} diff --git a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java index d4f72610..673fe1d0 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java @@ -23,6 +23,7 @@ package org.oransc.policyagent.configuration; import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.Properties; import java.util.Vector; import javax.validation.constraints.NotEmpty; @@ -41,6 +42,7 @@ public class ApplicationConfig { private Collection observers = new Vector<>(); private Map ricConfigs = new HashMap<>(); + private Properties dmaapConsumerConfig; @Autowired public ApplicationConfig() { @@ -50,6 +52,13 @@ public class ApplicationConfig { return this.filepath; } + /* + * Do not remove, used by framework! + */ + public synchronized void setFilepath(String filepath) { + this.filepath = filepath; + } + public synchronized Collection getRicConfigs() { return this.ricConfigs.values(); } @@ -63,6 +72,10 @@ public class ApplicationConfig { throw new ServiceException("Could not find ric: " + ricName); } + public Properties getDmaapConsumerConfig() { + return dmaapConsumerConfig; + } + public static enum RicConfigUpdate { ADDED, CHANGED, REMOVED } @@ -85,7 +98,7 @@ public class ApplicationConfig { } } - public void setConfiguration(@NotNull Collection ricConfigs) { + public void setConfiguration(@NotNull Collection ricConfigs, Properties dmaapConsumerConfig) { Collection notifications = new Vector<>(); synchronized (this) { Map newRicConfigs = new HashMap<>(); @@ -109,6 +122,8 @@ public class ApplicationConfig { this.ricConfigs = newRicConfigs; } notifyObservers(notifications); + + this.dmaapConsumerConfig = dmaapConsumerConfig; } private void notifyObservers(Collection notifications) { diff --git a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java index 75ee1356..9dc41f22 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java @@ -25,32 +25,44 @@ import com.google.gson.GsonBuilder; import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; - +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; import java.util.Vector; - +import javax.validation.constraints.NotNull; import org.oransc.policyagent.exceptions.ServiceException; public class ApplicationConfigParser { private static final String CONFIG = "config"; + private static Gson gson = new GsonBuilder() // .serializeNulls() // .create(); // private Vector ricConfig; + private Properties dmaapConsumerConfig; public ApplicationConfigParser() { } public void parse(JsonObject root) throws ServiceException { - JsonObject config = root.getAsJsonObject(CONFIG); - ricConfig = parseRics(config); + JsonObject ricConfigJson = root.getAsJsonObject(CONFIG); + ricConfig = parseRics(ricConfigJson); + JsonObject dmaapConfigJson = root.getAsJsonObject("streams_subscribes"); + dmaapConsumerConfig = parseDmaapConsumerConfig(dmaapConfigJson); } public Vector getRicConfigs() { return this.ricConfig; } + public Properties getDmaapConsumerConfig() { + return dmaapConsumerConfig; + } + private Vector parseRics(JsonObject config) throws ServiceException { Vector result = new Vector(); for (JsonElement ricElem : getAsJsonArray(config, "ric")) { @@ -71,4 +83,69 @@ public class ApplicationConfigParser { return get(obj, memberName).getAsJsonArray(); } + private Properties parseDmaapConsumerConfig(JsonObject consumerCfg) throws ServiceException { + Set> topics = consumerCfg.entrySet(); + if (topics.size() != 1) { + throw new ServiceException("Invalid configuration, number of topic must be one, config: " + topics); + } + JsonObject topic = topics.iterator().next().getValue().getAsJsonObject(); + JsonObject dmaapInfo = get(topic, "dmaap_info").getAsJsonObject(); + String topicUrl = getAsString(dmaapInfo, "topic_url"); + + Properties dmaapProps = new Properties(); + try { + URL url = new URL(topicUrl); + String passwd = ""; + String userName = ""; + if (url.getUserInfo() != null) { + String[] userInfo = url.getUserInfo().split(":"); + userName = userInfo[0]; + passwd = userInfo[1]; + } + String urlPath = url.getPath(); + DmaapConsumerUrlPath path = parseDmaapUrlPath(urlPath); + + dmaapProps.put("port", url.getPort()); + dmaapProps.put("server", url.getHost()); + dmaapProps.put("topic", path.dmaapTopicName); + dmaapProps.put("consumerGroup", path.consumerGroup); + dmaapProps.put("consumerInstance", path.consumerId); + dmaapProps.put("fetchTimeout", 15000); + dmaapProps.put("fetchLimit", 1000); + dmaapProps.put("userName", userName); + dmaapProps.put("password", passwd); + } catch (MalformedURLException e) { + throw new ServiceException("Could not parse the URL", e); + } + + return dmaapProps; + } + + private static @NotNull String getAsString(JsonObject obj, String memberName) throws ServiceException { + return get(obj, memberName).getAsString(); + } + + private class DmaapConsumerUrlPath { + final String dmaapTopicName; + final String consumerGroup; + final String consumerId; + + DmaapConsumerUrlPath(String dmaapTopicName, String consumerGroup, String consumerId) { + this.dmaapTopicName = dmaapTopicName; + this.consumerGroup = consumerGroup; + this.consumerId = consumerId; + } + } + + private DmaapConsumerUrlPath parseDmaapUrlPath(String urlPath) throws ServiceException { + String[] tokens = urlPath.split("/"); // /events/A1-P/users/sdnc1 + if (tokens.length != 5) { + throw new ServiceException("The path has incorrect syntax: " + urlPath); + } + + final String dmaapTopicName = tokens[1] + "/" + tokens[2]; // /events/A1-P + final String consumerGroup = tokens[3]; // users + final String consumerId = tokens[4]; // sdnc1 + return new DmaapConsumerUrlPath(dmaapTopicName, consumerGroup, consumerId); + } } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/BusTopicParams.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/BusTopicParams.java deleted file mode 100644 index 47ab5924..00000000 --- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/BusTopicParams.java +++ /dev/null @@ -1,17 +0,0 @@ -package org.oransc.policyagent.dmaap; - -import org.springframework.context.annotation.Configuration; - -@Configuration("dmaap") -public class BusTopicParams { - - private int port; - private String server; - private String topic; - private String consumerGroup; - private String consumerInstance; - private int fetchTimeout; - private int fetchLimit; - private String userName; - private String password; -} \ No newline at end of file diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java index b64a8224..30444e20 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java @@ -3,13 +3,20 @@ package org.oransc.policyagent.dmaap; import java.util.Properties; import org.onap.dmaap.mr.client.impl.MRConsumerImpl; +import org.oransc.policyagent.configuration.ApplicationConfig; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +@Component public class DmaapMessageConsumerImpl implements DmaapMessageConsumer { + private final ApplicationConfig applicationConfig; + protected MRConsumerImpl consumer; - - public DmaapMessageConsumerImpl() { - // TODO Auto-generated constructor stub + + @Autowired + public DmaapMessageConsumerImpl(ApplicationConfig applicationConfig) { + this.applicationConfig = applicationConfig; } @Override @@ -20,6 +27,7 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer { @Override public void init(Properties baseProperties) { + Properties dmaapConsumerConfig = applicationConfig.getDmaapConsumerConfig(); // Initialize the DMAAP with the properties // TODO Auto-generated method stub diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java index 9a8dc348..bc43edab 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java @@ -125,7 +125,7 @@ public class RefreshConfigTask { try { ApplicationConfigParser parser = new ApplicationConfigParser(); parser.parse(jsonObject); - this.appConfig.setConfiguration(parser.getRicConfigs()); + this.appConfig.setConfiguration(parser.getRicConfigs(), parser.getDmaapConsumerConfig()); } catch (ServiceException e) { logger.error("Could not parse configuration {}", e.toString(), e); } @@ -152,7 +152,7 @@ public class RefreshConfigTask { } ApplicationConfigParser appParser = new ApplicationConfigParser(); appParser.parse(rootObject); - appConfig.setConfiguration(appParser.getRicConfigs()); + appConfig.setConfiguration(appParser.getRicConfigs(), appParser.getDmaapConsumerConfig()); logger.info("Local configuration file loaded: {}", filepath); } catch (JsonSyntaxException | ServiceException | IOException e) { logger.trace("Local configuration file not loaded: {}", filepath, e); diff --git a/policy-agent/src/test/resources/test_application_configuration.json b/policy-agent/src/test/resources/test_application_configuration.json index 446c0611..c63b7104 100644 --- a/policy-agent/src/test/resources/test_application_configuration.json +++ b/policy-agent/src/test/resources/test_application_configuration.json @@ -19,5 +19,13 @@ ] } ] + }, + "streams_subscribes": { + "dmaap_subscriber": { + "dmaap_info": { + "topic_url": "http://dradmin:dradmin@localhost:2222/events/A1-P/users/sdnc1" + }, + "type": "message_router" + } } } \ No newline at end of file -- 2.16.6