Added some logging
[nonrtric.git] / enrichment-coordinator-service / src / main / java / org / oransc / enrichment / tasks / ProducerSupervision.java
index c2e4b97..d73127f 100644 (file)
@@ -20,8 +20,6 @@
 
 package org.oransc.enrichment.tasks;
 
-import java.time.Duration;
-
 import org.oransc.enrichment.configuration.ApplicationConfig;
 import org.oransc.enrichment.controllers.consumer.ConsumerCallbacks;
 import org.oransc.enrichment.controllers.producer.ProducerCallbacks;
@@ -86,19 +84,15 @@ public class ProducerSupervision {
     }
 
     private Mono<?> checkProducerJobs(EiProducer producer) {
+        final int MAX_CONCURRENCY = 10;
         return getEiJobs(producer) //
             .filter(eiJob -> !producer.isJobEnabled(eiJob)) //
-            .flatMap(eiJob -> startEiJob(producer, eiJob), 1) //
+            .flatMap(eiJob -> producerCallbacks.startEiJob(producer, eiJob, Retry.max(1)), MAX_CONCURRENCY) //
             .collectList() //
-            .flatMapMany(eiJob -> consumerCallbacks.notifyJobStatus(producer.getEiTypes())) //
+            .flatMapMany(startedJobs -> consumerCallbacks.notifyJobStatus(producer.getEiTypes())) //
             .collectList();
     }
 
-    private Mono<String> startEiJob(EiProducer producer, EiJob eiJob) {
-        Retry retrySpec = Retry.fixedDelay(1, Duration.ofSeconds(1));
-        return producerCallbacks.startEiJob(producer, eiJob, retrySpec);
-    }
-
     private Flux<EiJob> getEiJobs(EiProducer producer) {
         return Flux.fromIterable(producer.getEiTypes()) //
             .flatMap(eiType -> Flux.fromIterable(eiJobs.getJobsForType(eiType)));