X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=enrichment-coordinator-service%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fenrichment%2Ftasks%2FProducerSupervision.java;h=c2e4b975b33344043dabeb5b6181d4786fd71935;hb=2dbde318f013212c81c4a1f477d7638ec3367aa5;hp=17c77b3629592bb08df1ec76eb4295fa3269b1c5;hpb=10e254d9b7bc522bb2c25d590e6d203bf25a592d;p=nonrtric.git diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java index 17c77b36..c2e4b975 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java @@ -20,9 +20,13 @@ package org.oransc.enrichment.tasks; -import org.oransc.enrichment.clients.AsyncRestClient; -import org.oransc.enrichment.clients.AsyncRestClientFactory; +import java.time.Duration; + import org.oransc.enrichment.configuration.ApplicationConfig; +import org.oransc.enrichment.controllers.consumer.ConsumerCallbacks; +import org.oransc.enrichment.controllers.producer.ProducerCallbacks; +import org.oransc.enrichment.repository.EiJob; +import org.oransc.enrichment.repository.EiJobs; import org.oransc.enrichment.repository.EiProducer; import org.oransc.enrichment.repository.EiProducers; import org.slf4j.Logger; @@ -34,6 +38,7 @@ import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; /** * Regularly checks the availability of the EI Producers @@ -45,13 +50,17 @@ public class ProducerSupervision { private static final Logger logger = LoggerFactory.getLogger(ProducerSupervision.class); private final EiProducers eiProducers; - private final AsyncRestClient restClient; + private final EiJobs eiJobs; + private final ProducerCallbacks producerCallbacks; + private final ConsumerCallbacks consumerCallbacks; @Autowired - public ProducerSupervision(ApplicationConfig applicationConfig, EiProducers eiProducers) { - AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig()); - this.restClient = restClientFactory.createRestClientNoHttpProxy(""); + public ProducerSupervision(ApplicationConfig applicationConfig, EiProducers eiProducers, EiJobs eiJobs, + ProducerCallbacks producerCallbacks, ConsumerCallbacks consumerCallbacks) { this.eiProducers = eiProducers; + this.eiJobs = eiJobs; + this.producerCallbacks = producerCallbacks; + this.consumerCallbacks = consumerCallbacks; } @Scheduled(fixedRate = 1000 * 60 * 5) @@ -66,13 +75,33 @@ public class ProducerSupervision { } private Mono checkOneProducer(EiProducer producer) { - return restClient.get(producer.getProducerSupervisionCallbackUrl()) // + return this.producerCallbacks.healthCheck(producer) // .onErrorResume(throwable -> { handleNonRespondingProducer(throwable, producer); return Mono.empty(); })// .doOnNext(response -> handleRespondingProducer(response, producer)) - .flatMap(response -> Mono.just(producer)); + .flatMap(response -> checkProducerJobs(producer)) // + .flatMap(responses -> Mono.just(producer)); + } + + private Mono checkProducerJobs(EiProducer producer) { + return getEiJobs(producer) // + .filter(eiJob -> !producer.isJobEnabled(eiJob)) // + .flatMap(eiJob -> startEiJob(producer, eiJob), 1) // + .collectList() // + .flatMapMany(eiJob -> consumerCallbacks.notifyJobStatus(producer.getEiTypes())) // + .collectList(); + } + + private Mono startEiJob(EiProducer producer, EiJob eiJob) { + Retry retrySpec = Retry.fixedDelay(1, Duration.ofSeconds(1)); + return producerCallbacks.startEiJob(producer, eiJob, retrySpec); + } + + private Flux getEiJobs(EiProducer producer) { + return Flux.fromIterable(producer.getEiTypes()) // + .flatMap(eiType -> Flux.fromIterable(eiJobs.getJobsForType(eiType))); } private void handleNonRespondingProducer(Throwable throwable, EiProducer producer) {