NONRTRIC - Implement DMaaP mediator producer service in Java
[nonrtric.git] / dmaap-adaptor-java / src / main / java / org / oran / dmaapadapter / tasks / KafkaTopicConsumers.java
index 29ad8c7..4809017 100644 (file)
@@ -71,7 +71,6 @@ public class KafkaTopicConsumers {
             public void onJobRemoved(Job job) {
                 removeJob(job);
             }
-
         });
     }
 
@@ -84,7 +83,7 @@ public class KafkaTopicConsumers {
                 topicConsumer.start();
             }
             KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(job);
-            subscription.start(topicConsumer.getOutput());
+            subscription.start(topicConsumer.getOutput().asFlux());
             consumers.put(job.getType().getId(), job.getId(), subscription);
         }
     }
@@ -98,14 +97,12 @@ public class KafkaTopicConsumers {
     }
 
     @Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS)
-    public synchronized void restartNonRunningTasks() {
-        this.consumers.keySet().forEach(typeId -> {
-            this.consumers.get(typeId).forEach(consumer -> {
-                if (!consumer.isRunning()) {
-                    restartTopic(consumer);
-                }
-            });
-        });
+    public synchronized void restartNonRunningTopics() {
+        for (String typeId : this.consumers.keySet()) {
+            for (KafkaJobDataConsumer consumer : this.consumers.get(typeId)) {
+                restartTopic(consumer);
+            }
+        }
     }
 
     private void restartTopic(KafkaJobDataConsumer consumer) {
@@ -116,8 +113,6 @@ public class KafkaTopicConsumers {
     }
 
     private void restartConsumersOfType(KafkaTopicListener topic, InfoType type) {
-        this.consumers.get(type.getId()).forEach((consumer) -> {
-            consumer.start(topic.getOutput());
-        });
+        this.consumers.get(type.getId()).forEach(consumer -> consumer.start(topic.getOutput().asFlux()));
     }
 }