X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=dmaap-adaptor-java%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fdmaapadapter%2Ftasks%2FKafkaJobDataConsumer.java;h=2a16f47598785d5890a66e216481c573c60e14c6;hb=86f81813c94a44337c199124e7bbf6280e2c6aa6;hp=5550ce0e832f1e66dc6e4856a0adff5569f704ff;hpb=ce1d9f2d3e1d2713289dc4d2b5c246f99ec65160;p=nonrtric.git diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java index 5550ce0e..2a16f475 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java @@ -31,7 +31,6 @@ import org.springframework.web.reactive.function.client.WebClientResponseExcepti import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.Sinks.Many; /** * The class streams data from a multi cast sink and sends the data to the Job @@ -75,17 +74,22 @@ public class KafkaJobDataConsumer { this.job = job; } - public synchronized void start(Many input) { + public synchronized void start(Flux input) { stop(); this.errorStats.resetKafkaErrors(); this.subscription = getMessagesFromKafka(input, job) // .flatMap(this::postToClient, job.getParameters().getMaxConcurrency()) // .onErrorResume(this::handleError) // .subscribe(this::handleConsumerSentOk, // - t -> stop(), // + this::handleExceptionInStream, // () -> logger.warn("KafkaMessageConsumer stopped jobId: {}", job.getId())); } + private void handleExceptionInStream(Throwable t) { + logger.warn("KafkaMessageConsumer exception: {}, jobId: {}", t.getMessage(), job.getId()); + stop(); + } + private Mono postToClient(String body) { logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), body); MediaType contentType = this.job.isBuffered() ? MediaType.APPLICATION_JSON : null; @@ -94,8 +98,8 @@ public class KafkaJobDataConsumer { public synchronized void stop() { if (this.subscription != null) { - subscription.dispose(); - subscription = null; + this.subscription.dispose(); + this.subscription = null; } } @@ -103,9 +107,8 @@ public class KafkaJobDataConsumer { return this.subscription != null; } - private Flux getMessagesFromKafka(Many input, Job job) { - Flux result = input.asFlux() // - .filter(job::isFilterMatch); + private Flux getMessagesFromKafka(Flux input, Job job) { + Flux result = input.filter(job::isFilterMatch); if (job.isBuffered()) { result = result.map(this::quote) //