+ .flatMap(response -> checkProducerJobs(producer)) //
+ .flatMap(responses -> Mono.just(producer));
+ }
+
+ private Mono<?> checkProducerJobs(EiProducer producer) {
+ return getEiJobs(producer) //
+ .filter(eiJob -> !producer.isJobEnabled(eiJob)) //
+ .flatMap(eiJob -> startEiJob(producer, eiJob), 1) //
+ .collectList() //
+ .flatMapMany(eiJob -> consumerCallbacks.notifyJobStatus(producer.getEiTypes())) //
+ .collectList();
+ }
+
+ private Mono<String> startEiJob(EiProducer producer, EiJob eiJob) {
+ Retry retrySpec = Retry.fixedDelay(1, Duration.ofSeconds(1));
+ return producerCallbacks.startEiJob(producer, eiJob, retrySpec);
+ }
+
+ private Flux<EiJob> getEiJobs(EiProducer producer) {
+ return Flux.fromIterable(producer.getEiTypes()) //
+ .flatMap(eiType -> Flux.fromIterable(eiJobs.getJobsForType(eiType)));