+ private Properties parseDmaapConfig(JsonObject streamCfg) throws ServiceException {
+ Set<Entry<String, JsonElement>> streamConfigEntries = streamCfg.entrySet();
+ if (streamConfigEntries.size() != 1) {
+ throw new ServiceException(
+ "Invalid configuration. Number of streams must be one, config: " + streamConfigEntries);
+ }
+ JsonObject streamConfigEntry = streamConfigEntries.iterator().next().getValue().getAsJsonObject();
+ JsonObject dmaapInfo = get(streamConfigEntry, "dmaap_info").getAsJsonObject();
+ String topicUrl = getAsString(dmaapInfo, "topic_url");
+
+ try {
+ Properties dmaapProps = new Properties();
+ URL url = new URL(topicUrl);
+ String passwd = "";
+ String userName = "";
+ if (url.getUserInfo() != null) {
+ String[] userInfo = url.getUserInfo().split(":");
+ userName = userInfo[0];
+ passwd = userInfo[1];
+ }
+ String urlPath = url.getPath();
+ DmaapUrlPath path = parseDmaapUrlPath(urlPath);
+
+ dmaapProps.put("ServiceName", url.getHost() + ":" + url.getPort() + "/events");
+ dmaapProps.put("topic", path.dmaapTopicName);
+ dmaapProps.put("host", url.getHost() + ":" + url.getPort());
+ dmaapProps.put("contenttype", MediaType.APPLICATION_JSON.toString());
+ dmaapProps.put("userName", userName);
+ dmaapProps.put("password", passwd);
+ dmaapProps.put("group", path.consumerGroup);
+ dmaapProps.put("id", path.consumerId);
+ dmaapProps.put("TransportType", ProtocolTypeConstants.HTTPNOAUTH.toString());
+ dmaapProps.put("timeout", 15000);
+ dmaapProps.put("limit", 100);
+ dmaapProps.put("maxBatchSize", "10");
+ dmaapProps.put("maxAgeMs", "10000");
+ dmaapProps.put("compress", true);
+ dmaapProps.put("MessageSentThreadOccurance", "2");
+ return dmaapProps;
+ } catch (MalformedURLException e) {
+ throw new ServiceException("Could not parse the URL", e);
+ }