X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pmproducer%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fpmproducer%2Ftasks%2FProducerRegstrationTask.java;h=d4b29be84f8df88d99e1b9daf6c54ea02114b9e5;hb=fbccee5729fb23f3424046c1d122d29f0fec545a;hp=14789f4838838377d46edc1518c0bc6af1ac7f34;hpb=298969556b0f84de745a67e994a590d8b2a3de13;p=nonrtric%2Fplt%2Franpm.git diff --git a/pmproducer/src/main/java/org/oran/pmproducer/tasks/ProducerRegstrationTask.java b/pmproducer/src/main/java/org/oran/pmproducer/tasks/ProducerRegstrationTask.java index 14789f4..d4b29be 100644 --- a/pmproducer/src/main/java/org/oran/pmproducer/tasks/ProducerRegstrationTask.java +++ b/pmproducer/src/main/java/org/oran/pmproducer/tasks/ProducerRegstrationTask.java @@ -82,16 +82,13 @@ public class ProducerRegstrationTask { @Scheduled(fixedRate = REGISTRATION_SUPERVISION_INTERVAL_MS) public void runSupervisionTask() { - supervisionTask().subscribe( // + if (this.isRegisteredInIcs) { + return; + } + registerTypesAndProducer().subscribe( // null, // - this::handleRegistrationFailure, // - this::handleRegistrationCompleted); - } - - public Mono supervisionTask() { - return checkRegistration() // - .filter(isRegistrationOk -> !isRegistrationOk || !this.isRegisteredInIcs) // - .flatMap(isRegisterred -> registerTypesAndProducer()); + this::handleRegistrationFailure// + ); } private void handleRegistrationCompleted() { @@ -107,28 +104,11 @@ public class ProducerRegstrationTask { return applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-producers/" + producerId; } - // Returns TRUE if registration is correct - private Mono checkRegistration() { - return restClient.get(producerRegistrationUrl()) // - .flatMap(this::isRegisterredInfoCorrect) // - .onErrorResume(t -> Mono.just(Boolean.FALSE)); - } - - private Mono isRegisterredInfoCorrect(String registerredInfoStr) { - ProducerRegistrationInfo registerredInfo = gson.fromJson(registerredInfoStr, ProducerRegistrationInfo.class); - if (isEqual(producerRegistrationInfo(), registerredInfo)) { - logger.trace("Already registered in ICS"); - return Mono.just(Boolean.TRUE); - } else { - return Mono.just(Boolean.FALSE); - } - } - private String registerTypeUrl(InfoType type) { return applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId(); } - private Mono registerTypesAndProducer() { + public Mono registerTypesAndProducer() { final int CONCURRENCY = 1; return Flux.fromIterable(this.types.getAll()) // @@ -138,7 +118,8 @@ public class ProducerRegstrationTask { CONCURRENCY) // .collectList() // .doOnNext(type -> logger.info("Registering producer")) // - .flatMap(resp -> restClient.put(producerRegistrationUrl(), gson.toJson(producerRegistrationInfo()))); + .flatMap(resp -> restClient.put(producerRegistrationUrl(), gson.toJson(producerRegistrationInfo()))) + .doOnNext(n -> handleRegistrationCompleted()); } private Mono createInputDataJob(InfoType type) { @@ -155,7 +136,6 @@ public class ProducerRegstrationTask { return restClient.put(consumerJobUrl(JOB_ID), body) .doOnError(t -> logger.error("Could not create job of type {}, reason: {}", type.getInputJobType(), t.getMessage())) - .onErrorResume(t -> Mono.just("")) // .doOnNext(n -> logger.info("Created input job: {}, type: {}", JOB_ID, type.getInputJobType())) // .map(x -> type); }