X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=enrichment-coordinator-service%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fenrichment%2Ftasks%2FProducerSupervision.java;h=d73127f2c90b4c09de7f8bdc5f47f1195b708a73;hb=c4aef8252dec9bdd63ab16e3ff89b0d814aed462;hp=c2e4b975b33344043dabeb5b6181d4786fd71935;hpb=6296305fad90e21103fa92e05e0d21a3ba1356a2;p=nonrtric.git diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java index c2e4b975..d73127f2 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java @@ -20,8 +20,6 @@ package org.oransc.enrichment.tasks; -import java.time.Duration; - import org.oransc.enrichment.configuration.ApplicationConfig; import org.oransc.enrichment.controllers.consumer.ConsumerCallbacks; import org.oransc.enrichment.controllers.producer.ProducerCallbacks; @@ -86,19 +84,15 @@ public class ProducerSupervision { } private Mono checkProducerJobs(EiProducer producer) { + final int MAX_CONCURRENCY = 10; return getEiJobs(producer) // .filter(eiJob -> !producer.isJobEnabled(eiJob)) // - .flatMap(eiJob -> startEiJob(producer, eiJob), 1) // + .flatMap(eiJob -> producerCallbacks.startEiJob(producer, eiJob, Retry.max(1)), MAX_CONCURRENCY) // .collectList() // - .flatMapMany(eiJob -> consumerCallbacks.notifyJobStatus(producer.getEiTypes())) // + .flatMapMany(startedJobs -> consumerCallbacks.notifyJobStatus(producer.getEiTypes())) // .collectList(); } - private Mono startEiJob(EiProducer producer, EiJob eiJob) { - Retry retrySpec = Retry.fixedDelay(1, Duration.ofSeconds(1)); - return producerCallbacks.startEiJob(producer, eiJob, retrySpec); - } - private Flux getEiJobs(EiProducer producer) { return Flux.fromIterable(producer.getEiTypes()) // .flatMap(eiType -> Flux.fromIterable(eiJobs.getJobsForType(eiType)));