public synchronized void restartNonRunningTopics() {
for (String typeId : this.consumers.keySet()) {
for (KafkaJobDataConsumer consumer : this.consumers.get(typeId)) {
- restartTopic(consumer);
+ if (!consumer.isRunning()) {
+ restartTopic(consumer);
+ }
}
}
}