X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=dmaap-adaptor-java%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fdmaapadapter%2Ftasks%2FKafkaTopicConsumers.java;h=48090170fd83767b34a7d93f5473ba7a228aacb1;hb=2389606af9e77879c76e2f87822b5b7d68920d19;hp=29ad8c7518e188b20451611aa7193024c58a3032;hpb=242299199382ec3fd7d514dde2eb607086a6a46e;p=nonrtric.git diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java index 29ad8c75..48090170 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java @@ -71,7 +71,6 @@ public class KafkaTopicConsumers { public void onJobRemoved(Job job) { removeJob(job); } - }); } @@ -84,7 +83,7 @@ public class KafkaTopicConsumers { topicConsumer.start(); } KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(job); - subscription.start(topicConsumer.getOutput()); + subscription.start(topicConsumer.getOutput().asFlux()); consumers.put(job.getType().getId(), job.getId(), subscription); } } @@ -98,14 +97,12 @@ public class KafkaTopicConsumers { } @Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS) - public synchronized void restartNonRunningTasks() { - this.consumers.keySet().forEach(typeId -> { - this.consumers.get(typeId).forEach(consumer -> { - if (!consumer.isRunning()) { - restartTopic(consumer); - } - }); - }); + public synchronized void restartNonRunningTopics() { + for (String typeId : this.consumers.keySet()) { + for (KafkaJobDataConsumer consumer : this.consumers.get(typeId)) { + restartTopic(consumer); + } + } } private void restartTopic(KafkaJobDataConsumer consumer) { @@ -116,8 +113,6 @@ public class KafkaTopicConsumers { } private void restartConsumersOfType(KafkaTopicListener topic, InfoType type) { - this.consumers.get(type.getId()).forEach((consumer) -> { - consumer.start(topic.getOutput()); - }); + this.consumers.get(type.getId()).forEach(consumer -> consumer.start(topic.getOutput().asFlux())); } }