import java.lang.invoke.MethodHandles;
import java.time.Duration;
import java.util.Collection;
-import java.util.Vector;
import org.oransc.enrichment.clients.AsyncRestClient;
import org.oransc.enrichment.clients.AsyncRestClientFactory;
import org.oransc.enrichment.repository.EiJob;
import org.oransc.enrichment.repository.EiJobs;
import org.oransc.enrichment.repository.EiProducer;
-import org.oransc.enrichment.repository.EiTypes;
+import org.oransc.enrichment.repository.EiProducers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* Callbacks to the EiProducer
*/
-@Component
@SuppressWarnings("java:S3457") // No need to call "toString()" method as formatting and string ..
public class ProducerCallbacks {
private static Gson gson = new GsonBuilder().create();
private final AsyncRestClient restClient;
- private final EiTypes eiTypes;
- @Autowired
- public ProducerCallbacks(ApplicationConfig config, EiTypes eiTypes) {
+ public ProducerCallbacks(ApplicationConfig config) {
AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config.getWebClientConfig());
- this.restClient = restClientFactory.createRestClient("");
- this.eiTypes = eiTypes;
+ this.restClient = restClientFactory.createRestClientNoHttpProxy("");
}
- public void notifyProducersJobDeleted(EiJob eiJob) {
- for (EiProducer producer : getProducers(eiJob)) {
+ public Mono<String> healthCheck(EiProducer producer) {
+ return restClient.get(producer.getProducerSupervisionCallbackUrl());
+ }
+
+ public void stopEiJob(EiJob eiJob, EiProducers eiProducers) {
+ for (EiProducer producer : getProducersForJob(eiJob, eiProducers)) {
String url = producer.getJobCallbackUrl() + "/" + eiJob.getId();
+ producer.setJobDisabled(eiJob);
restClient.delete(url) //
- .subscribe(notUsed -> logger.debug("Producer job deleted OK {}", producer.getId()), //
+ .subscribe(response -> logger.debug("Producer job deleted OK {}", producer.getId()), //
throwable -> logger.warn("Producer job delete failed {} {}", producer.getId(),
throwable.getMessage()),
null);
}
/**
- * Calls all producers for an EiJob activation.
+ * Start a job in all producers that suports the job type
*
* @param eiJob an EI job
* @return the number of producers that returned OK
*/
- public Mono<Integer> notifyProducersJobStarted(EiJob eiJob) {
+ public Mono<Integer> startEiJob(EiJob eiJob, EiProducers eiProducers) {
Retry retrySpec = Retry.fixedDelay(1, Duration.ofSeconds(1));
- return Flux.fromIterable(getProducers(eiJob)) //
- .flatMap(eiProducer -> notifyProducerJobStarted(eiProducer, eiJob, retrySpec)) //
+ return Flux.fromIterable(getProducersForJob(eiJob, eiProducers)) //
+ .flatMap(eiProducer -> startEiJob(eiProducer, eiJob, retrySpec)) //
.collectList() //
.flatMap(okResponses -> Mono.just(Integer.valueOf(okResponses.size()))); //
}
/**
- * Restart all jobs for one producer
+ * Start all jobs for one producer
*
* @param producer
* @param eiJobs
*/
- public void restartJobs(EiProducer producer, EiJobs eiJobs) {
+ public Flux<String> startEiJobs(EiProducer producer, EiJobs eiJobs) {
final int maxNoOfParalellRequests = 10;
Retry retrySpec = Retry.backoff(3, Duration.ofSeconds(1));
- Flux.fromIterable(producer.getEiTypes()) //
+ return Flux.fromIterable(producer.getEiTypes()) //
.flatMap(type -> Flux.fromIterable(eiJobs.getJobsForType(type))) //
- .flatMap(job -> notifyProducerJobStarted(producer, job, retrySpec), maxNoOfParalellRequests) //
- .onErrorResume(t -> {
- logger.error("Could not restart EI Job for producer: {}, reason :{}", producer.getId(), t.getMessage());
- return Flux.empty();
- }) //
- .subscribe();
+ .flatMap(job -> startEiJob(producer, job, retrySpec), maxNoOfParalellRequests);
}
- private Mono<String> notifyProducerJobStarted(EiProducer producer, EiJob eiJob, Retry retrySpec) {
+ public Mono<String> startEiJob(EiProducer producer, EiJob eiJob, Retry retrySpec) {
ProducerJobInfo request = new ProducerJobInfo(eiJob);
String body = gson.toJson(request);
.retryWhen(retrySpec) //
.doOnNext(resp -> logger.debug("Job subscription {} started OK {}", eiJob.getId(), producer.getId())) //
.onErrorResume(throwable -> {
+ producer.setJobDisabled(eiJob);
logger.warn("Job subscription failed {}", producer.getId(), throwable.toString());
return Mono.empty();
- });
+ }) //
+ .doOnNext(resp -> producer.setJobEnabled(eiJob));
}
- private Collection<EiProducer> getProducers(EiJob eiJob) {
- try {
- return this.eiTypes.getType(eiJob.getTypeId()).getProducers();
- } catch (Exception e) {
- return new Vector<>();
- }
+ private Collection<EiProducer> getProducersForJob(EiJob eiJob, EiProducers eiProducers) {
+ return eiProducers.getProducersForType(eiJob.getTypeId());
}
}