public void onJobRemoved(Job job) {
removeJob(job);
}
-
});
}
topicConsumer.start();
}
KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(job);
- subscription.start(topicConsumer.getOutput());
+ subscription.start(topicConsumer.getOutput().asFlux());
consumers.put(job.getType().getId(), job.getId(), subscription);
}
}
}
@Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS)
- public synchronized void restartNonRunningTasks() {
- this.consumers.keySet().forEach(typeId -> {
- this.consumers.get(typeId).forEach(consumer -> {
+ public synchronized void restartNonRunningTopics() {
+ for (String typeId : this.consumers.keySet()) {
+ for (KafkaJobDataConsumer consumer : this.consumers.get(typeId)) {
if (!consumer.isRunning()) {
restartTopic(consumer);
}
- });
- });
+ }
+ }
}
private void restartTopic(KafkaJobDataConsumer consumer) {
}
private void restartConsumersOfType(KafkaTopicListener topic, InfoType type) {
- this.consumers.get(type.getId()).forEach((consumer) -> {
- consumer.start(topic.getOutput());
- });
+ this.consumers.get(type.getId()).forEach(consumer -> consumer.start(topic.getOutput().asFlux()));
}
}