import java.util.HashMap;
import java.util.Map;

import lombok.Getter;

import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.repository.InfoType;
import org.oran.dmaapadapter.repository.InfoTypes;
import org.oran.dmaapadapter.repository.Job;
import org.oran.dmaapadapter.repository.Jobs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
-/**
- * The class fetches incoming requests from DMAAP and sends them further to the
- * consumers that has a job for this InformationType.
- */
@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
@Component
+@EnableScheduling
public class KafkaTopicConsumers {
private static final Logger logger = LoggerFactory.getLogger(KafkaTopicConsumers.class);
- private final Map<String, KafkaTopicConsumer> topicConsumers = new HashMap<>();
- private final Map<String, Disposable> activeSubscriptions = new HashMap<>();
- private final ApplicationConfig appConfig;
+ private final Map<String, KafkaTopicListener> topicListeners = new HashMap<>();
+ @Getter
+ private final Map<String, KafkaJobDataConsumer> activeSubscriptions = new HashMap<>();
- public KafkaTopicConsumers(@Autowired ApplicationConfig appConfig) {
- this.appConfig = appConfig;
- }
+ private static final int CONSUMER_SUPERVISION_INTERVAL_MS = 1000 * 60 * 3;
+
/**
 * Creates one KafkaTopicListener for every type with a Kafka topic defined and
 * registers an observer on the job repository so that subscriptions follow the
 * job lifecycle.
 *
 * @param appConfig application configuration passed on to each listener
 * @param types all configured information types
 * @param jobs job repository to observe for added/removed jobs
 */
public KafkaTopicConsumers(@Autowired ApplicationConfig appConfig, @Autowired InfoTypes types,
        @Autowired Jobs jobs) {

    for (InfoType type : types.getAll()) {
        if (type.isKafkaTopicDefined()) {
            KafkaTopicListener topicConsumer = new KafkaTopicListener(appConfig, type);
            topicListeners.put(type.getId(), topicConsumer);
        }
    }

    jobs.addObserver(new Jobs.Observer() {
        @Override
        public void onJobbAdded(Job job) { // NOTE(review): "Jobb" typo originates in the Jobs.Observer interface
            addJob(job);
        }

        @Override
        public void onJobRemoved(Job job) {
            removeJob(job);
        }
    });
}
public synchronized void addJob(Job job) {
if (this.activeSubscriptions.get(job.getId()) == null && job.getType().isKafkaTopicDefined()) {
logger.debug("Kafka job added {}", job.getId());
- KafkaTopicConsumer topicConsumer = topicConsumers.get(job.getType().getId());
- Disposable subscription = topicConsumer.startDistributeToConsumer(job);
+ KafkaTopicListener topicConsumer = topicListeners.get(job.getType().getId());
+ KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(topicConsumer.getOutput(), job);
+ subscription.start();
activeSubscriptions.put(job.getId(), subscription);
}
}
public synchronized void removeJob(Job job) {
- Disposable d = activeSubscriptions.remove(job.getId());
+ KafkaJobDataConsumer d = activeSubscriptions.remove(job.getId());
if (d != null) {
logger.debug("Kafka job removed {}", job.getId());
- d.dispose();
+ d.stop();
+ }
+ }
+
+ @Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS)
+ public synchronized void restartNonRunningTasks() {
+ for (KafkaJobDataConsumer consumer : activeSubscriptions.values()) {
+ if (!consumer.isRunning()) {
+ consumer.start();
+ }
}
}