X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=dmaap-adaptor-java%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fdmaapadapter%2Ftasks%2FKafkaTopicConsumers.java;h=785f98bf41cc14d7a008ab5241221ef60e65a918;hb=b3896f4ad7912be9e12c05e7d4770fa39752d797;hp=23d9da2c9294c60e9bf33e7be4c50bbf578b89e4;hpb=5e1623ab25b62c6c28849bfd862eba4648465922;p=nonrtric.git diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java index 23d9da2c..785f98bf 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java @@ -23,56 +23,80 @@ package org.oran.dmaapadapter.tasks; import java.util.HashMap; import java.util.Map; +import lombok.Getter; + import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.repository.InfoType; import org.oran.dmaapadapter.repository.InfoTypes; import org.oran.dmaapadapter.repository.Job; +import org.oran.dmaapadapter.repository.Jobs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; -import reactor.core.Disposable; -/** - * The class fetches incoming requests from DMAAP and sends them further to the - * consumers that has a job for this InformationType. - */ @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally @Component +@EnableScheduling public class KafkaTopicConsumers { private static final Logger logger = LoggerFactory.getLogger(KafkaTopicConsumers.class); - private final Map topicConsumers = new HashMap<>(); - private final Map activeSubscriptions = new HashMap<>(); - private final ApplicationConfig appConfig; + private final Map topicListeners = new HashMap<>(); + @Getter + private final Map activeSubscriptions = new HashMap<>(); - public KafkaTopicConsumers(@Autowired ApplicationConfig appConfig) { - this.appConfig = appConfig; - } + private static final int CONSUMER_SUPERVISION_INTERVAL_MS = 1000 * 60 * 3; + + public KafkaTopicConsumers(@Autowired ApplicationConfig appConfig, @Autowired InfoTypes types, + @Autowired Jobs jobs) { - public void start(InfoTypes types) { for (InfoType type : types.getAll()) { if (type.isKafkaTopicDefined()) { - KafkaTopicConsumer topicConsumer = new KafkaTopicConsumer(appConfig, type); - topicConsumers.put(type.getId(), topicConsumer); + KafkaTopicListener topicConsumer = new KafkaTopicListener(appConfig, type); + topicListeners.put(type.getId(), topicConsumer); } } + + jobs.addObserver(new Jobs.Observer() { + @Override + public void onJobbAdded(Job job) { + addJob(job); + } + + @Override + public void onJobRemoved(Job job) { + removeJob(job); + } + + }); } public synchronized void addJob(Job job) { if (this.activeSubscriptions.get(job.getId()) == null && job.getType().isKafkaTopicDefined()) { logger.debug("Kafka job added {}", job.getId()); - KafkaTopicConsumer topicConsumer = topicConsumers.get(job.getType().getId()); - Disposable subscription = topicConsumer.startDistributeToConsumer(job); + KafkaTopicListener topicConsumer = topicListeners.get(job.getType().getId()); + KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(topicConsumer.getOutput(), job); + subscription.start(); activeSubscriptions.put(job.getId(), subscription); } } public synchronized void removeJob(Job job) { - Disposable d = activeSubscriptions.remove(job.getId()); + KafkaJobDataConsumer d = activeSubscriptions.remove(job.getId()); if (d != null) { logger.debug("Kafka job removed {}", job.getId()); - d.dispose(); + d.stop(); + } + } + + @Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS) + public synchronized void restartNonRunningTasks() { + for (KafkaJobDataConsumer consumer : activeSubscriptions.values()) { + if (!consumer.isRunning()) { + consumer.start(); + } } }