import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import reactor.core.publisher.Sinks.Many;
/**
* The class streams data from a multi cast sink and sends the data to the Job
this.job = job;
}
- public synchronized void start(Many<String> input) {
+ public synchronized void start(Flux<String> input) {
stop();
this.errorStats.resetKafkaErrors();
this.subscription = getMessagesFromKafka(input, job) //
public synchronized void stop() {
if (this.subscription != null) {
- subscription.dispose();
- subscription = null;
+ this.subscription.dispose();
+ this.subscription = null;
}
}
return this.subscription != null;
}
- private Flux<String> getMessagesFromKafka(Many<String> input, Job job) {
- Flux<String> result = input.asFlux() //
- .filter(job::isFilterMatch);
+ private Flux<String> getMessagesFromKafka(Flux<String> input, Job job) {
+ Flux<String> result = input.filter(job::isFilterMatch);
if (job.isBuffered()) {
result = result.map(this::quote) //