package org.oran.dmaapadapter.tasks;
+import lombok.Getter;
+
import org.oran.dmaapadapter.repository.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.Disposable;
* owner via REST calls.
*/
@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
-
public class KafkaJobDataConsumer {
private static final Logger logger = LoggerFactory.getLogger(KafkaJobDataConsumer.class);
- private final Many<String> input;
+ @Getter
private final Job job;
private Disposable subscription;
- private int errorCounter = 0;
+ private final ErrorStats errorStats = new ErrorStats();
+
+ private class ErrorStats {
+ private int consumerFaultCounter = 0;
+ private boolean kafkaError = false; // eg. overflow
+
+ public void handleOkFromConsumer() {
+ this.consumerFaultCounter = 0;
+ }
+
+ public void handleException(Throwable t) {
+ if (t instanceof WebClientResponseException) {
+ ++this.consumerFaultCounter;
+ } else {
+ kafkaError = true;
+ }
+ }
+
+ public boolean isItHopeless() {
+ final int STOP_AFTER_ERRORS = 5;
+ return kafkaError || consumerFaultCounter > STOP_AFTER_ERRORS;
+ }
+
+ public void resetKafkaErrors() {
+ kafkaError = false;
+ }
+ }
- KafkaJobDataConsumer(Many<String> input, Job job) {
- this.input = input;
+ public KafkaJobDataConsumer(Job job) {
this.job = job;
}
- public synchronized void start() {
+ public synchronized void start(Many<String> input) {
stop();
- this.subscription = getMessagesFromKafka(job) //
- .doOnNext(data -> logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), data))
- .flatMap(body -> job.getConsumerRestClient().post("", body), job.getParameters().getMaxConcurrency()) //
+ this.errorStats.resetKafkaErrors();
+ this.subscription = getMessagesFromKafka(input, job) //
+ .flatMap(this::postToClient, job.getParameters().getMaxConcurrency()) //
.onErrorResume(this::handleError) //
.subscribe(this::handleConsumerSentOk, //
- this::handleErrorInStream, //
- () -> logger.debug("KafkaMessageConsumer stopped, jobId: {}, type: {}", job.getId(),
- job.getType().getId()));
+ this::handleExceptionInStream, //
+ () -> logger.warn("KafkaMessageConsumer stopped jobId: {}", job.getId()));
+ }
+
+ private void handleExceptionInStream(Throwable t) {
+ logger.warn("KafkaMessageConsumer exception: {}, jobId: {}", t.getMessage(), job.getId());
+ stop();
+ }
+
+ private Mono<String> postToClient(String body) {
+ logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), body);
+ MediaType contentType = this.job.isBuffered() ? MediaType.APPLICATION_JSON : null;
+ return job.getConsumerRestClient().post("", body, contentType);
}
public synchronized void stop() {
return this.subscription != null;
}
- private Flux<String> getMessagesFromKafka(Job job) {
+ private Flux<String> getMessagesFromKafka(Many<String> input, Job job) {
Flux<String> result = input.asFlux() //
.filter(job::isFilterMatch);
if (job.isBuffered()) {
- result = result.bufferTimeout( //
- job.getParameters().getBufferTimeout().getMaxSize(), //
- job.getParameters().getBufferTimeout().getMaxTime()) //
+ result = result.map(this::quote) //
+ .bufferTimeout( //
+ job.getParameters().getBufferTimeout().getMaxSize(), //
+ job.getParameters().getBufferTimeout().getMaxTime()) //
.map(Object::toString);
}
return result;
}
- private Mono<String> handleError(Throwable t) {
- logger.warn("exception: {} job: {}", t.getMessage(), job);
+ private String quote(String str) {
+ final String q = "\"";
+ return q + str.replace(q, "\\\"") + q;
+ }
- final int STOP_AFTER_ERRORS = 5;
- if (t instanceof WebClientResponseException) {
- if (++this.errorCounter > STOP_AFTER_ERRORS) {
- logger.error("Stopping job {}", job);
- return Mono.error(t);
- } else {
- return Mono.empty(); // Discard
- }
+ private Mono<String> handleError(Throwable t) {
+ logger.warn("exception: {} job: {}", t.getMessage(), job.getId());
+ this.errorStats.handleException(t);
+ if (this.errorStats.isItHopeless()) {
+ return Mono.error(t);
} else {
- // This can happen if there is an overflow.
- return Mono.empty();
+ return Mono.empty(); // Ignore
}
}
private void handleConsumerSentOk(String data) {
- this.errorCounter = 0;
- }
-
- private void handleErrorInStream(Throwable t) {
- logger.error("KafkaMessageConsumer jobId: {}, error: {}", job.getId(), t.getMessage());
- this.subscription = null;
+ this.errorStats.handleOkFromConsumer();
}
}