- logger.warn("{}: cannot fetch because of ", this, e.getMessage(), e);
- sleep(TIME_BETWEEN_DMAAP_POLLS);
+ logger.warn("{}", e.getMessage());
+ sleep(TIME_BETWEEN_DMAAP_RETRIES);
+ }
+ }
+ }
+
+ protected boolean isStopped() {
+ return false;
+ }
+
+ protected boolean isDmaapConfigured() {
+ String producerTopicUrl = applicationConfig.getDmaapProducerTopicUrl();
+ String consumerTopicUrl = applicationConfig.getDmaapConsumerTopicUrl();
+ return (!producerTopicUrl.isEmpty() && !consumerTopicUrl.isEmpty());
+ }
+
+ private static List<String> parseMessages(String jsonString) {
+ JsonArray arrayOfMessages = JsonParser.parseString(jsonString).getAsJsonArray();
+ List<String> result = new ArrayList<>();
+ for (JsonElement element : arrayOfMessages) {
+ if (element.isJsonPrimitive()) {
+ result.add(element.getAsString());
+ } else {
+ String messageAsString = element.toString();
+ result.add(messageAsString);