NONRTRIC - Implement DMaaP mediator producer service in Java
[nonrtric.git] / dmaap-adaptor-java / src / main / java / org / oran / dmaapadapter / tasks / KafkaJobDataConsumer.java
index 5550ce0..2a16f47 100644 (file)
@@ -31,7 +31,6 @@ import org.springframework.web.reactive.function.client.WebClientResponseExcepti
 import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.core.publisher.Sinks.Many;
 
 /**
  * The class streams data from a multi cast sink and sends the data to the Job
@@ -75,17 +74,22 @@ public class KafkaJobDataConsumer {
         this.job = job;
     }
 
-    public synchronized void start(Many<String> input) {
+    public synchronized void start(Flux<String> input) {
         stop();
         this.errorStats.resetKafkaErrors();
         this.subscription = getMessagesFromKafka(input, job) //
                 .flatMap(this::postToClient, job.getParameters().getMaxConcurrency()) //
                 .onErrorResume(this::handleError) //
                 .subscribe(this::handleConsumerSentOk, //
-                        t -> stop(), //
+                        this::handleExceptionInStream, //
                         () -> logger.warn("KafkaMessageConsumer stopped jobId: {}", job.getId()));
     }
 
+    private void handleExceptionInStream(Throwable t) {
+        logger.warn("KafkaMessageConsumer exception: {}, jobId: {}", t.getMessage(), job.getId());
+        stop();
+    }
+
     private Mono<String> postToClient(String body) {
         logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), body);
         MediaType contentType = this.job.isBuffered() ? MediaType.APPLICATION_JSON : null;
@@ -94,8 +98,8 @@ public class KafkaJobDataConsumer {
 
     public synchronized void stop() {
         if (this.subscription != null) {
-            subscription.dispose();
-            subscription = null;
+            this.subscription.dispose();
+            this.subscription = null;
         }
     }
 
@@ -103,9 +107,8 @@ public class KafkaJobDataConsumer {
         return this.subscription != null;
     }
 
-    private Flux<String> getMessagesFromKafka(Many<String> input, Job job) {
-        Flux<String> result = input.asFlux() //
-                .filter(job::isFilterMatch);
+    private Flux<String> getMessagesFromKafka(Flux<String> input, Job job) {
+        Flux<String> result = input.filter(job::isFilterMatch);
 
         if (job.isBuffered()) {
             result = result.map(this::quote) //