+ .flatMap(response -> checkProducerJobs(producer)) //
+ .flatMap(responses -> Mono.just(producer));
+ }
+
+ private Mono<?> checkProducerJobs(EiProducer producer) {
+ final int MAX_CONCURRENCY = 10;
+ return getEiJobs(producer) //
+ .filter(eiJob -> !producer.isJobEnabled(eiJob)) //
+ .flatMap(eiJob -> producerCallbacks.startEiJob(producer, eiJob, Retry.max(1)), MAX_CONCURRENCY) //
+ .collectList() //
+ .flatMapMany(startedJobs -> consumerCallbacks.notifyJobStatus(producer.getEiTypes())) //
+ .collectList();
+ }
+
+ private Flux<EiJob> getEiJobs(EiProducer producer) {
+ return Flux.fromIterable(producer.getEiTypes()) //
+ .flatMap(eiType -> Flux.fromIterable(eiJobs.getJobsForType(eiType)));