- private Iterable<String> fetchAllMessages() throws ServiceException, IOException {
- Properties dmaapConsumerProperties = this.applicationConfig.getDmaapConsumerConfig();
- MRConsumer consumer = getMessageRouterConsumer(dmaapConsumerProperties);
- MRConsumerResponse response = consumer.fetchWithReturnConsumerResponse();
- if (response == null || !"200".equals(response.getResponseCode())) {
- String errorMessage = "DMaaP NULL response received";
- if (response != null) {
- errorMessage = "Error respons " + response.getResponseCode() + " " + response.getResponseMessage()
- + " from DMaaP.";
+ protected boolean isStopped() {
+ return false;
+ }
+
+ protected boolean isDmaapConfigured() {
+ String producerTopicUrl = applicationConfig.getDmaapProducerTopicUrl();
+ String consumerTopicUrl = applicationConfig.getDmaapConsumerTopicUrl();
+ return (!producerTopicUrl.isEmpty() && !consumerTopicUrl.isEmpty());
+ }
+
+ private static List<String> parseMessages(String jsonString) {
+ JsonArray arrayOfMessages = JsonParser.parseString(jsonString).getAsJsonArray();
+ List<String> result = new ArrayList<>();
+ for (JsonElement element : arrayOfMessages) {
+ if (element.isJsonPrimitive()) {
+ result.add(element.getAsString());
+ } else {
+ String messageAsString = element.toString();
+ result.add(messageAsString);