package org.oransc.enrichment.tasks;
-import org.oransc.enrichment.clients.AsyncRestClient;
-import org.oransc.enrichment.clients.AsyncRestClientFactory;
import org.oransc.enrichment.configuration.ApplicationConfig;
+import org.oransc.enrichment.controllers.a1e.A1eCallbacks;
+import org.oransc.enrichment.controllers.r1producer.ProducerCallbacks;
+import org.oransc.enrichment.repository.EiJob;
+import org.oransc.enrichment.repository.EiJobs;
import org.oransc.enrichment.repository.EiProducer;
import org.oransc.enrichment.repository.EiProducers;
import org.slf4j.Logger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
/**
* Regularly checks the availability of the EI Producers
private static final Logger logger = LoggerFactory.getLogger(ProducerSupervision.class);
private final EiProducers eiProducers;
- private final AsyncRestClient restClient;
+ private final EiJobs eiJobs;
+ private final ProducerCallbacks producerCallbacks;
+ private final A1eCallbacks consumerCallbacks;
@Autowired
- public ProducerSupervision(ApplicationConfig applicationConfig, EiProducers eiProducers) {
- AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
- this.restClient = restClientFactory.createRestClientNoHttpProxy("");
+ public ProducerSupervision(ApplicationConfig applicationConfig, EiProducers eiProducers, EiJobs eiJobs,
+ ProducerCallbacks producerCallbacks, A1eCallbacks consumerCallbacks) {
this.eiProducers = eiProducers;
+ this.eiJobs = eiJobs;
+ this.producerCallbacks = producerCallbacks;
+ this.consumerCallbacks = consumerCallbacks;
}
@Scheduled(fixedRate = 1000 * 60 * 5)
}
private Mono<EiProducer> checkOneProducer(EiProducer producer) {
- return restClient.get(producer.getProducerSupervisionCallbackUrl()) //
+ return this.producerCallbacks.healthCheck(producer) //
.onErrorResume(throwable -> {
handleNonRespondingProducer(throwable, producer);
return Mono.empty();
})//
.doOnNext(response -> handleRespondingProducer(response, producer))
- .flatMap(response -> Mono.just(producer));
+ .flatMap(response -> checkProducerJobs(producer)) //
+ .flatMap(responses -> Mono.just(producer));
+ }
+
+ private Mono<?> checkProducerJobs(EiProducer producer) {
+ final int MAX_CONCURRENCY = 10;
+ return getEiJobs(producer) //
+ .filter(eiJob -> !producer.isJobEnabled(eiJob)) //
+ .flatMap(eiJob -> producerCallbacks.startEiJob(producer, eiJob, Retry.max(1)), MAX_CONCURRENCY) //
+ .collectList() //
+ .flatMapMany(startedJobs -> consumerCallbacks.notifyJobStatus(producer.getEiTypes())) //
+ .collectList();
+ }
+
+ private Flux<EiJob> getEiJobs(EiProducer producer) {
+ return Flux.fromIterable(producer.getEiTypes()) //
+ .flatMap(eiType -> Flux.fromIterable(eiJobs.getJobsForType(eiType)));
}
private void handleNonRespondingProducer(Throwable throwable, EiProducer producer) {