NONRTRIC - Implement DMaaP mediator producer service in Java 44/6944/4
authorPatrikBuhr <patrik.buhr@est.tech>
Wed, 27 Oct 2021 07:20:52 +0000 (09:20 +0200)
committerPatrik Buhr <patrik.buhr@est.tech>
Wed, 27 Oct 2021 14:21:01 +0000 (14:21 +0000)
Fixed so that one registration is always done after a component restart.
Discarding empty array from DMAAP

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-597
Change-Id: Iafac2e7a6dd11fe74a5e8c6c6214858a8ff4e6d3

dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java

index fb5c891..1a260e9 100644 (file)
@@ -101,6 +101,7 @@ public class DmaapMessageConsumer {
         final int CONCURRENCY = 5;
         return infiniteSubmitter.start() //
                 .flatMap(notUsed -> getFromMessageRouter(getDmaapUrl()), 1) //
+                .filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away.
                 .doOnNext(message -> logger.debug("Message Reveived from DMAAP : {}", message)) //
                 .flatMap(this::handleReceivedMessage, CONCURRENCY);
     }
index 837ca32..8216769 100644 (file)
@@ -71,7 +71,7 @@ public class ProducerRegstrationTask {
     @Scheduled(fixedRate = REGISTRATION_SUPERVISION_INTERVAL_MS)
     public void supervisionTask() {
         checkRegistration() //
-                .filter(isRegisterred -> !isRegisterred) //
+                .filter(isRegistrationOk -> !isRegistrationOk || !this.isRegisteredInEcs) //
                 .flatMap(isRegisterred -> registerTypesAndProducer()) //
                 .subscribe( //
                         null, //
@@ -80,13 +80,12 @@ public class ProducerRegstrationTask {
     }
 
     private void handleRegistrationCompleted() {
-        logger.debug("Registering types and producer succeeded");
+        logger.debug("Registering types and producer completed");
         isRegisteredInEcs = true;
     }
 
     private void handleRegistrationFailure(Throwable t) {
-        logger.warn("Registration failed {}", t.getMessage());
-        isRegisteredInEcs = false;
+        logger.warn("Registration of producer failed {}", t.getMessage());
     }
 
     private Mono<Boolean> checkRegistration() {
@@ -112,8 +111,8 @@ public class ProducerRegstrationTask {
 
     private Mono<String> registerTypesAndProducer() {
         final int CONCURRENCY = 20;
-        final String producerUrl =
-                applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
+        final String producerUrl = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/"
+                + PRODUCER_ID;
 
         return Flux.fromIterable(this.types.getAll()) //
                 .doOnNext(type -> logger.info("Registering type {}", type.getId())) //