From 022b9ca1a9b53e83e27379c8a4785fb6e1683e89 Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Thu, 28 Oct 2021 14:21:36 +0200 Subject: [PATCH] NONRTRIC - Implement DMaaP mediator producer service in Java Added README.md Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-621 Change-Id: I206447cd73e95650619c52ea1cb914872d35ea28 --- dmaap-adaptor-java/README.md | 40 ++++++++++++++++++++++ .../dmaapadapter/tasks/DmaapMessageConsumer.java | 38 ++++++++++---------- .../tasks/ProducerRegstrationTask.java | 4 +-- .../org/oran/dmaapadapter/ApplicationTest.java | 4 --- 4 files changed, 61 insertions(+), 25 deletions(-) create mode 100644 dmaap-adaptor-java/README.md diff --git a/dmaap-adaptor-java/README.md b/dmaap-adaptor-java/README.md new file mode 100644 index 00000000..d9826ad6 --- /dev/null +++ b/dmaap-adaptor-java/README.md @@ -0,0 +1,40 @@ +# O-RAN-SC Non-RealTime RIC DMaaP Information Producer + +This product is a generic information producer (as defined by the Information Coordinator Service (ICS)). It can produce any information that can be retrieved from DMaaP. Its main tasks is to register information types and itself as a producer using the ICS Data Producer API. + +A data consumer may create information jobs through the ICS Data Producer API. + +This service will retrieve data from the DMaaP Message Router (MR) and distribute it further to the data consumers (information job owners). + +The component is a springboot service and is configured as any springboot service through the file `config/application.yaml`. The component log can be retrieved and logging can be controled by means of REST call. See the API documentation (api/api.yaml). + +The file `config/application_configuration.json` contains the configuration of job types that the producer will support. + + { + "types": + [ + { + "id": The ID of the job type, e.g. "STD_Fault_Messages", + "dmaapTopicUrl": The topic URL to poll from DMaaP Message Router, e.g. "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD_Fault_Messages" + } + ] + } + +The service producer will constantly poll MR for all configured job types. When receiving messages for a type, it will distribute these messages to all jobs registered for the type. If no jobs for that type are registered, the messages will be discarded. If a consumer is unavailable for distribution, the messages will be discarded for that consumer. + +## License + +Copyright (C) 2021 Nordix Foundation. +Licensed under the Apache License, Version 2.0 (the "License") +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +For more information about license please see the [LICENSE](LICENSE.txt) file for details. diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java index 1a260e92..f82d7f68 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java @@ -88,38 +88,38 @@ public class DmaapMessageConsumer { } public void start() { - infiniteSubmitter.stop(); - - createTask().subscribe(// - value -> logger.debug("DmaapMessageConsumer next: {}", value), // - throwable -> logger.error("DmaapMessageConsumer error: {}", throwable.getMessage()), // - () -> logger.warn("DmaapMessageConsumer stopped") // - ); - } - - protected Flux createTask() { - final int CONCURRENCY = 5; - return infiniteSubmitter.start() // + infiniteSubmitter.start() // .flatMap(notUsed -> getFromMessageRouter(getDmaapUrl()), 1) // - .filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away. - .doOnNext(message -> logger.debug("Message Reveived from DMAAP : {}", message)) // - .flatMap(this::handleReceivedMessage, CONCURRENCY); + .flatMap(this::handleReceivedMessage, 5) // + .subscribe(// + value -> logger.debug("DmaapMessageConsumer next: {} {}", value, type.getId()), // + throwable -> logger.error("DmaapMessageConsumer error: {}", throwable.getMessage()), // + () -> logger.warn("DmaapMessageConsumer stopped {}", type.getId()) // + ); } private String getDmaapUrl() { + return this.applicationConfig.getDmaapBaseUrl() + type.getDmaapTopicUrl(); } - private Mono handleErrorResponse(Throwable t) { - logger.debug("error from DMAAP {}", t.getMessage()); + private Mono handleDmaapErrorResponse(Throwable t) { + logger.debug("error from DMAAP {} {}", t.getMessage(), type.getDmaapTopicUrl()); return Mono.delay(TIME_BETWEEN_DMAAP_RETRIES) // .flatMap(notUsed -> Mono.empty()); } + private Mono handleConsumerErrorResponse(Throwable t) { + logger.warn("error from CONSUMER {}", t.getMessage()); + return Mono.empty(); + } + protected Mono getFromMessageRouter(String topicUrl) { logger.trace("getFromMessageRouter {}", topicUrl); return restClient.get(topicUrl) // - .onErrorResume(this::handleErrorResponse); + .filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away. + .doOnNext(message -> logger.debug("Message from DMAAP topic: {} : {}", topicUrl, message)) // + .onErrorResume(this::handleDmaapErrorResponse); // } protected Flux handleReceivedMessage(String body) { @@ -130,7 +130,7 @@ public class DmaapMessageConsumer { return Flux.fromIterable(this.jobs.getJobsForType(this.type)) // .doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl())) .flatMap(job -> restClient.post(job.getCallbackUrl(), body), CONCURRENCY) // - .onErrorResume(this::handleErrorResponse); + .onErrorResume(this::handleConsumerErrorResponse); } } diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java index 82167694..4a68ab0e 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java @@ -111,8 +111,8 @@ public class ProducerRegstrationTask { private Mono registerTypesAndProducer() { final int CONCURRENCY = 20; - final String producerUrl = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" - + PRODUCER_ID; + final String producerUrl = + applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID; return Flux.fromIterable(this.types.getAll()) // .doOnNext(type -> logger.info("Registering type {}", type.getId())) // diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java index b1c1780a..1ca4fac2 100644 --- a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java +++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java @@ -49,7 +49,6 @@ import org.oran.dmaapadapter.r1.ProducerJobInfo; import org.oran.dmaapadapter.repository.InfoType; import org.oran.dmaapadapter.repository.InfoTypes; import org.oran.dmaapadapter.repository.Jobs; -import org.oran.dmaapadapter.tasks.ProducerRegstrationTask; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; @@ -80,9 +79,6 @@ class ApplicationTest { @Autowired private ApplicationConfig applicationConfig; - @Autowired - private ProducerRegstrationTask producerRegstrationTask; - @Autowired private Jobs jobs; -- 2.16.6