X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=dmaap-adaptor-java%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fdmaapadapter%2Ftasks%2FKafkaTopicConsumers.java;h=29ad8c7518e188b20451611aa7193024c58a3032;hb=242299199382ec3fd7d514dde2eb607086a6a46e;hp=785f98bf41cc14d7a008ab5241221ef60e65a918;hpb=6f48adb69090799c74c29204dd2cd1737cc9d6ac;p=nonrtric.git diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java index 785f98bf..29ad8c75 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java @@ -30,6 +30,7 @@ import org.oran.dmaapadapter.repository.InfoType; import org.oran.dmaapadapter.repository.InfoTypes; import org.oran.dmaapadapter.repository.Job; import org.oran.dmaapadapter.repository.Jobs; +import org.oran.dmaapadapter.repository.MultiMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -43,9 +44,10 @@ import org.springframework.stereotype.Component; public class KafkaTopicConsumers { private static final Logger logger = LoggerFactory.getLogger(KafkaTopicConsumers.class); - private final Map topicListeners = new HashMap<>(); + private final Map topicListeners = new HashMap<>(); // Key is typeId + @Getter - private final Map activeSubscriptions = new HashMap<>(); + private final MultiMap consumers = new MultiMap<>(); // Key is typeId, jobId private static final int CONSUMER_SUPERVISION_INTERVAL_MS = 1000 * 60 * 3; @@ -74,17 +76,21 @@ public class KafkaTopicConsumers { } public synchronized void addJob(Job job) { - if (this.activeSubscriptions.get(job.getId()) == null && job.getType().isKafkaTopicDefined()) { + if (job.getType().isKafkaTopicDefined()) { + removeJob(job); logger.debug("Kafka job added {}", job.getId()); KafkaTopicListener topicConsumer = topicListeners.get(job.getType().getId()); - KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(topicConsumer.getOutput(), job); - subscription.start(); - activeSubscriptions.put(job.getId(), subscription); + if (consumers.get(job.getType().getId()).isEmpty()) { + topicConsumer.start(); + } + KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(job); + subscription.start(topicConsumer.getOutput()); + consumers.put(job.getType().getId(), job.getId(), subscription); } } public synchronized void removeJob(Job job) { - KafkaJobDataConsumer d = activeSubscriptions.remove(job.getId()); + KafkaJobDataConsumer d = consumers.remove(job.getType().getId(), job.getId()); if (d != null) { logger.debug("Kafka job removed {}", job.getId()); d.stop(); @@ -93,11 +99,25 @@ public class KafkaTopicConsumers { @Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS) public synchronized void restartNonRunningTasks() { - for (KafkaJobDataConsumer consumer : activeSubscriptions.values()) { - if (!consumer.isRunning()) { - consumer.start(); - } - } + this.consumers.keySet().forEach(typeId -> { + this.consumers.get(typeId).forEach(consumer -> { + if (!consumer.isRunning()) { + restartTopic(consumer); + } + }); + }); } + private void restartTopic(KafkaJobDataConsumer consumer) { + InfoType type = consumer.getJob().getType(); + KafkaTopicListener topic = this.topicListeners.get(type.getId()); + topic.start(); + restartConsumersOfType(topic, type); + } + + private void restartConsumersOfType(KafkaTopicListener topic, InfoType type) { + this.consumers.get(type.getId()).forEach((consumer) -> { + consumer.start(topic.getOutput()); + }); + } }