/*-
* ========================LICENSE_START=================================
- * ONAP : ccsdk oran
- * ======================================================================
- * Copyright (C) 2020 Nordix Foundation. All rights reserved.
- * ======================================================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2020 Nordix Foundation
+ * %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
package org.oransc.enrichment.tasks;
-import org.oransc.enrichment.clients.AsyncRestClient;
+import java.time.Duration;
+
import org.oransc.enrichment.configuration.ApplicationConfig;
+import org.oransc.enrichment.controllers.consumer.ConsumerCallbacks;
+import org.oransc.enrichment.controllers.producer.ProducerCallbacks;
+import org.oransc.enrichment.repository.EiJob;
import org.oransc.enrichment.repository.EiJobs;
import org.oransc.enrichment.repository.EiProducer;
import org.oransc.enrichment.repository.EiProducers;
-import org.oransc.enrichment.repository.EiTypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
/**
* Regularly checks the availability of the EI Producers
public class ProducerSupervision {
private static final Logger logger = LoggerFactory.getLogger(ProducerSupervision.class);
- @Autowired
- ApplicationConfig applicationConfig;
-
- @Autowired
- EiProducers eiProducers;
-
- @Autowired
- EiJobs eiJobs;
+ private final EiProducers eiProducers;
+ private final EiJobs eiJobs;
+ private final ProducerCallbacks producerCallbacks;
+ private final ConsumerCallbacks consumerCallbacks;
@Autowired
- EiTypes eiTypes;
+ public ProducerSupervision(ApplicationConfig applicationConfig, EiProducers eiProducers, EiJobs eiJobs,
+ ProducerCallbacks producerCallbacks, ConsumerCallbacks consumerCallbacks) {
+ this.eiProducers = eiProducers;
+ this.eiJobs = eiJobs;
+ this.producerCallbacks = producerCallbacks;
+ this.consumerCallbacks = consumerCallbacks;
+ }
@Scheduled(fixedRate = 1000 * 60 * 5)
public void checkAllProducers() {
}
private Mono<EiProducer> checkOneProducer(EiProducer producer) {
- return restClient().get(producer.getProducerSupervisionCallbackUrl()) //
+ return this.producerCallbacks.healthCheck(producer) //
.onErrorResume(throwable -> {
handleNonRespondingProducer(throwable, producer);
return Mono.empty();
})//
.doOnNext(response -> handleRespondingProducer(response, producer))
- .flatMap(response -> Mono.just(producer));
+ .flatMap(response -> checkProducerJobs(producer)) //
+ .flatMap(responses -> Mono.just(producer));
+ }
+
+ private Mono<?> checkProducerJobs(EiProducer producer) {
+ return getEiJobs(producer) //
+ .filter(eiJob -> !producer.isJobEnabled(eiJob)) //
+ .flatMap(eiJob -> startEiJob(producer, eiJob), 1) //
+ .collectList() //
+ .flatMapMany(eiJob -> consumerCallbacks.notifyJobStatus(producer.getEiTypes())) //
+ .collectList();
+ }
+
+ private Mono<String> startEiJob(EiProducer producer, EiJob eiJob) {
+ Retry retrySpec = Retry.fixedDelay(1, Duration.ofSeconds(1));
+ return producerCallbacks.startEiJob(producer, eiJob, retrySpec);
+ }
+
+ private Flux<EiJob> getEiJobs(EiProducer producer) {
+ return Flux.fromIterable(producer.getEiTypes()) //
+ .flatMap(eiType -> Flux.fromIterable(eiJobs.getJobsForType(eiType)));
}
private void handleNonRespondingProducer(Throwable throwable, EiProducer producer) {
logger.warn("Unresponsive producer: {} exception: {}", producer.getId(), throwable.getMessage());
producer.setAliveStatus(false);
if (producer.isDead()) {
- this.eiProducers.deregisterProducer(producer, this.eiTypes, this.eiJobs);
+ this.eiProducers.deregisterProducer(producer);
}
}
producer.setAliveStatus(true);
}
- private AsyncRestClient restClient() {
- return new AsyncRestClient("", this.applicationConfig.getWebClientConfig());
- }
-
}