NONRTRIC - Implement DMaaP mediator producer service in Java
[nonrtric.git] / dmaap-adaptor-java / src / main / java / org / oran / dmaapadapter / tasks / KafkaTopicConsumers.java
index 785f98b..0ed85c6 100644 (file)
@@ -43,9 +43,10 @@ import org.springframework.stereotype.Component;
 public class KafkaTopicConsumers {
     private static final Logger logger = LoggerFactory.getLogger(KafkaTopicConsumers.class);
 
-    private final Map<String, KafkaTopicListener> topicListeners = new HashMap<>();
+    private final Map<String, KafkaTopicListener> topicListeners = new HashMap<>(); // Key is typeId
+
     @Getter
-    private final Map<String, KafkaJobDataConsumer> activeSubscriptions = new HashMap<>();
+    private final Map<String, KafkaJobDataConsumer> consumers = new HashMap<>(); // Key is jobId
 
     private static final int CONSUMER_SUPERVISION_INTERVAL_MS = 1000 * 60 * 3;
 
@@ -74,17 +75,17 @@ public class KafkaTopicConsumers {
     }
 
     public synchronized void addJob(Job job) {
-        if (this.activeSubscriptions.get(job.getId()) == null && job.getType().isKafkaTopicDefined()) {
+        if (this.consumers.get(job.getId()) == null && job.getType().isKafkaTopicDefined()) {
             logger.debug("Kafka job added {}", job.getId());
             KafkaTopicListener topicConsumer = topicListeners.get(job.getType().getId());
-            KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(topicConsumer.getOutput(), job);
-            subscription.start();
-            activeSubscriptions.put(job.getId(), subscription);
+            KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(job);
+            subscription.start(topicConsumer.getOutput());
+            consumers.put(job.getId(), subscription);
         }
     }
 
     public synchronized void removeJob(Job job) {
-        KafkaJobDataConsumer d = activeSubscriptions.remove(job.getId());
+        KafkaJobDataConsumer d = consumers.remove(job.getId());
         if (d != null) {
             logger.debug("Kafka job removed {}", job.getId());
             d.stop();
@@ -93,11 +94,26 @@ public class KafkaTopicConsumers {
 
     @Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS)
     public synchronized void restartNonRunningTasks() {
-        for (KafkaJobDataConsumer consumer : activeSubscriptions.values()) {
+
+        for (KafkaJobDataConsumer consumer : consumers.values()) {
             if (!consumer.isRunning()) {
-                consumer.start();
+                restartTopic(consumer);
             }
         }
     }
 
+    private void restartTopic(KafkaJobDataConsumer consumer) {
+        InfoType type = consumer.getJob().getType();
+        KafkaTopicListener topic = this.topicListeners.get(type.getId());
+        topic.start();
+        restartConsumersOfType(topic, type);
+    }
+
+    private void restartConsumersOfType(KafkaTopicListener topic, InfoType type) {
+        this.consumers.forEach((jobId, consumer) -> {
+            if (consumer.getJob().getType().getId().equals(type.getId())) {
+                consumer.start(topic.getOutput());
+            }
+        });
+    }
 }