Changed so that registration never gives up 52/10752/1
authorPatrikBuhr <patrik.buhr@est.tech>
Tue, 21 Mar 2023 09:46:52 +0000 (10:46 +0100)
committerPatrikBuhr <patrik.buhr@est.tech>
Tue, 21 Mar 2023 09:46:52 +0000 (10:46 +0100)
This includes registration of types, registration of producer and creation of the input job.

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-853
Change-Id: I3f3cafaa73c23100d6c0c7602be75e3b0f6f45d9

pmproducer/src/main/java/org/oran/pmproducer/tasks/ProducerRegstrationTask.java
pmproducer/src/test/java/org/oran/pmproducer/ApplicationTest.java

index 4bd95a1..5b10241 100644 (file)
@@ -82,16 +82,13 @@ public class ProducerRegstrationTask {
 
     @Scheduled(fixedRate = REGISTRATION_SUPERVISION_INTERVAL_MS)
     public void runSupervisionTask() {
-        supervisionTask().subscribe( //
+        if (this.isRegisteredInIcs) {
+            return;
+        }
+        registerTypesAndProducer().subscribe( //
                 null, //
-                this::handleRegistrationFailure, //
-                this::handleRegistrationCompleted);
-    }
-
-    public Mono<String> supervisionTask() {
-        return checkRegistration() //
-                .filter(isRegistrationOk -> !isRegistrationOk || !this.isRegisteredInIcs) //
-                .flatMap(isRegisterred -> registerTypesAndProducer());
+                this::handleRegistrationFailure//
+        );
     }
 
     private void handleRegistrationCompleted() {
@@ -107,28 +104,11 @@ public class ProducerRegstrationTask {
         return applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-producers/" + producerId;
     }
 
-    // Returns TRUE if registration is correct
-    private Mono<Boolean> checkRegistration() {
-        return restClient.get(producerRegistrationUrl()) //
-                .flatMap(this::isRegisterredInfoCorrect) //
-                .onErrorResume(t -> Mono.just(Boolean.FALSE));
-    }
-
-    private Mono<Boolean> isRegisterredInfoCorrect(String registerredInfoStr) {
-        ProducerRegistrationInfo registerredInfo = gson.fromJson(registerredInfoStr, ProducerRegistrationInfo.class);
-        if (isEqual(producerRegistrationInfo(), registerredInfo)) {
-            logger.trace("Already registered in ICS");
-            return Mono.just(Boolean.TRUE);
-        } else {
-            return Mono.just(Boolean.FALSE);
-        }
-    }
-
     private String registerTypeUrl(InfoType type) {
         return applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId();
     }
 
-    private Mono<String> registerTypesAndProducer() {
+    public Mono<String> registerTypesAndProducer() {
         final int CONCURRENCY = 1;
 
         return Flux.fromIterable(this.types.getAll()) //
@@ -138,7 +118,8 @@ public class ProducerRegstrationTask {
                         CONCURRENCY) //
                 .collectList() //
                 .doOnNext(type -> logger.info("Registering producer")) //
-                .flatMap(resp -> restClient.put(producerRegistrationUrl(), gson.toJson(producerRegistrationInfo())));
+                .flatMap(resp -> restClient.put(producerRegistrationUrl(), gson.toJson(producerRegistrationInfo())))
+                .doOnNext(n -> handleRegistrationCompleted());
     }
 
     private Mono<InfoType> createInputDataJob(InfoType type) {
@@ -155,7 +136,6 @@ public class ProducerRegstrationTask {
         return restClient.put(consumerJobUrl(JOB_ID), body)
                 .doOnError(t -> logger.error("Could not create job of type {}, reason: {}", type.getInputJobType(),
                         t.getMessage()))
-                .onErrorResume(t -> Mono.just("")) //
                 .doOnNext(n -> logger.info("Created input job: {}, type: {}", JOB_ID, type.getInputJobType())) //
                 .map(x -> type);
     }
index 8cba2ca..dd38ffc 100644 (file)
@@ -257,9 +257,10 @@ class ApplicationTest {
     }
 
     private void waitForRegistration() {
+        producerRegistrationTask.registerTypesAndProducer().block();
         // Register producer, Register types
         await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
-        producerRegistrationTask.supervisionTask().block();
+        producerRegistrationTask.registerTypesAndProducer().block();
 
         assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
         assertThat(producerRegistrationTask.isRegisteredInIcs()).isTrue();
@@ -429,23 +430,6 @@ class ApplicationTest {
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
     }
 
-    @Test
-    void testReRegister() throws Exception {
-        // Wait foir register types and producer
-        waitForRegistration();
-
-        // Clear the registration, should trigger a re-register
-        icsSimulatorController.testResults.reset();
-        producerRegistrationTask.supervisionTask().block();
-        await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
-        assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
-
-        // Just clear the registerred types, should trigger a re-register
-        icsSimulatorController.testResults.types.clear();
-        await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds)
-                .hasSize(this.types.size()));
-    }
-
     @Test
     @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
     void testZZActuator() throws Exception {