X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pmproducer%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fpmproducer%2Ftasks%2FProducerRegstrationTask.java;h=d4b29be84f8df88d99e1b9daf6c54ea02114b9e5;hb=fbccee5729fb23f3424046c1d122d29f0fec545a;hp=4bd95a1697f4fb516f26c328ed1f74f6818b3853;hpb=6dfbff6834c3a9da2d8f06b15eb94048cbad2d88;p=nonrtric%2Fplt%2Franpm.git diff --git a/pmproducer/src/main/java/org/oran/pmproducer/tasks/ProducerRegstrationTask.java b/pmproducer/src/main/java/org/oran/pmproducer/tasks/ProducerRegstrationTask.java index 4bd95a1..d4b29be 100644 --- a/pmproducer/src/main/java/org/oran/pmproducer/tasks/ProducerRegstrationTask.java +++ b/pmproducer/src/main/java/org/oran/pmproducer/tasks/ProducerRegstrationTask.java @@ -32,10 +32,10 @@ import lombok.Getter; import org.oran.pmproducer.clients.AsyncRestClient; import org.oran.pmproducer.clients.AsyncRestClientFactory; -import org.oran.pmproducer.clients.SecurityContext; import org.oran.pmproducer.configuration.ApplicationConfig; import org.oran.pmproducer.controllers.ProducerCallbacksController; import org.oran.pmproducer.exceptions.ServiceException; +import org.oran.pmproducer.oauth2.SecurityContext; import org.oran.pmproducer.r1.ConsumerJobInfo; import org.oran.pmproducer.r1.ProducerInfoTypeInfo; import org.oran.pmproducer.r1.ProducerRegistrationInfo; @@ -82,16 +82,13 @@ public class ProducerRegstrationTask { @Scheduled(fixedRate = REGISTRATION_SUPERVISION_INTERVAL_MS) public void runSupervisionTask() { - supervisionTask().subscribe( // + if (this.isRegisteredInIcs) { + return; + } + registerTypesAndProducer().subscribe( // null, // - this::handleRegistrationFailure, // - this::handleRegistrationCompleted); - } - - public Mono supervisionTask() { - return checkRegistration() // - .filter(isRegistrationOk -> !isRegistrationOk || !this.isRegisteredInIcs) // - .flatMap(isRegisterred -> registerTypesAndProducer()); + this::handleRegistrationFailure// + ); } private void handleRegistrationCompleted() { @@ -107,28 +104,11 @@ public class ProducerRegstrationTask { return applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-producers/" + producerId; } - // Returns TRUE if registration is correct - private Mono checkRegistration() { - return restClient.get(producerRegistrationUrl()) // - .flatMap(this::isRegisterredInfoCorrect) // - .onErrorResume(t -> Mono.just(Boolean.FALSE)); - } - - private Mono isRegisterredInfoCorrect(String registerredInfoStr) { - ProducerRegistrationInfo registerredInfo = gson.fromJson(registerredInfoStr, ProducerRegistrationInfo.class); - if (isEqual(producerRegistrationInfo(), registerredInfo)) { - logger.trace("Already registered in ICS"); - return Mono.just(Boolean.TRUE); - } else { - return Mono.just(Boolean.FALSE); - } - } - private String registerTypeUrl(InfoType type) { return applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId(); } - private Mono registerTypesAndProducer() { + public Mono registerTypesAndProducer() { final int CONCURRENCY = 1; return Flux.fromIterable(this.types.getAll()) // @@ -138,7 +118,8 @@ public class ProducerRegstrationTask { CONCURRENCY) // .collectList() // .doOnNext(type -> logger.info("Registering producer")) // - .flatMap(resp -> restClient.put(producerRegistrationUrl(), gson.toJson(producerRegistrationInfo()))); + .flatMap(resp -> restClient.put(producerRegistrationUrl(), gson.toJson(producerRegistrationInfo()))) + .doOnNext(n -> handleRegistrationCompleted()); } private Mono createInputDataJob(InfoType type) { @@ -155,7 +136,6 @@ public class ProducerRegstrationTask { return restClient.put(consumerJobUrl(JOB_ID), body) .doOnError(t -> logger.error("Could not create job of type {}, reason: {}", type.getInputJobType(), t.getMessage())) - .onErrorResume(t -> Mono.just("")) // .doOnNext(n -> logger.info("Created input job: {}, type: {}", JOB_ID, type.getInputJobType())) // .map(x -> type); }