NONRTRIC - Implement DMaaP mediator producer service in Java
[nonrtric.git] / dmaap-adaptor-java / src / main / java / org / oran / dmaapadapter / tasks / KafkaTopicConsumers.java
index 23d9da2..785f98b 100644 (file)
@@ -23,56 +23,80 @@ package org.oran.dmaapadapter.tasks;
 import java.util.HashMap;
 import java.util.Map;
 
+import lombok.Getter;
+
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.repository.InfoType;
 import org.oran.dmaapadapter.repository.InfoTypes;
 import org.oran.dmaapadapter.repository.Job;
+import org.oran.dmaapadapter.repository.Jobs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
-import reactor.core.Disposable;
 
-/**
- * The class fetches incoming requests from DMAAP and sends them further to the
- * consumers that has a job for this InformationType.
- */
 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
 @Component
+@EnableScheduling
 public class KafkaTopicConsumers {
     private static final Logger logger = LoggerFactory.getLogger(KafkaTopicConsumers.class);
 
-    private final Map<String, KafkaTopicConsumer> topicConsumers = new HashMap<>();
-    private final Map<String, Disposable> activeSubscriptions = new HashMap<>();
-    private final ApplicationConfig appConfig;
+    private final Map<String, KafkaTopicListener> topicListeners = new HashMap<>();
+    @Getter
+    private final Map<String, KafkaJobDataConsumer> activeSubscriptions = new HashMap<>();
 
-    public KafkaTopicConsumers(@Autowired ApplicationConfig appConfig) {
-        this.appConfig = appConfig;
-    }
+    private static final int CONSUMER_SUPERVISION_INTERVAL_MS = 1000 * 60 * 3;
+
+    public KafkaTopicConsumers(@Autowired ApplicationConfig appConfig, @Autowired InfoTypes types,
+            @Autowired Jobs jobs) {
 
-    public void start(InfoTypes types) {
         for (InfoType type : types.getAll()) {
             if (type.isKafkaTopicDefined()) {
-                KafkaTopicConsumer topicConsumer = new KafkaTopicConsumer(appConfig, type);
-                topicConsumers.put(type.getId(), topicConsumer);
+                KafkaTopicListener topicConsumer = new KafkaTopicListener(appConfig, type);
+                topicListeners.put(type.getId(), topicConsumer);
             }
         }
+
+        jobs.addObserver(new Jobs.Observer() {
+            @Override
+            public void onJobbAdded(Job job) {
+                addJob(job);
+            }
+
+            @Override
+            public void onJobRemoved(Job job) {
+                removeJob(job);
+            }
+
+        });
     }
 
     public synchronized void addJob(Job job) {
         if (this.activeSubscriptions.get(job.getId()) == null && job.getType().isKafkaTopicDefined()) {
             logger.debug("Kafka job added {}", job.getId());
-            KafkaTopicConsumer topicConsumer = topicConsumers.get(job.getType().getId());
-            Disposable subscription = topicConsumer.startDistributeToConsumer(job);
+            KafkaTopicListener topicConsumer = topicListeners.get(job.getType().getId());
+            KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(topicConsumer.getOutput(), job);
+            subscription.start();
             activeSubscriptions.put(job.getId(), subscription);
         }
     }
 
     public synchronized void removeJob(Job job) {
-        Disposable d = activeSubscriptions.remove(job.getId());
+        KafkaJobDataConsumer d = activeSubscriptions.remove(job.getId());
         if (d != null) {
             logger.debug("Kafka job removed {}", job.getId());
-            d.dispose();
+            d.stop();
+        }
+    }
+
+    @Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS)
+    public synchronized void restartNonRunningTasks() {
+        for (KafkaJobDataConsumer consumer : activeSubscriptions.values()) {
+            if (!consumer.isRunning()) {
+                consumer.start();
+            }
         }
     }