X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=dmaap-adaptor-java%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fdmaapadapter%2Ftasks%2FKafkaTopicConsumers.java;h=5233401b231ea7883a1c219abfc025c4db289b87;hb=844931b62f35ce6ee2d9dc7274573fc54e14407a;hp=29ad8c7518e188b20451611aa7193024c58a3032;hpb=a28a4ad261601976c345425692116e5d7250b810;p=nonrtric.git diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java index 29ad8c75..5233401b 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java @@ -71,7 +71,6 @@ public class KafkaTopicConsumers { public void onJobRemoved(Job job) { removeJob(job); } - }); } @@ -84,7 +83,7 @@ public class KafkaTopicConsumers { topicConsumer.start(); } KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(job); - subscription.start(topicConsumer.getOutput()); + subscription.start(topicConsumer.getOutput().asFlux()); consumers.put(job.getType().getId(), job.getId(), subscription); } } @@ -98,14 +97,14 @@ public class KafkaTopicConsumers { } @Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS) - public synchronized void restartNonRunningTasks() { - this.consumers.keySet().forEach(typeId -> { - this.consumers.get(typeId).forEach(consumer -> { + public synchronized void restartNonRunningTopics() { + for (String typeId : this.consumers.keySet()) { + for (KafkaJobDataConsumer consumer : this.consumers.get(typeId)) { if (!consumer.isRunning()) { restartTopic(consumer); } - }); - }); + } + } } private void restartTopic(KafkaJobDataConsumer consumer) { @@ -116,8 +115,6 @@ public class KafkaTopicConsumers { } private void restartConsumersOfType(KafkaTopicListener topic, InfoType type) { - this.consumers.get(type.getId()).forEach((consumer) -> { - consumer.start(topic.getOutput()); - }); + this.consumers.get(type.getId()).forEach(consumer -> consumer.start(topic.getOutput().asFlux())); } }