+ private Mono<?> checkProducerJobs(InfoProducer producer) {
+ final int MAX_CONCURRENCY = 10;
+ return getEiJobs(producer) //
+ .filter(infoJob -> !producer.isJobEnabled(infoJob)) //
+ .flatMap(infoJob -> producerCallbacks.startInfoJob(producer, infoJob, Retry.max(1)), MAX_CONCURRENCY) //
+ .collectList() //
+ .flatMapMany(startedJobs -> consumerCallbacks.notifyJobStatus(producer.getInfoTypes())) //
+ .collectList();
+ }
+
+ private Flux<InfoJob> getEiJobs(InfoProducer producer) {
+ return Flux.fromIterable(producer.getInfoTypes()) //
+ .flatMap(infoType -> Flux.fromIterable(infoJobs.getJobsForType(infoType)));
+ }
+
+ private void handleNonRespondingProducer(Throwable throwable, InfoProducer producer) {