X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=enrichment-coordinator-service%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fenrichment%2Ftasks%2FProducerSupervision.java;h=08c5fc85e6c0dc532e8530041385c806344ace7a;hb=0f6367023720ecc7d7b4b38cbbc4282792172a89;hp=c2e4b975b33344043dabeb5b6181d4786fd71935;hpb=1e539490bc37fed791895dd1f2f898caa3b0ca5c;p=nonrtric.git diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java index c2e4b975..08c5fc85 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java @@ -20,15 +20,13 @@ package org.oransc.enrichment.tasks; -import java.time.Duration; - import org.oransc.enrichment.configuration.ApplicationConfig; -import org.oransc.enrichment.controllers.consumer.ConsumerCallbacks; -import org.oransc.enrichment.controllers.producer.ProducerCallbacks; -import org.oransc.enrichment.repository.EiJob; -import org.oransc.enrichment.repository.EiJobs; -import org.oransc.enrichment.repository.EiProducer; -import org.oransc.enrichment.repository.EiProducers; +import org.oransc.enrichment.controllers.a1e.A1eCallbacks; +import org.oransc.enrichment.controllers.r1producer.ProducerCallbacks; +import org.oransc.enrichment.repository.InfoJob; +import org.oransc.enrichment.repository.InfoJobs; +import org.oransc.enrichment.repository.InfoProducer; +import org.oransc.enrichment.repository.InfoProducers; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -41,7 +39,7 @@ import reactor.core.publisher.Mono; import reactor.util.retry.Retry; /** - * Regularly checks the availability of the EI Producers + * Regularly checks the availability of the Info Producers */ @Component @EnableScheduling @@ -49,16 +47,16 @@ import reactor.util.retry.Retry; public class ProducerSupervision { private static final Logger logger = LoggerFactory.getLogger(ProducerSupervision.class); - private final EiProducers eiProducers; - private final EiJobs eiJobs; + private final InfoProducers infoProducers; + private final InfoJobs infoJobs; private final ProducerCallbacks producerCallbacks; - private final ConsumerCallbacks consumerCallbacks; + private final A1eCallbacks consumerCallbacks; @Autowired - public ProducerSupervision(ApplicationConfig applicationConfig, EiProducers eiProducers, EiJobs eiJobs, - ProducerCallbacks producerCallbacks, ConsumerCallbacks consumerCallbacks) { - this.eiProducers = eiProducers; - this.eiJobs = eiJobs; + public ProducerSupervision(ApplicationConfig applicationConfig, InfoProducers infoProducers, InfoJobs infoJobs, + ProducerCallbacks producerCallbacks, A1eCallbacks consumerCallbacks) { + this.infoProducers = infoProducers; + this.infoJobs = infoJobs; this.producerCallbacks = producerCallbacks; this.consumerCallbacks = consumerCallbacks; } @@ -69,12 +67,12 @@ public class ProducerSupervision { createTask().subscribe(null, null, () -> logger.debug("Checking all Producers completed")); } - public Flux createTask() { - return Flux.fromIterable(eiProducers.getAllProducers()) // + public Flux createTask() { + return Flux.fromIterable(infoProducers.getAllProducers()) // .flatMap(this::checkOneProducer); } - private Mono checkOneProducer(EiProducer producer) { + private Mono checkOneProducer(InfoProducer producer) { return this.producerCallbacks.healthCheck(producer) // .onErrorResume(throwable -> { handleNonRespondingProducer(throwable, producer); @@ -82,37 +80,33 @@ public class ProducerSupervision { })// .doOnNext(response -> handleRespondingProducer(response, producer)) .flatMap(response -> checkProducerJobs(producer)) // - .flatMap(responses -> Mono.just(producer)); + .map(responses -> producer); } - private Mono checkProducerJobs(EiProducer producer) { + private Mono checkProducerJobs(InfoProducer producer) { + final int MAX_CONCURRENCY = 10; return getEiJobs(producer) // - .filter(eiJob -> !producer.isJobEnabled(eiJob)) // - .flatMap(eiJob -> startEiJob(producer, eiJob), 1) // + .filter(infoJob -> !producer.isJobEnabled(infoJob)) // + .flatMap(infoJob -> producerCallbacks.startInfoJob(producer, infoJob, Retry.max(1)), MAX_CONCURRENCY) // .collectList() // - .flatMapMany(eiJob -> consumerCallbacks.notifyJobStatus(producer.getEiTypes())) // + .flatMapMany(startedJobs -> consumerCallbacks.notifyJobStatus(producer.getInfoTypes())) // .collectList(); } - private Mono startEiJob(EiProducer producer, EiJob eiJob) { - Retry retrySpec = Retry.fixedDelay(1, Duration.ofSeconds(1)); - return producerCallbacks.startEiJob(producer, eiJob, retrySpec); - } - - private Flux getEiJobs(EiProducer producer) { - return Flux.fromIterable(producer.getEiTypes()) // - .flatMap(eiType -> Flux.fromIterable(eiJobs.getJobsForType(eiType))); + private Flux getEiJobs(InfoProducer producer) { + return Flux.fromIterable(producer.getInfoTypes()) // + .flatMap(infoType -> Flux.fromIterable(infoJobs.getJobsForType(infoType))); } - private void handleNonRespondingProducer(Throwable throwable, EiProducer producer) { + private void handleNonRespondingProducer(Throwable throwable, InfoProducer producer) { logger.warn("Unresponsive producer: {} exception: {}", producer.getId(), throwable.getMessage()); producer.setAliveStatus(false); if (producer.isDead()) { - this.eiProducers.deregisterProducer(producer); + this.infoProducers.deregisterProducer(producer); } } - private void handleRespondingProducer(String response, EiProducer producer) { + private void handleRespondingProducer(String response, InfoProducer producer) { logger.debug("{}", response); producer.setAliveStatus(true); }