X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=dmaap-adaptor-java%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fdmaapadapter%2Ftasks%2FKafkaJobDataConsumer.java;h=2a16f47598785d5890a66e216481c573c60e14c6;hb=5feecd881172a3b22041d35443c1f946e7d5f63e;hp=f677502c664ad41834eeca2f174f4e8a500db0df;hpb=a28a4ad261601976c345425692116e5d7250b810;p=nonrtric.git diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java index f677502c..2a16f475 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java @@ -31,7 +31,6 @@ import org.springframework.web.reactive.function.client.WebClientResponseExcepti import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.Sinks.Many; /** * The class streams data from a multi cast sink and sends the data to the Job @@ -75,7 +74,7 @@ public class KafkaJobDataConsumer { this.job = job; } - public synchronized void start(Many input) { + public synchronized void start(Flux input) { stop(); this.errorStats.resetKafkaErrors(); this.subscription = getMessagesFromKafka(input, job) // @@ -99,8 +98,8 @@ public class KafkaJobDataConsumer { public synchronized void stop() { if (this.subscription != null) { - subscription.dispose(); - subscription = null; + this.subscription.dispose(); + this.subscription = null; } } @@ -108,9 +107,8 @@ public class KafkaJobDataConsumer { return this.subscription != null; } - private Flux getMessagesFromKafka(Many input, Job job) { - Flux result = input.asFlux() // - .filter(job::isFilterMatch); + private Flux getMessagesFromKafka(Flux input, Job job) { + Flux result = input.filter(job::isFilterMatch); if (job.isBuffered()) { result = result.map(this::quote) //