X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=dmaap-adaptor-java%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fdmaapadapter%2Ftasks%2FKafkaJobDataConsumer.java;h=5550ce0e832f1e66dc6e4856a0adff5569f704ff;hb=ce1d9f2d3e1d2713289dc4d2b5c246f99ec65160;hp=d240129ea4ee0703a77a0de9b2e7edd5cb977b10;hpb=b3896f4ad7912be9e12c05e7d4770fa39752d797;p=nonrtric.git

diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java
index d240129e..5550ce0e 100644
--- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java
+++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java
@@ -20,9 +20,12 @@
 
 package org.oran.dmaapadapter.tasks;
 
+import lombok.Getter;
+
 import org.oran.dmaapadapter.repository.Job;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.http.MediaType;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 
 import reactor.core.Disposable;
@@ -35,29 +38,58 @@ import reactor.core.publisher.Sinks.Many;
  * owner via REST calls.
  */
 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
-
 public class KafkaJobDataConsumer {
     private static final Logger logger = LoggerFactory.getLogger(KafkaJobDataConsumer.class);
-    private final Many<String> input;
+    @Getter
     private final Job job;
     private Disposable subscription;
-    private int errorCounter = 0;
+    private final ErrorStats errorStats = new ErrorStats();
+
+    private class ErrorStats {
+        private int consumerFaultCounter = 0;
+        private boolean kafkaError = false; // eg. overflow
+
+        public void handleOkFromConsumer() {
+            this.consumerFaultCounter = 0;
+        }
+
+        public void handleException(Throwable t) {
+            if (t instanceof WebClientResponseException) {
+                ++this.consumerFaultCounter;
+            } else {
+                kafkaError = true;
+            }
+        }
 
-    KafkaJobDataConsumer(Many<String> input, Job job) {
-        this.input = input;
+        public boolean isItHopeless() {
+            final int STOP_AFTER_ERRORS = 5;
+            return kafkaError || consumerFaultCounter > STOP_AFTER_ERRORS;
+        }
+
+        public void resetKafkaErrors() {
+            kafkaError = false;
+        }
+    }
+
+    public KafkaJobDataConsumer(Job job) {
         this.job = job;
     }
 
-    public synchronized void start() {
+    public synchronized void start(Many<String> input) {
         stop();
-        this.subscription = getMessagesFromKafka(job) //
-                .doOnNext(data -> logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), data))
-                .flatMap(body -> job.getConsumerRestClient().post("", body), job.getParameters().getMaxConcurrency()) //
+        this.errorStats.resetKafkaErrors();
+        this.subscription = getMessagesFromKafka(input, job) //
+                .flatMap(this::postToClient, job.getParameters().getMaxConcurrency()) //
                 .onErrorResume(this::handleError) //
                 .subscribe(this::handleConsumerSentOk, //
-                        this::handleErrorInStream, //
-                        () -> logger.debug("KafkaMessageConsumer stopped, jobId: {}, type: {}", job.getId(),
-                                job.getType().getId()));
+                        t -> stop(), //
+                        () -> logger.warn("KafkaMessageConsumer stopped jobId: {}", job.getId()));
+    }
+
+    private Mono<String> postToClient(String body) {
+        logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), body);
+        MediaType contentType = this.job.isBuffered() ? MediaType.APPLICATION_JSON : null;
+        return job.getConsumerRestClient().post("", body, contentType);
     }
 
     public synchronized void stop() {
@@ -71,43 +103,37 @@ public class KafkaJobDataConsumer {
         return this.subscription != null;
     }
 
-    private Flux<String> getMessagesFromKafka(Job job) {
+    private Flux<String> getMessagesFromKafka(Many<String> input, Job job) {
         Flux<String> result = input.asFlux() //
                 .filter(job::isFilterMatch);
 
         if (job.isBuffered()) {
-            result = result.bufferTimeout( //
-                    job.getParameters().getBufferTimeout().getMaxSize(), //
-                    job.getParameters().getBufferTimeout().getMaxTime()) //
+            result = result.map(this::quote) //
+                    .bufferTimeout( //
+                            job.getParameters().getBufferTimeout().getMaxSize(), //
+                            job.getParameters().getBufferTimeout().getMaxTime()) //
                     .map(Object::toString);
         }
         return result;
     }
 
-    private Mono<String> handleError(Throwable t) {
-        logger.warn("exception: {} job: {}", t.getMessage(), job);
+    private String quote(String str) {
+        final String q = "\"";
+        return q + str.replace(q, "\\\"") + q;
+    }
 
-        final int STOP_AFTER_ERRORS = 5;
-        if (t instanceof WebClientResponseException) {
-            if (++this.errorCounter > STOP_AFTER_ERRORS) {
-                logger.error("Stopping job {}", job);
-                return Mono.error(t);
-            } else {
-                return Mono.empty(); // Discard
-            }
+    private Mono<String> handleError(Throwable t) {
+        logger.warn("exception: {} job: {}", t.getMessage(), job.getId());
+        this.errorStats.handleException(t);
+        if (this.errorStats.isItHopeless()) {
+            return Mono.error(t);
         } else {
-            // This can happen if there is an overflow.
-            return Mono.empty();
+            return Mono.empty(); // Ignore
        }
     }
 
     private void handleConsumerSentOk(String data) {
-        this.errorCounter = 0;
-    }
-
-    private void handleErrorInStream(Throwable t) {
-        logger.error("KafkaMessageConsumer jobId: {}, error: {}", job.getId(), t.getMessage());
-        this.subscription = null;
+        this.errorStats.handleOkFromConsumer();
     }
 
 }
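
A minimal usage sketch of the reworked API, not part of the commit above. It assumes the Job instance is obtained from the existing jobs repository; the class name KafkaJobDataConsumerSketch, the method runJob and the choice of a directBestEffort() multicast sink are illustrative assumptions, not code from the repository. What it shows is taken from the diff itself: the consumer is now constructed from the Job alone, the Sinks.Many<String> input is handed to start(), which also clears previous Kafka error state, and a stopped consumer can be started again.

package org.oran.dmaapadapter.tasks;

import org.oran.dmaapadapter.repository.Job;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Many;

// Hypothetical helper, for illustration only.
public class KafkaJobDataConsumerSketch {

    void runJob(Job job) {
        // Sink that a Kafka topic listener would emit received records into.
        Many<String> input = Sinks.many().multicast().directBestEffort();

        // The consumer is constructed from the Job alone; the sink is supplied
        // when the stream is (re)started.
        KafkaJobDataConsumer consumer = new KafkaJobDataConsumer(job);
        consumer.start(input);

        // A Kafka listener would push each received value like this.
        input.tryEmitNext("{\"measurement\": 1}");

        // If the subscription stopped (e.g. after repeated consumer faults or a
        // Kafka-side error), start() can be called again: it resets the Kafka
        // error flag and resubscribes to the supplied sink.
        if (!consumer.isRunning()) {
            consumer.start(input);
        }

        consumer.stop();
    }
}

Passing the sink to start() instead of the constructor is what makes this restart pattern possible; together with resetKafkaErrors() it appears intended to let the owning task recover a job after a sink overflow without recreating the KafkaJobDataConsumer.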