Merge "Added support for using oauth token for Kafka"
[nonrtric/plt/ranpm.git] / pmproducer / src / main / java / org / oran / pmproducer / tasks / ProducerRegstrationTask.java
index 14789f4..d4b29be 100644 (file)
@@ -82,16 +82,13 @@ public class ProducerRegstrationTask {
 
     @Scheduled(fixedRate = REGISTRATION_SUPERVISION_INTERVAL_MS)
     public void runSupervisionTask() {
-        supervisionTask().subscribe( //
+        if (this.isRegisteredInIcs) {
+            return;
+        }
+        registerTypesAndProducer().subscribe( //
                 null, //
-                this::handleRegistrationFailure, //
-                this::handleRegistrationCompleted);
-    }
-
-    public Mono<String> supervisionTask() {
-        return checkRegistration() //
-                .filter(isRegistrationOk -> !isRegistrationOk || !this.isRegisteredInIcs) //
-                .flatMap(isRegisterred -> registerTypesAndProducer());
+                this::handleRegistrationFailure//
+        );
     }
 
     private void handleRegistrationCompleted() {
@@ -107,28 +104,11 @@ public class ProducerRegstrationTask {
         return applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-producers/" + producerId;
     }
 
-    // Returns TRUE if registration is correct
-    private Mono<Boolean> checkRegistration() {
-        return restClient.get(producerRegistrationUrl()) //
-                .flatMap(this::isRegisterredInfoCorrect) //
-                .onErrorResume(t -> Mono.just(Boolean.FALSE));
-    }
-
-    private Mono<Boolean> isRegisterredInfoCorrect(String registerredInfoStr) {
-        ProducerRegistrationInfo registerredInfo = gson.fromJson(registerredInfoStr, ProducerRegistrationInfo.class);
-        if (isEqual(producerRegistrationInfo(), registerredInfo)) {
-            logger.trace("Already registered in ICS");
-            return Mono.just(Boolean.TRUE);
-        } else {
-            return Mono.just(Boolean.FALSE);
-        }
-    }
-
     private String registerTypeUrl(InfoType type) {
         return applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId();
     }
 
-    private Mono<String> registerTypesAndProducer() {
+    public Mono<String> registerTypesAndProducer() {
         final int CONCURRENCY = 1;
 
         return Flux.fromIterable(this.types.getAll()) //
@@ -138,7 +118,8 @@ public class ProducerRegstrationTask {
                         CONCURRENCY) //
                 .collectList() //
                 .doOnNext(type -> logger.info("Registering producer")) //
-                .flatMap(resp -> restClient.put(producerRegistrationUrl(), gson.toJson(producerRegistrationInfo())));
+                .flatMap(resp -> restClient.put(producerRegistrationUrl(), gson.toJson(producerRegistrationInfo())))
+                .doOnNext(n -> handleRegistrationCompleted());
     }
 
     private Mono<InfoType> createInputDataJob(InfoType type) {
@@ -155,7 +136,6 @@ public class ProducerRegstrationTask {
         return restClient.put(consumerJobUrl(JOB_ID), body)
                 .doOnError(t -> logger.error("Could not create job of type {}, reason: {}", type.getInputJobType(),
                         t.getMessage()))
-                .onErrorResume(t -> Mono.just("")) //
                 .doOnNext(n -> logger.info("Created input job: {}, type: {}", JOB_ID, type.getInputJobType())) //
                 .map(x -> type);
     }