package org.oransc.enrichment.tasks;
-import java.time.Duration;
-
import org.oransc.enrichment.configuration.ApplicationConfig;
import org.oransc.enrichment.controllers.consumer.ConsumerCallbacks;
import org.oransc.enrichment.controllers.producer.ProducerCallbacks;
}
private Mono<?> checkProducerJobs(EiProducer producer) {
+ final int MAX_CONCURRENCY = 10;
return getEiJobs(producer) //
.filter(eiJob -> !producer.isJobEnabled(eiJob)) //
- .flatMap(eiJob -> startEiJob(producer, eiJob), 1) //
+ .flatMap(eiJob -> producerCallbacks.startEiJob(producer, eiJob, Retry.max(1)), MAX_CONCURRENCY) //
.collectList() //
- .flatMapMany(eiJob -> consumerCallbacks.notifyJobStatus(producer.getEiTypes())) //
+ .flatMapMany(startedJobs -> consumerCallbacks.notifyJobStatus(producer.getEiTypes())) //
.collectList();
}
- private Mono<String> startEiJob(EiProducer producer, EiJob eiJob) {
- Retry retrySpec = Retry.fixedDelay(1, Duration.ofSeconds(1));
- return producerCallbacks.startEiJob(producer, eiJob, retrySpec);
- }
-
private Flux<EiJob> getEiJobs(EiProducer producer) {
return Flux.fromIterable(producer.getEiTypes()) //
.flatMap(eiType -> Flux.fromIterable(eiJobs.getJobsForType(eiType)));