public class KafkaTopicConsumers {
private static final Logger logger = LoggerFactory.getLogger(KafkaTopicConsumers.class);
- private final Map<String, KafkaTopicListener> topicListeners = new HashMap<>();
+ private final Map<String, KafkaTopicListener> topicListeners = new HashMap<>(); // Key is typeId
+
@Getter
- private final Map<String, KafkaJobDataConsumer> activeSubscriptions = new HashMap<>();
+ private final Map<String, KafkaJobDataConsumer> consumers = new HashMap<>(); // Key is jobId
private static final int CONSUMER_SUPERVISION_INTERVAL_MS = 1000 * 60 * 3;
}
public synchronized void addJob(Job job) {
- if (this.activeSubscriptions.get(job.getId()) == null && job.getType().isKafkaTopicDefined()) {
+ if (this.consumers.get(job.getId()) == null && job.getType().isKafkaTopicDefined()) {
logger.debug("Kafka job added {}", job.getId());
KafkaTopicListener topicConsumer = topicListeners.get(job.getType().getId());
- KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(topicConsumer.getOutput(), job);
- subscription.start();
- activeSubscriptions.put(job.getId(), subscription);
+ KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(job);
+ subscription.start(topicConsumer.getOutput());
+ consumers.put(job.getId(), subscription);
}
}
public synchronized void removeJob(Job job) {
- KafkaJobDataConsumer d = activeSubscriptions.remove(job.getId());
+ KafkaJobDataConsumer d = consumers.remove(job.getId());
if (d != null) {
logger.debug("Kafka job removed {}", job.getId());
d.stop();
@Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS)
public synchronized void restartNonRunningTasks() {
- for (KafkaJobDataConsumer consumer : activeSubscriptions.values()) {
+
+ for (KafkaJobDataConsumer consumer : consumers.values()) {
if (!consumer.isRunning()) {
- consumer.start();
+ restartTopic(consumer);
}
}
}
+ private void restartTopic(KafkaJobDataConsumer consumer) {
+ InfoType type = consumer.getJob().getType();
+ KafkaTopicListener topic = this.topicListeners.get(type.getId());
+ topic.start();
+ restartConsumersOfType(topic, type);
+ }
+
+ private void restartConsumersOfType(KafkaTopicListener topic, InfoType type) {
+ this.consumers.forEach((jobId, consumer) -> {
+ if (consumer.getJob().getType().getId().equals(type.getId())) {
+ consumer.start(topic.getOutput());
+ }
+ });
+ }
}