NONRTRIC - Implement DMaaP mediator producer service in Java
[nonrtric.git] / dmaap-adaptor-java / src / main / java / org / oran / dmaapadapter / tasks / KafkaJobDataConsumer.java
index d240129..5550ce0 100644 (file)
 
 package org.oran.dmaapadapter.tasks;
 
+import lombok.Getter;
+
 import org.oran.dmaapadapter.repository.Job;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.http.MediaType;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 
 import reactor.core.Disposable;
@@ -35,29 +38,58 @@ import reactor.core.publisher.Sinks.Many;
  * owner via REST calls.
  */
 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
-
 public class KafkaJobDataConsumer {
     private static final Logger logger = LoggerFactory.getLogger(KafkaJobDataConsumer.class);
-    private final Many<String> input;
+    @Getter
     private final Job job;
     private Disposable subscription;
-    private int errorCounter = 0;
+    private final ErrorStats errorStats = new ErrorStats();
+
+    private class ErrorStats {
+        private int consumerFaultCounter = 0;
+        private boolean kafkaError = false; // e.g. overflow
+
+        public void handleOkFromConsumer() {
+            this.consumerFaultCounter = 0;
+        }
+
+        public void handleException(Throwable t) {
+            if (t instanceof WebClientResponseException) {
+                ++this.consumerFaultCounter;
+            } else {
+                kafkaError = true;
+            }
+        }
 
-    KafkaJobDataConsumer(Many<String> input, Job job) {
-        this.input = input;
+        public boolean isItHopeless() {
+            final int STOP_AFTER_ERRORS = 5;
+            return kafkaError || consumerFaultCounter > STOP_AFTER_ERRORS;
+        }
+
+        public void resetKafkaErrors() {
+            kafkaError = false;
+        }
+    }
+
+    public KafkaJobDataConsumer(Job job) {
         this.job = job;
     }
 
-    public synchronized void start() {
+    public synchronized void start(Many<String> input) {
         stop();
-        this.subscription = getMessagesFromKafka(job) //
-                .doOnNext(data -> logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), data))
-                .flatMap(body -> job.getConsumerRestClient().post("", body), job.getParameters().getMaxConcurrency()) //
+        this.errorStats.resetKafkaErrors();
+        this.subscription = getMessagesFromKafka(input, job) //
+                .flatMap(this::postToClient, job.getParameters().getMaxConcurrency()) //
                 .onErrorResume(this::handleError) //
                 .subscribe(this::handleConsumerSentOk, //
-                        this::handleErrorInStream, //
-                        () -> logger.debug("KafkaMessageConsumer stopped, jobId: {}, type: {}", job.getId(),
-                                job.getType().getId()));
+                        t -> stop(), //
+                        () -> logger.warn("KafkaMessageConsumer stopped jobId: {}", job.getId()));
+    }
+
+    private Mono<String> postToClient(String body) {
+        logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), body);
+        MediaType contentType = this.job.isBuffered() ? MediaType.APPLICATION_JSON : null;
+        return job.getConsumerRestClient().post("", body, contentType);
     }
 
     public synchronized void stop() {
@@ -71,43 +103,37 @@ public class KafkaJobDataConsumer {
         return this.subscription != null;
     }
 
-    private Flux<String> getMessagesFromKafka(Job job) {
+    private Flux<String> getMessagesFromKafka(Many<String> input, Job job) {
         Flux<String> result = input.asFlux() //
                 .filter(job::isFilterMatch);
 
         if (job.isBuffered()) {
-            result = result.bufferTimeout( //
-                    job.getParameters().getBufferTimeout().getMaxSize(), //
-                    job.getParameters().getBufferTimeout().getMaxTime()) //
+            result = result.map(this::quote) //
+                    .bufferTimeout( //
+                            job.getParameters().getBufferTimeout().getMaxSize(), //
+                            job.getParameters().getBufferTimeout().getMaxTime()) //
                     .map(Object::toString);
         }
         return result;
     }
 
-    private Mono<String> handleError(Throwable t) {
-        logger.warn("exception: {} job: {}", t.getMessage(), job);
+    private String quote(String str) {
+        final String q = "\"";
+        return q + str.replace(q, "\\\"") + q;
+    }
 
-        final int STOP_AFTER_ERRORS = 5;
-        if (t instanceof WebClientResponseException) {
-            if (++this.errorCounter > STOP_AFTER_ERRORS) {
-                logger.error("Stopping job {}", job);
-                return Mono.error(t);
-            } else {
-                return Mono.empty(); // Discard
-            }
+    private Mono<String> handleError(Throwable t) {
+        logger.warn("exception: {} job: {}", t.getMessage(), job.getId());
+        this.errorStats.handleException(t);
+        if (this.errorStats.isItHopeless()) {
+            return Mono.error(t);
         } else {
-            // This can happen if there is an overflow.
-            return Mono.empty();
+            return Mono.empty(); // Ignore
         }
     }
 
     private void handleConsumerSentOk(String data) {
-        this.errorCounter = 0;
-    }
-
-    private void handleErrorInStream(Throwable t) {
-        logger.error("KafkaMessageConsumer jobId: {}, error: {}", job.getId(), t.getMessage());
-        this.subscription = null;
+        this.errorStats.handleOkFromConsumer();
     }
 
 }