+ public void restartJobs(EiProducer producer, EiJobs eiJobs) {
+ final int maxNoOfParalellRequests = 10;
+ Retry retrySpec = Retry.backoff(3, Duration.ofSeconds(1));
+
+ Flux.fromIterable(producer.getEiTypes()) //
+ .flatMap(type -> Flux.fromIterable(eiJobs.getJobsForType(type))) //
+ .flatMap(job -> notifyProducerJobStarted(producer, job, retrySpec), maxNoOfParalellRequests) //
+ .onErrorResume(t -> {
+ logger.error("Could not restart EI Job for producer: {}, reason :{}", producer.getId(), t.getMessage());
+ return Flux.empty();
+ }) //
+ .subscribe();
+ }
+
+ private Mono<String> notifyProducerJobStarted(EiProducer producer, EiJob eiJob, Retry retrySpec) {