From e372f940d2e57562d23e08ecb797f580800dc719 Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Wed, 27 Oct 2021 09:20:52 +0200 Subject: [PATCH] NONRTRIC - Implement DMaaP mediator producer service in Java Fixed so that one registration is always done after a component restart. Discarding empty array from DMAAP Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-597 Change-Id: Iafac2e7a6dd11fe74a5e8c6c6214858a8ff4e6d3 --- .../org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java | 1 + .../org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java | 11 +++++------ 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java index fb5c891c..1a260e92 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java @@ -101,6 +101,7 @@ public class DmaapMessageConsumer { final int CONCURRENCY = 5; return infiniteSubmitter.start() // .flatMap(notUsed -> getFromMessageRouter(getDmaapUrl()), 1) // + .filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away. .doOnNext(message -> logger.debug("Message Reveived from DMAAP : {}", message)) // .flatMap(this::handleReceivedMessage, CONCURRENCY); } diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java index 837ca323..82167694 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java @@ -71,7 +71,7 @@ public class ProducerRegstrationTask { @Scheduled(fixedRate = REGISTRATION_SUPERVISION_INTERVAL_MS) public void supervisionTask() { checkRegistration() // - .filter(isRegisterred -> !isRegisterred) // + .filter(isRegistrationOk -> !isRegistrationOk || !this.isRegisteredInEcs) // .flatMap(isRegisterred -> registerTypesAndProducer()) // .subscribe( // null, // @@ -80,13 +80,12 @@ public class ProducerRegstrationTask { } private void handleRegistrationCompleted() { - logger.debug("Registering types and producer succeeded"); + logger.debug("Registering types and producer completed"); isRegisteredInEcs = true; } private void handleRegistrationFailure(Throwable t) { - logger.warn("Registration failed {}", t.getMessage()); - isRegisteredInEcs = false; + logger.warn("Registration of producer failed {}", t.getMessage()); } private Mono checkRegistration() { @@ -112,8 +111,8 @@ public class ProducerRegstrationTask { private Mono registerTypesAndProducer() { final int CONCURRENCY = 20; - final String producerUrl = - applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID; + final String producerUrl = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + + PRODUCER_ID; return Flux.fromIterable(this.types.getAll()) // .doOnNext(type -> logger.info("Registering type {}", type.getId())) // -- 2.16.6