- private Properties parseDmaapConsumerConfig(JsonObject consumerCfg) throws ServiceException {
- Set<Entry<String, JsonElement>> topics = consumerCfg.entrySet();
- if (topics.size() != 1) {
- throw new ServiceException("Invalid configuration, number of topic must be one, config: " + topics);
- }
- JsonObject topic = topics.iterator().next().getValue().getAsJsonObject();
- JsonObject dmaapInfo = get(topic, "dmaap_info").getAsJsonObject();
- String topicUrl = getAsString(dmaapInfo, "topic_url");
-
- Properties dmaapProps = new Properties();
- try {
- URL url = new URL(topicUrl);
- String passwd = "";
- String userName = "";
- if (url.getUserInfo() != null) {
- String[] userInfo = url.getUserInfo().split(":");
- userName = userInfo[0];
- passwd = userInfo[1];
- }
- String urlPath = url.getPath();
- DmaapConsumerUrlPath path = parseDmaapUrlPath(urlPath);
-
- dmaapProps.put("port", url.getPort());
- dmaapProps.put("server", url.getHost());
- dmaapProps.put("topic", path.dmaapTopicName);
- dmaapProps.put("consumerGroup", path.consumerGroup);
- dmaapProps.put("consumerInstance", path.consumerId);
- dmaapProps.put("fetchTimeout", 15000);
- dmaapProps.put("fetchLimit", 1000);
- dmaapProps.put("userName", userName);
- dmaapProps.put("password", passwd);
- } catch (MalformedURLException e) {
- throw new ServiceException("Could not parse the URL", e);
+ private String parseDmaapConfig(JsonObject streamCfg) throws ServiceException {
+ Set<Entry<String, JsonElement>> streamConfigEntries = streamCfg.entrySet();
+ if (streamConfigEntries.size() != 1) {
+ throw new ServiceException(
+ "Invalid configuration. Number of streams must be one, config: " + streamConfigEntries);