NONRTRIC - Implement DMaaP mediator producer service in Java
[nonrtric.git] / dmaap-adaptor-java / src / main / java / org / oran / dmaapadapter / tasks / KafkaJobDataConsumer.java
index f677502..2a16f47 100644 (file)
@@ -31,7 +31,6 @@ import org.springframework.web.reactive.function.client.WebClientResponseExcepti
 import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.core.publisher.Sinks.Many;
 
 /**
  * The class streams data from a multi cast sink and sends the data to the Job
@@ -75,7 +74,7 @@ public class KafkaJobDataConsumer {
         this.job = job;
     }
 
-    public synchronized void start(Many<String> input) {
+    public synchronized void start(Flux<String> input) {
         stop();
         this.errorStats.resetKafkaErrors();
         this.subscription = getMessagesFromKafka(input, job) //
@@ -99,8 +98,8 @@ public class KafkaJobDataConsumer {
 
     public synchronized void stop() {
         if (this.subscription != null) {
-            subscription.dispose();
-            subscription = null;
+            this.subscription.dispose();
+            this.subscription = null;
         }
     }
 
@@ -108,9 +107,8 @@ public class KafkaJobDataConsumer {
         return this.subscription != null;
     }
 
-    private Flux<String> getMessagesFromKafka(Many<String> input, Job job) {
-        Flux<String> result = input.asFlux() //
-                .filter(job::isFilterMatch);
+    private Flux<String> getMessagesFromKafka(Flux<String> input, Job job) {
+        Flux<String> result = input.filter(job::isFilterMatch);
 
         if (job.isBuffered()) {
             result = result.map(this::quote) //