Add publisher configuration and fix issues
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / configuration / ApplicationConfigParser.java
index 530ac98..d32db7a 100644 (file)
@@ -35,7 +35,9 @@ import java.util.Vector;
 
 import javax.validation.constraints.NotNull;
 
+import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
 import org.oransc.policyagent.exceptions.ServiceException;
+import org.springframework.http.MediaType;
 
 public class ApplicationConfigParser {
 
@@ -46,6 +48,7 @@ public class ApplicationConfigParser {
         .create(); //
 
     private Vector<RicConfig> ricConfig;
+    private Properties dmaapPublisherConfig;
     private Properties dmaapConsumerConfig;
 
     public ApplicationConfigParser() {
@@ -54,14 +57,28 @@ public class ApplicationConfigParser {
     public void parse(JsonObject root) throws ServiceException {
         JsonObject ricConfigJson = root.getAsJsonObject(CONFIG);
         ricConfig = parseRics(ricConfigJson);
-        JsonObject dmaapConfigJson = root.getAsJsonObject("streams_subscribes");
-        dmaapConsumerConfig = parseDmaapConsumerConfig(dmaapConfigJson);
+        JsonObject dmaapPublisherConfigJson = root.getAsJsonObject("streams_publishes");
+        if (dmaapPublisherConfigJson == null) {
+            dmaapPublisherConfig = new Properties();
+        } else {
+            dmaapPublisherConfig = parseDmaapConfig(dmaapPublisherConfigJson);
+        }
+        JsonObject dmaapConsumerConfigJson = root.getAsJsonObject("streams_subscribes");
+        if (dmaapConsumerConfigJson == null) {
+            dmaapConsumerConfig = new Properties();
+        } else {
+            dmaapConsumerConfig = parseDmaapConfig(dmaapConsumerConfigJson);
+        }
     }
 
     public Vector<RicConfig> getRicConfigs() {
         return this.ricConfig;
     }
 
+    public Properties getDmaapPublisherConfig() {
+        return dmaapPublisherConfig;
+    }
+
     public Properties getDmaapConsumerConfig() {
         return dmaapConsumerConfig;
     }
@@ -86,7 +103,7 @@ public class ApplicationConfigParser {
         return get(obj, memberName).getAsJsonArray();
     }
 
-    private Properties parseDmaapConsumerConfig(JsonObject consumerCfg) throws ServiceException {
+    private Properties parseDmaapConfig(JsonObject consumerCfg) throws ServiceException {
         Set<Entry<String, JsonElement>> topics = consumerCfg.entrySet();
         if (topics.size() != 1) {
             throw new ServiceException("Invalid configuration, number of topic must be one, config: " + topics);
@@ -106,17 +123,20 @@ public class ApplicationConfigParser {
                 passwd = userInfo[1];
             }
             String urlPath = url.getPath();
-            DmaapConsumerUrlPath path = parseDmaapUrlPath(urlPath);
+            DmaapUrlPath path = parseDmaapUrlPath(urlPath);
 
-            dmaapProps.put("port", url.getPort());
-            dmaapProps.put("server", url.getHost());
+            dmaapProps.put("ServiceName", url.getHost());
             dmaapProps.put("topic", path.dmaapTopicName);
-            dmaapProps.put("consumerGroup", path.consumerGroup);
-            dmaapProps.put("consumerInstance", path.consumerId);
-            dmaapProps.put("fetchTimeout", 15000);
-            dmaapProps.put("fetchLimit", 1000);
+            dmaapProps.put("host", url.getHost());
+            dmaapProps.put("contenttype", MediaType.APPLICATION_JSON.toString());
             dmaapProps.put("userName", userName);
             dmaapProps.put("password", passwd);
+            dmaapProps.put("group", path.consumerGroup);
+            dmaapProps.put("id", path.consumerId);
+            dmaapProps.put("TransportType", ProtocolTypeConstants.HTTPNOAUTH.toString());
+            dmaapProps.put("timeout", 15000);
+            dmaapProps.put("limit", 1000);
+            dmaapProps.put("port", url.getPort());
         } catch (MalformedURLException e) {
             throw new ServiceException("Could not parse the URL", e);
         }
@@ -128,27 +148,31 @@ public class ApplicationConfigParser {
         return get(obj, memberName).getAsString();
     }
 
-    private class DmaapConsumerUrlPath {
+    private class DmaapUrlPath {
         final String dmaapTopicName;
         final String consumerGroup;
         final String consumerId;
 
-        DmaapConsumerUrlPath(String dmaapTopicName, String consumerGroup, String consumerId) {
+        DmaapUrlPath(String dmaapTopicName, String consumerGroup, String consumerId) {
             this.dmaapTopicName = dmaapTopicName;
             this.consumerGroup = consumerGroup;
             this.consumerId = consumerId;
         }
     }
 
-    private DmaapConsumerUrlPath parseDmaapUrlPath(String urlPath) throws ServiceException {
+    private DmaapUrlPath parseDmaapUrlPath(String urlPath) throws ServiceException {
         String[] tokens = urlPath.split("/"); // /events/A1-P/users/sdnc1
-        if (tokens.length != 5) {
+        if (!(tokens.length == 3 ^ tokens.length == 5)) {
             throw new ServiceException("The path has incorrect syntax: " + urlPath);
         }
 
         final String dmaapTopicName = tokens[1] + "/" + tokens[2]; // /events/A1-P
-        final String consumerGroup = tokens[3]; // users
-        final String consumerId = tokens[4]; // sdnc1
-        return new DmaapConsumerUrlPath(dmaapTopicName, consumerGroup, consumerId);
+        String consumerGroup = ""; // users
+        String consumerId = ""; // sdnc1
+        if (tokens.length == 5) {
+            consumerGroup = tokens[3];
+            consumerId = tokens[4];
+        }
+        return new DmaapUrlPath(dmaapTopicName, consumerGroup, consumerId);
     }
 }