X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=enrichment-coordinator-service%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fenrichment%2Fclients%2FProducerCallbacks.java;fp=enrichment-coordinator-service%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fenrichment%2Fclients%2FProducerCallbacks.java;h=f42c6e34ac42605938866cb2bfc0583757e1f583;hb=6a1eb6e2a6538decc54f5348fcb1589f5b829e68;hp=07b188d96b1953bd3025207378c0de974f6f84ed;hpb=0f8b3b162b7ab08cdfc998979cfa9866634893a6;p=nonrtric.git diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/clients/ProducerCallbacks.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/clients/ProducerCallbacks.java index 07b188d9..f42c6e34 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/clients/ProducerCallbacks.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/clients/ProducerCallbacks.java @@ -34,6 +34,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + /** * Callbacks to the EiProducer */ @@ -48,12 +51,6 @@ public class ProducerCallbacks { @Autowired ApplicationConfig applicationConfig; - public void notifyProducersJobCreated(EiJob eiJob) { - for (EiProducer producer : eiJob.type().getProducers()) { - notifyProducerJobStarted(producer, eiJob); - } - } - public void notifyProducersJobDeleted(EiJob eiJob) { AsyncRestClient restClient = restClient(false); ProducerJobInfo request = new ProducerJobInfo(eiJob); @@ -65,15 +62,24 @@ public class ProducerCallbacks { } } - public void notifyProducerJobStarted(EiProducer producer, EiJob eiJob) { + public Mono notifyProducersJobStarted(EiJob eiJob) { + return Flux.fromIterable(eiJob.type().getProducers()) // + .flatMap(eiProducer -> notifyProducerJobStarted(eiProducer, eiJob)) // + .collectList() // + .flatMap(okResponses -> Mono.just(Integer.valueOf(okResponses.size()))); // + } + + public Mono notifyProducerJobStarted(EiProducer producer, EiJob eiJob) { AsyncRestClient restClient = restClient(false); ProducerJobInfo request = new ProducerJobInfo(eiJob); String body = gson.toJson(request); - restClient.post(producer.jobCreationCallbackUrl(), body) // - .subscribe(notUsed -> logger.debug("Job subscription started OK {}", producer.id()), // - throwable -> logger.warn("Job subscription failed {}", producer.id(), throwable.toString()), null); - + return restClient.post(producer.jobCreationCallbackUrl(), body) + .doOnNext(resp -> logger.debug("Job subscription started OK {}", producer.id())) + .onErrorResume(throwable -> { + logger.warn("Job subscription failed {}", producer.id(), throwable.toString()); + return Mono.empty(); + }); } private AsyncRestClient restClient(boolean useTrustValidation) {