package org.oransc.enrichment.tasks;
-import java.time.Duration;
-
import org.oransc.enrichment.configuration.ApplicationConfig;
-import org.oransc.enrichment.controllers.consumer.ConsumerCallbacks;
-import org.oransc.enrichment.controllers.producer.ProducerCallbacks;
-import org.oransc.enrichment.repository.EiJob;
-import org.oransc.enrichment.repository.EiJobs;
-import org.oransc.enrichment.repository.EiProducer;
-import org.oransc.enrichment.repository.EiProducers;
+import org.oransc.enrichment.controllers.a1e.A1eCallbacks;
+import org.oransc.enrichment.controllers.r1producer.ProducerCallbacks;
+import org.oransc.enrichment.repository.InfoJob;
+import org.oransc.enrichment.repository.InfoJobs;
+import org.oransc.enrichment.repository.InfoProducer;
+import org.oransc.enrichment.repository.InfoProducers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import reactor.util.retry.Retry;
/**
- * Regularly checks the availability of the EI Producers
+ * Regularly checks the availability of the Info Producers
*/
@Component
@EnableScheduling
public class ProducerSupervision {
private static final Logger logger = LoggerFactory.getLogger(ProducerSupervision.class);
- private final EiProducers eiProducers;
- private final EiJobs eiJobs;
+ private final InfoProducers infoProducers;
+ private final InfoJobs infoJobs;
private final ProducerCallbacks producerCallbacks;
- private final ConsumerCallbacks consumerCallbacks;
+ private final A1eCallbacks consumerCallbacks;
@Autowired
- public ProducerSupervision(ApplicationConfig applicationConfig, EiProducers eiProducers, EiJobs eiJobs,
- ProducerCallbacks producerCallbacks, ConsumerCallbacks consumerCallbacks) {
- this.eiProducers = eiProducers;
- this.eiJobs = eiJobs;
+ public ProducerSupervision(ApplicationConfig applicationConfig, InfoProducers infoProducers, InfoJobs infoJobs,
+ ProducerCallbacks producerCallbacks, A1eCallbacks consumerCallbacks) {
+ this.infoProducers = infoProducers;
+ this.infoJobs = infoJobs;
this.producerCallbacks = producerCallbacks;
this.consumerCallbacks = consumerCallbacks;
}
createTask().subscribe(null, null, () -> logger.debug("Checking all Producers completed"));
}
- public Flux<EiProducer> createTask() {
- return Flux.fromIterable(eiProducers.getAllProducers()) //
+ public Flux<InfoProducer> createTask() {
+ return Flux.fromIterable(infoProducers.getAllProducers()) //
.flatMap(this::checkOneProducer);
}
- private Mono<EiProducer> checkOneProducer(EiProducer producer) {
+ private Mono<InfoProducer> checkOneProducer(InfoProducer producer) {
return this.producerCallbacks.healthCheck(producer) //
.onErrorResume(throwable -> {
handleNonRespondingProducer(throwable, producer);
.flatMap(responses -> Mono.just(producer));
}
- private Mono<?> checkProducerJobs(EiProducer producer) {
+ private Mono<?> checkProducerJobs(InfoProducer producer) {
+ final int MAX_CONCURRENCY = 10;
return getEiJobs(producer) //
- .filter(eiJob -> !producer.isJobEnabled(eiJob)) //
- .flatMap(eiJob -> startEiJob(producer, eiJob), 1) //
+ .filter(infoJob -> !producer.isJobEnabled(infoJob)) //
+ .flatMap(infoJob -> producerCallbacks.startInfoJob(producer, infoJob, Retry.max(1)), MAX_CONCURRENCY) //
.collectList() //
- .flatMapMany(eiJob -> consumerCallbacks.notifyJobStatus(producer.getEiTypes())) //
+ .flatMapMany(startedJobs -> consumerCallbacks.notifyJobStatus(producer.getInfoTypes())) //
.collectList();
}
- private Mono<String> startEiJob(EiProducer producer, EiJob eiJob) {
- Retry retrySpec = Retry.fixedDelay(1, Duration.ofSeconds(1));
- return producerCallbacks.startEiJob(producer, eiJob, retrySpec);
- }
-
- private Flux<EiJob> getEiJobs(EiProducer producer) {
- return Flux.fromIterable(producer.getEiTypes()) //
- .flatMap(eiType -> Flux.fromIterable(eiJobs.getJobsForType(eiType)));
+ private Flux<InfoJob> getEiJobs(InfoProducer producer) {
+ return Flux.fromIterable(producer.getInfoTypes()) //
+ .flatMap(infoType -> Flux.fromIterable(infoJobs.getJobsForType(infoType)));
}
- private void handleNonRespondingProducer(Throwable throwable, EiProducer producer) {
+ private void handleNonRespondingProducer(Throwable throwable, InfoProducer producer) {
logger.warn("Unresponsive producer: {} exception: {}", producer.getId(), throwable.getMessage());
producer.setAliveStatus(false);
if (producer.isDead()) {
- this.eiProducers.deregisterProducer(producer);
+ this.infoProducers.deregisterProducer(producer);
}
}
- private void handleRespondingProducer(String response, EiProducer producer) {
+ private void handleRespondingProducer(String response, InfoProducer producer) {
logger.debug("{}", response);
producer.setAliveStatus(true);
}