X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=dmaap-adaptor-java%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fdmaapadapter%2Ftasks%2FKafkaTopicConsumers.java;h=5233401b231ea7883a1c219abfc025c4db289b87;hb=844931b62f35ce6ee2d9dc7274573fc54e14407a;hp=0ed85c6a1b1262d76fca69ec325d65077436ddfc;hpb=0f6367023720ecc7d7b4b38cbbc4282792172a89;p=nonrtric.git diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java index 0ed85c6a..5233401b 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java @@ -30,6 +30,7 @@ import org.oran.dmaapadapter.repository.InfoType; import org.oran.dmaapadapter.repository.InfoTypes; import org.oran.dmaapadapter.repository.Job; import org.oran.dmaapadapter.repository.Jobs; +import org.oran.dmaapadapter.repository.MultiMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -46,7 +47,7 @@ public class KafkaTopicConsumers { private final Map topicListeners = new HashMap<>(); // Key is typeId @Getter - private final Map consumers = new HashMap<>(); // Key is jobId + private final MultiMap consumers = new MultiMap<>(); // Key is typeId, jobId private static final int CONSUMER_SUPERVISION_INTERVAL_MS = 1000 * 60 * 3; @@ -70,22 +71,25 @@ public class KafkaTopicConsumers { public void onJobRemoved(Job job) { removeJob(job); } - }); } public synchronized void addJob(Job job) { - if (this.consumers.get(job.getId()) == null && job.getType().isKafkaTopicDefined()) { + if (job.getType().isKafkaTopicDefined()) { + removeJob(job); logger.debug("Kafka job added {}", job.getId()); KafkaTopicListener topicConsumer = topicListeners.get(job.getType().getId()); + if (consumers.get(job.getType().getId()).isEmpty()) { + topicConsumer.start(); + } KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(job); - subscription.start(topicConsumer.getOutput()); - consumers.put(job.getId(), subscription); + subscription.start(topicConsumer.getOutput().asFlux()); + consumers.put(job.getType().getId(), job.getId(), subscription); } } public synchronized void removeJob(Job job) { - KafkaJobDataConsumer d = consumers.remove(job.getId()); + KafkaJobDataConsumer d = consumers.remove(job.getType().getId(), job.getId()); if (d != null) { logger.debug("Kafka job removed {}", job.getId()); d.stop(); @@ -93,11 +97,12 @@ public class KafkaTopicConsumers { } @Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS) - public synchronized void restartNonRunningTasks() { - - for (KafkaJobDataConsumer consumer : consumers.values()) { - if (!consumer.isRunning()) { - restartTopic(consumer); + public synchronized void restartNonRunningTopics() { + for (String typeId : this.consumers.keySet()) { + for (KafkaJobDataConsumer consumer : this.consumers.get(typeId)) { + if (!consumer.isRunning()) { + restartTopic(consumer); + } } } } @@ -110,10 +115,6 @@ public class KafkaTopicConsumers { } private void restartConsumersOfType(KafkaTopicListener topic, InfoType type) { - this.consumers.forEach((jobId, consumer) -> { - if (consumer.getJob().getType().getId().equals(type.getId())) { - consumer.start(topic.getOutput()); - } - }); + this.consumers.get(type.getId()).forEach(consumer -> consumer.start(topic.getOutput().asFlux())); } }