+ private void restartTopic(KafkaJobDataConsumer consumer) {
+ InfoType type = consumer.getJob().getType();
+ KafkaTopicListener topic = this.topicListeners.get(type.getId());
+ topic.start();
+ restartConsumersOfType(topic, type);
+ }
+
+ private void restartConsumersOfType(KafkaTopicListener topic, InfoType type) {
+ this.consumers.forEach((jobId, consumer) -> {
+ if (consumer.getJob().getType().getId().equals(type.getId())) {
+ consumer.start(topic.getOutput());
+ }
+ });
+ }