- private Iterable<String> fetchAllMessages() throws ServiceException, FileNotFoundException, IOException {
- Properties dmaapConsumerProperties = this.applicationConfig.getDmaapConsumerConfig();
- MRConsumer consumer = MRClientFactory.createConsumer(dmaapConsumerProperties);
- MRConsumerResponse response = consumer.fetchWithReturnConsumerResponse();
- if (response == null || !"200".equals(response.getResponseCode())) {
- throw new ServiceException("DMaaP NULL response received");
+ protected boolean isStopped() {
+ return false;
+ }
+
+ protected boolean isDmaapConfigured() {
+ String producerTopicUrl = applicationConfig.getDmaapProducerTopicUrl();
+ String consumerTopicUrl = applicationConfig.getDmaapConsumerTopicUrl();
+ return (!producerTopicUrl.isEmpty() && !consumerTopicUrl.isEmpty());
+ }
+
+ private static List<String> parseMessages(String jsonString) {
+ JsonArray arrayOfMessages = JsonParser.parseString(jsonString).getAsJsonArray();
+ List<String> result = new ArrayList<>();
+ for (JsonElement element : arrayOfMessages) {
+ if (element.isJsonPrimitive()) {
+ result.add(element.getAsString());
+ } else {
+ String messageAsString = element.toString();
+ result.add(messageAsString);
+ }
+ }
+ return result;
+ }
+
+ protected Iterable<String> fetchAllMessages() throws ServiceException {
+ String topicUrl = this.applicationConfig.getDmaapConsumerTopicUrl();
+ AsyncRestClient consumer = getMessageRouterConsumer();
+ ResponseEntity<String> response = consumer.getForEntity(topicUrl).block();
+ logger.debug("DMaaP consumer received {} : {}", response.getStatusCode(), response.getBody());
+ if (response.getStatusCode().is2xxSuccessful()) {
+ return parseMessages(response.getBody());