Merge "Updated test of info-types"
[nonrtric.git] / enrichment-coordinator-service / src / main / java / org / oransc / enrichment / tasks / ProducerSupervision.java
index c2e4b97..7852bef 100644 (file)
 
 package org.oransc.enrichment.tasks;
 
-import java.time.Duration;
-
 import org.oransc.enrichment.configuration.ApplicationConfig;
-import org.oransc.enrichment.controllers.consumer.ConsumerCallbacks;
-import org.oransc.enrichment.controllers.producer.ProducerCallbacks;
+import org.oransc.enrichment.controllers.a1e.A1eCallbacks;
+import org.oransc.enrichment.controllers.r1producer.ProducerCallbacks;
 import org.oransc.enrichment.repository.EiJob;
 import org.oransc.enrichment.repository.EiJobs;
 import org.oransc.enrichment.repository.EiProducer;
@@ -52,11 +50,11 @@ public class ProducerSupervision {
     private final EiProducers eiProducers;
     private final EiJobs eiJobs;
     private final ProducerCallbacks producerCallbacks;
-    private final ConsumerCallbacks consumerCallbacks;
+    private final A1eCallbacks consumerCallbacks;
 
     @Autowired
     public ProducerSupervision(ApplicationConfig applicationConfig, EiProducers eiProducers, EiJobs eiJobs,
-        ProducerCallbacks producerCallbacks, ConsumerCallbacks consumerCallbacks) {
+        ProducerCallbacks producerCallbacks, A1eCallbacks consumerCallbacks) {
         this.eiProducers = eiProducers;
         this.eiJobs = eiJobs;
         this.producerCallbacks = producerCallbacks;
@@ -86,19 +84,15 @@ public class ProducerSupervision {
     }
 
     private Mono<?> checkProducerJobs(EiProducer producer) {
+        final int MAX_CONCURRENCY = 10;
         return getEiJobs(producer) //
             .filter(eiJob -> !producer.isJobEnabled(eiJob)) //
-            .flatMap(eiJob -> startEiJob(producer, eiJob), 1) //
+            .flatMap(eiJob -> producerCallbacks.startEiJob(producer, eiJob, Retry.max(1)), MAX_CONCURRENCY) //
             .collectList() //
-            .flatMapMany(eiJob -> consumerCallbacks.notifyJobStatus(producer.getEiTypes())) //
+            .flatMapMany(startedJobs -> consumerCallbacks.notifyJobStatus(producer.getEiTypes())) //
             .collectList();
     }
 
-    private Mono<String> startEiJob(EiProducer producer, EiJob eiJob) {
-        Retry retrySpec = Retry.fixedDelay(1, Duration.ofSeconds(1));
-        return producerCallbacks.startEiJob(producer, eiJob, retrySpec);
-    }
-
     private Flux<EiJob> getEiJobs(EiProducer producer) {
         return Flux.fromIterable(producer.getEiTypes()) //
             .flatMap(eiType -> Flux.fromIterable(eiJobs.getJobsForType(eiType)));