import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
stop();
this.errorStats.resetKafkaErrors();
this.subscription = getMessagesFromKafka(input, job) //
.flatMap(this::postToClient, job.getParameters().getMaxConcurrency()) //
.onErrorResume(this::handleError) //
.subscribe(this::handleConsumerSentOk, //
stop();
this.errorStats.resetKafkaErrors();
this.subscription = getMessagesFromKafka(input, job) //
.flatMap(this::postToClient, job.getParameters().getMaxConcurrency()) //
.onErrorResume(this::handleError) //
.subscribe(this::handleConsumerSentOk, //
private Mono<String> postToClient(String body) {
logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), body);
MediaType contentType = this.job.isBuffered() ? MediaType.APPLICATION_JSON : null;
private Mono<String> postToClient(String body) {
logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), body);
MediaType contentType = this.job.isBuffered() ? MediaType.APPLICATION_JSON : null;
- private Flux<String> getMessagesFromKafka(Many<String> input, Job job) {
- Flux<String> result = input.asFlux() //
- .filter(job::isFilterMatch);
+ private Flux<String> getMessagesFromKafka(Flux<String> input, Job job) {
+ Flux<String> result = input.filter(job::isFilterMatch);