import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import reactor.core.publisher.Sinks.Many;
/**
* The class streams data from a multi cast sink and sends the data to the Job
this.job = job;
}
- public synchronized void start(Many<String> input) {
+ public synchronized void start(Flux<String> input) {
stop();
this.errorStats.resetKafkaErrors();
this.subscription = getMessagesFromKafka(input, job) //
.flatMap(this::postToClient, job.getParameters().getMaxConcurrency()) //
.onErrorResume(this::handleError) //
.subscribe(this::handleConsumerSentOk, //
- t -> stop(), //
+ this::handleExceptionInStream, //
() -> logger.warn("KafkaMessageConsumer stopped jobId: {}", job.getId()));
}
+ private void handleExceptionInStream(Throwable t) {
+ logger.warn("KafkaMessageConsumer exception: {}, jobId: {}", t.getMessage(), job.getId());
+ stop();
+ }
+
private Mono<String> postToClient(String body) {
logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), body);
MediaType contentType = this.job.isBuffered() ? MediaType.APPLICATION_JSON : null;
public synchronized void stop() {
if (this.subscription != null) {
- subscription.dispose();
- subscription = null;
+ this.subscription.dispose();
+ this.subscription = null;
}
}
return this.subscription != null;
}
- private Flux<String> getMessagesFromKafka(Many<String> input, Job job) {
- Flux<String> result = input.asFlux() //
- .filter(job::isFilterMatch);
+ private Flux<String> getMessagesFromKafka(Flux<String> input, Job job) {
+ Flux<String> result = input.filter(job::isFilterMatch);
if (job.isBuffered()) {
result = result.map(this::quote) //