X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=enrichment-coordinator-service%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fenrichment%2Fcontrollers%2Fconsumer%2FConsumerCallbacks.java;h=7ccf61d0eeb78532a871229add4065d720270ec9;hb=e912ee4367d6a305ac038c86dec816b5ce71191b;hp=cded953562d0a96d48c3828e2934b093cebece2b;hpb=0b0cb1fdcf166e438c6932f1eb0f29dafafbe635;p=nonrtric.git diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerCallbacks.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerCallbacks.java index cded9535..7ccf61d0 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerCallbacks.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerCallbacks.java @@ -24,20 +24,23 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; import java.lang.invoke.MethodHandles; +import java.util.Collection; import org.oransc.enrichment.clients.AsyncRestClient; import org.oransc.enrichment.clients.AsyncRestClientFactory; import org.oransc.enrichment.configuration.ApplicationConfig; import org.oransc.enrichment.repository.EiJob; import org.oransc.enrichment.repository.EiJobs; -import org.oransc.enrichment.repository.EiProducer; +import org.oransc.enrichment.repository.EiProducers; import org.oransc.enrichment.repository.EiType; -import org.oransc.enrichment.repository.EiTypes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + /** * Callbacks to the EiProducer */ @@ -49,41 +52,39 @@ public class ConsumerCallbacks { private static Gson gson = new GsonBuilder().create(); private final AsyncRestClient restClient; - private final EiTypes eiTypes; private final EiJobs eiJobs; + private final EiProducers eiProducers; @Autowired - public ConsumerCallbacks(ApplicationConfig config, EiTypes eiTypes, EiJobs eiJobs) { + public ConsumerCallbacks(ApplicationConfig config, EiJobs eiJobs, EiProducers eiProducers) { AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config.getWebClientConfig()); - this.restClient = restClientFactory.createRestClient(""); - this.eiTypes = eiTypes; + this.restClient = restClientFactory.createRestClientUseHttpProxy(""); this.eiJobs = eiJobs; + this.eiProducers = eiProducers; } - public void notifyConsumersProducerDeleted(EiProducer eiProducer) { - for (EiType type : eiProducer.getEiTypes()) { - if (this.eiTypes.get(type.getId()) == null) { - for (EiJob job : this.eiJobs.getJobsForType(type)) { - noifyJobOwner(job, new ConsumerEiJobStatus(ConsumerEiJobStatus.EiJobStatusValues.DISABLED)); - } - } - } + public Flux notifyJobStatus(Collection eiTypes) { + return Flux.fromIterable(eiTypes) // + .flatMap(eiType -> Flux.fromIterable(this.eiJobs.getJobsForType(eiType))) // + .filter(eiJob -> !eiJob.getJobStatusUrl().isEmpty()) // + .filter(eiJob -> this.eiProducers.isJobEnabled(eiJob) != eiJob.isLastStatusReportedEnabled()) + .flatMap(this::noifyStatusToJobOwner); } - public void notifyConsumersTypeAdded(EiType eiType) { - for (EiJob job : this.eiJobs.getJobsForType(eiType)) { - noifyJobOwner(job, new ConsumerEiJobStatus(ConsumerEiJobStatus.EiJobStatusValues.ENABLED)); - } - } + private Mono noifyStatusToJobOwner(EiJob job) { + boolean isJobEnabled = this.eiProducers.isJobEnabled(job); + ConsumerEiJobStatus status = + isJobEnabled ? new ConsumerEiJobStatus(ConsumerEiJobStatus.EiJobStatusValues.ENABLED) + : new ConsumerEiJobStatus(ConsumerEiJobStatus.EiJobStatusValues.DISABLED); + String body = gson.toJson(status); + return this.restClient.post(job.getJobStatusUrl(), body) // + .doOnNext(response -> logger.debug("Consumer notified OK {}", job.getId())) // + .doOnNext(response -> job.setLastReportedStatus(isJobEnabled)) // + .onErrorResume(throwable -> { + logger.warn("Consumer notify failed {} {}", job.getJobStatusUrl(), throwable.toString()); + return Mono.empty(); + }); - private void noifyJobOwner(EiJob job, ConsumerEiJobStatus status) { - if (!job.jobStatusUrl().isEmpty()) { - String body = gson.toJson(status); - this.restClient.post(job.jobStatusUrl(), body) // - .subscribe(notUsed -> logger.debug("Consumer notified OK {}", job.id()), // - throwable -> logger.warn("Consumer notify failed {} {}", job.jobStatusUrl(), throwable.toString()), // - null); - } } }