X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=dmaap-adaptor-java%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fdmaapadapter%2Ftasks%2FKafkaJobDataConsumer.java;h=2a16f47598785d5890a66e216481c573c60e14c6;hb=86f81813c94a44337c199124e7bbf6280e2c6aa6;hp=d240129ea4ee0703a77a0de9b2e7edd5cb977b10;hpb=6f48adb69090799c74c29204dd2cd1737cc9d6ac;p=nonrtric.git

diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java
index d240129e..2a16f475 100644
--- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java
+++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java
@@ -20,50 +20,86 @@

 package org.oran.dmaapadapter.tasks;

+import lombok.Getter;
+
 import org.oran.dmaapadapter.repository.Job;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.http.MediaType;
 import org.springframework.web.reactive.function.client.WebClientResponseException;

 import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.core.publisher.Sinks.Many;

 /**
  * The class streams data from a multi cast sink and sends the data to the Job
  * owner via REST calls.
  */
 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
-
 public class KafkaJobDataConsumer {
     private static final Logger logger = LoggerFactory.getLogger(KafkaJobDataConsumer.class);
-    private final Many<String> input;
+    @Getter
     private final Job job;
     private Disposable subscription;
-    private int errorCounter = 0;
+    private final ErrorStats errorStats = new ErrorStats();
+
+    private class ErrorStats {
+        private int consumerFaultCounter = 0;
+        private boolean kafkaError = false; // eg. overflow
+
+        public void handleOkFromConsumer() {
+            this.consumerFaultCounter = 0;
+        }
+
+        public void handleException(Throwable t) {
+            if (t instanceof WebClientResponseException) {
+                ++this.consumerFaultCounter;
+            } else {
+                kafkaError = true;
+            }
+        }
+
+        public boolean isItHopeless() {
+            final int STOP_AFTER_ERRORS = 5;
+            return kafkaError || consumerFaultCounter > STOP_AFTER_ERRORS;
+        }
+
+        public void resetKafkaErrors() {
+            kafkaError = false;
+        }
+    }

-    KafkaJobDataConsumer(Many<String> input, Job job) {
-        this.input = input;
+    public KafkaJobDataConsumer(Job job) {
         this.job = job;
     }

-    public synchronized void start() {
+    public synchronized void start(Flux<String> input) {
         stop();
-        this.subscription = getMessagesFromKafka(job) //
-                .doOnNext(data -> logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), data))
-                .flatMap(body -> job.getConsumerRestClient().post("", body), job.getParameters().getMaxConcurrency()) //
+        this.errorStats.resetKafkaErrors();
+        this.subscription = getMessagesFromKafka(input, job) //
+                .flatMap(this::postToClient, job.getParameters().getMaxConcurrency()) //
                 .onErrorResume(this::handleError) //
                 .subscribe(this::handleConsumerSentOk, //
-                        this::handleErrorInStream, //
-                        () -> logger.debug("KafkaMessageConsumer stopped, jobId: {}, type: {}", job.getId(),
-                                job.getType().getId()));
+                        this::handleExceptionInStream, //
+                        () -> logger.warn("KafkaMessageConsumer stopped jobId: {}", job.getId()));
+    }
+
+    private void handleExceptionInStream(Throwable t) {
+        logger.warn("KafkaMessageConsumer exception: {}, jobId: {}", t.getMessage(), job.getId());
+        stop();
+    }
+
+    private Mono<String> postToClient(String body) {
+        logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), body);
+        MediaType contentType = this.job.isBuffered() ? MediaType.APPLICATION_JSON : null;
+        return job.getConsumerRestClient().post("", body, contentType);
     }

     public synchronized void stop() {
         if (this.subscription != null) {
-            subscription.dispose();
-            subscription = null;
+            this.subscription.dispose();
+            this.subscription = null;
         }
     }

@@ -71,43 +107,36 @@ public class KafkaJobDataConsumer {
         return this.subscription != null;
     }

-    private Flux<String> getMessagesFromKafka(Job job) {
-        Flux<String> result = input.asFlux() //
-                .filter(job::isFilterMatch);
+    private Flux<String> getMessagesFromKafka(Flux<String> input, Job job) {
+        Flux<String> result = input.filter(job::isFilterMatch);

         if (job.isBuffered()) {
-            result = result.bufferTimeout( //
-                    job.getParameters().getBufferTimeout().getMaxSize(), //
-                    job.getParameters().getBufferTimeout().getMaxTime()) //
+            result = result.map(this::quote) //
+                    .bufferTimeout( //
+                            job.getParameters().getBufferTimeout().getMaxSize(), //
+                            job.getParameters().getBufferTimeout().getMaxTime()) //
                     .map(Object::toString);
         }
         return result;
     }

-    private Mono<String> handleError(Throwable t) {
-        logger.warn("exception: {} job: {}", t.getMessage(), job);
+    private String quote(String str) {
+        final String q = "\"";
+        return q + str.replace(q, "\\\"") + q;
+    }

-        final int STOP_AFTER_ERRORS = 5;
-        if (t instanceof WebClientResponseException) {
-            if (++this.errorCounter > STOP_AFTER_ERRORS) {
-                logger.error("Stopping job {}", job);
-                return Mono.error(t);
-            } else {
-                return Mono.empty(); // Discard
-            }
+    private Mono<String> handleError(Throwable t) {
+        logger.warn("exception: {} job: {}", t.getMessage(), job.getId());
+        this.errorStats.handleException(t);
+        if (this.errorStats.isItHopeless()) {
+            return Mono.error(t);
         } else {
-            // This can happen if there is an overflow.
-            return Mono.empty();
+            return Mono.empty(); // Ignore
         }
     }

     private void handleConsumerSentOk(String data) {
-        this.errorCounter = 0;
-    }
-
-    private void handleErrorInStream(Throwable t) {
-        logger.error("KafkaMessageConsumer jobId: {}, error: {}", job.getId(), t.getMessage());
-        this.subscription = null;
+        this.errorStats.handleOkFromConsumer();
     }
 }
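
The signature changes (the constructor no longer takes a Sinks.Many, and start() now takes the Flux) invert ownership of the Reactor sink: the side that listens to the Kafka topic keeps the sink, and hands each consumer the shared multicast flux on every (re)start. Below is a minimal, self-contained sketch of that pattern, assuming only reactor-core; the class name, the directBestEffort() flavor, and the plain lambda "jobs" are illustrative assumptions, not code from this repository.

import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

public class SharedFluxSketch {
    public static void main(String[] args) {
        // The topic-listener side owns the sink and multicasts records.
        Sinks.Many<String> sink = Sinks.many().multicast().directBestEffort();
        Flux<String> shared = sink.asFlux();

        // Each job subscribes to the same Flux, as start(Flux input) now does;
        // restarting a job is just dispose() plus a fresh subscribe on `shared`.
        shared.subscribe(msg -> System.out.println("job1 got: " + msg));
        shared.subscribe(msg -> System.out.println("job2 got: " + msg));

        sink.tryEmitNext("record-1");
        sink.tryEmitNext("record-2");
    }
}

Since a restart is a brand-new subscription to the shared flux, stale Kafka-side state should not immediately kill it, which is presumably why start() now calls errorStats.resetKafkaErrors() before resubscribing.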
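The other behavioral change is the quoting added to the buffered path. Here is a standalone sketch of what quote() accomplishes (QuoteDemo is a hypothetical class; the quote() body is copied from the patch): each record is wrapped in escaped double quotes before bufferTimeout(), so the List.toString() produced by map(Object::toString) happens to be a valid JSON array, matching the MediaType.APPLICATION_JSON that postToClient now sets for buffered jobs.

import java.util.List;

public class QuoteDemo {

    // Same logic as the quote() helper added by this change.
    private static String quote(String str) {
        final String q = "\"";
        return q + str.replace(q, "\\\"") + q;
    }

    public static void main(String[] args) {
        // A record containing a quote character is escaped, not broken:
        System.out.println(quote("says \"hi\""));   // prints: "says \"hi\""

        // What a buffered job would emit: quoted records collected by
        // bufferTimeout() and rendered via List.toString():
        List<String> buffer = List.of(quote("rec-1"), quote("rec-2"));
        System.out.println(buffer);                  // prints: ["rec-1", "rec-2"]
    }
}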