NONRTRIC - Implement DMaaP mediator producer service in Java
[nonrtric.git] / dmaap-adaptor-java / src / main / java / org / oran / dmaapadapter / tasks / KafkaTopicConsumers.java
index 0ed85c6..29ad8c7 100644 (file)
@@ -30,6 +30,7 @@ import org.oran.dmaapadapter.repository.InfoType;
 import org.oran.dmaapadapter.repository.InfoTypes;
 import org.oran.dmaapadapter.repository.Job;
 import org.oran.dmaapadapter.repository.Jobs;
+import org.oran.dmaapadapter.repository.MultiMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -46,7 +47,7 @@ public class KafkaTopicConsumers {
     private final Map<String, KafkaTopicListener> topicListeners = new HashMap<>(); // Key is typeId
 
     @Getter
-    private final Map<String, KafkaJobDataConsumer> consumers = new HashMap<>(); // Key is jobId
+    private final MultiMap<KafkaJobDataConsumer> consumers = new MultiMap<>(); // Key is typeId, jobId
 
     private static final int CONSUMER_SUPERVISION_INTERVAL_MS = 1000 * 60 * 3;
 
@@ -75,17 +76,21 @@ public class KafkaTopicConsumers {
     }
 
     public synchronized void addJob(Job job) {
-        if (this.consumers.get(job.getId()) == null && job.getType().isKafkaTopicDefined()) {
+        if (job.getType().isKafkaTopicDefined()) {
+            removeJob(job);
             logger.debug("Kafka job added {}", job.getId());
             KafkaTopicListener topicConsumer = topicListeners.get(job.getType().getId());
+            if (consumers.get(job.getType().getId()).isEmpty()) {
+                topicConsumer.start();
+            }
             KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(job);
             subscription.start(topicConsumer.getOutput());
-            consumers.put(job.getId(), subscription);
+            consumers.put(job.getType().getId(), job.getId(), subscription);
         }
     }
 
     public synchronized void removeJob(Job job) {
-        KafkaJobDataConsumer d = consumers.remove(job.getId());
+        KafkaJobDataConsumer d = consumers.remove(job.getType().getId(), job.getId());
         if (d != null) {
             logger.debug("Kafka job removed {}", job.getId());
             d.stop();
@@ -94,12 +99,13 @@ public class KafkaTopicConsumers {
 
     @Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS)
     public synchronized void restartNonRunningTasks() {
-
-        for (KafkaJobDataConsumer consumer : consumers.values()) {
-            if (!consumer.isRunning()) {
-                restartTopic(consumer);
-            }
-        }
+        this.consumers.keySet().forEach(typeId -> {
+            this.consumers.get(typeId).forEach(consumer -> {
+                if (!consumer.isRunning()) {
+                    restartTopic(consumer);
+                }
+            });
+        });
     }
 
     private void restartTopic(KafkaJobDataConsumer consumer) {
@@ -110,10 +116,8 @@ public class KafkaTopicConsumers {
     }
 
     private void restartConsumersOfType(KafkaTopicListener topic, InfoType type) {
-        this.consumers.forEach((jobId, consumer) -> {
-            if (consumer.getJob().getType().getId().equals(type.getId())) {
-                consumer.start(topic.getOutput());
-            }
+        this.consumers.get(type.getId()).forEach((consumer) -> {
+            consumer.start(topic.getOutput());
         });
     }
 }