+ @Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS)
+ public synchronized void restartNonRunningTasks() {
+
+ for (KafkaJobDataConsumer consumer : consumers.values()) {
+ if (!consumer.isRunning()) {
+ restartTopic(consumer);
+ }
+ }
+ }
+
+ private void restartTopic(KafkaJobDataConsumer consumer) {
+ InfoType type = consumer.getJob().getType();
+ KafkaTopicListener topic = this.topicListeners.get(type.getId());
+ topic.start();
+ restartConsumersOfType(topic, type);
+ }
+
+ private void restartConsumersOfType(KafkaTopicListener topic, InfoType type) {
+ this.consumers.forEach((jobId, consumer) -> {
+ if (consumer.getJob().getType().getId().equals(type.getId())) {
+ consumer.start(topic.getOutput());
+ }
+ });
+ }