NONRTRIC - Implement DMaaP mediator producer service in Java 67/6967/3
authorPatrikBuhr <patrik.buhr@est.tech>
Thu, 28 Oct 2021 12:21:36 +0000 (14:21 +0200)
committerPatrikBuhr <patrik.buhr@est.tech>
Fri, 29 Oct 2021 09:52:39 +0000 (11:52 +0200)
Added README.md

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-621
Change-Id: I206447cd73e95650619c52ea1cb914872d35ea28

dmaap-adaptor-java/README.md [new file with mode: 0644]
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java
dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java

diff --git a/dmaap-adaptor-java/README.md b/dmaap-adaptor-java/README.md
new file mode 100644 (file)
index 0000000..d9826ad
--- /dev/null
@@ -0,0 +1,40 @@
+# O-RAN-SC Non-RealTime RIC DMaaP Information Producer
+
+This product is a generic information producer (as defined by the Information Coordinator Service (ICS)). It can produce any information that can be retrieved from DMaaP. Its main tasks is to register information types and itself as a producer using the ICS Data Producer API.
+
+A data consumer may create information jobs through the ICS Data Producer API.
+
+This service will retrieve data from the DMaaP Message Router (MR) and distribute it further to the data consumers (information job owners).
+
+The component is a springboot service and is configured as any springboot service through the file `config/application.yaml`. The component log can be retrieved and logging can be controled by means of REST call. See the API documentation (api/api.yaml).
+
+The file `config/application_configuration.json` contains the configuration of job types that the producer will support.
+
+    {
+       "types":
+        [
+          {
+            "id": The ID of the job type, e.g. "STD_Fault_Messages",
+            "dmaapTopicUrl": The topic URL to poll from DMaaP Message Router, e.g. "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD_Fault_Messages"
+          }
+      ]
+    }
+
+The service producer will constantly poll MR for all configured job types. When receiving messages for a type, it will distribute these messages to all jobs registered for the type. If no jobs for that type are registered, the messages will be discarded. If a consumer is unavailable for distribution, the messages will be discarded for that consumer.
+
+## License
+
+Copyright (C) 2021 Nordix Foundation.
+Licensed under the Apache License, Version 2.0 (the "License")
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+For more information about license please see the [LICENSE](LICENSE.txt) file for details.
index 1a260e9..f82d7f6 100644 (file)
@@ -88,38 +88,38 @@ public class DmaapMessageConsumer {
     }
 
     public void start() {
-        infiniteSubmitter.stop();
-
-        createTask().subscribe(//
-                value -> logger.debug("DmaapMessageConsumer next: {}", value), //
-                throwable -> logger.error("DmaapMessageConsumer error: {}", throwable.getMessage()), //
-                () -> logger.warn("DmaapMessageConsumer stopped") //
-        );
-    }
-
-    protected Flux<String> createTask() {
-        final int CONCURRENCY = 5;
-        return infiniteSubmitter.start() //
+        infiniteSubmitter.start() //
                 .flatMap(notUsed -> getFromMessageRouter(getDmaapUrl()), 1) //
-                .filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away.
-                .doOnNext(message -> logger.debug("Message Reveived from DMAAP : {}", message)) //
-                .flatMap(this::handleReceivedMessage, CONCURRENCY);
+                .flatMap(this::handleReceivedMessage, 5) //
+                .subscribe(//
+                        value -> logger.debug("DmaapMessageConsumer next: {} {}", value, type.getId()), //
+                        throwable -> logger.error("DmaapMessageConsumer error: {}", throwable.getMessage()), //
+                        () -> logger.warn("DmaapMessageConsumer stopped {}", type.getId()) //
+                );
     }
 
     private String getDmaapUrl() {
+
         return this.applicationConfig.getDmaapBaseUrl() + type.getDmaapTopicUrl();
     }
 
-    private Mono<String> handleErrorResponse(Throwable t) {
-        logger.debug("error from DMAAP {}", t.getMessage());
+    private Mono<String> handleDmaapErrorResponse(Throwable t) {
+        logger.debug("error from DMAAP {} {}", t.getMessage(), type.getDmaapTopicUrl());
         return Mono.delay(TIME_BETWEEN_DMAAP_RETRIES) //
                 .flatMap(notUsed -> Mono.empty());
     }
 
+    private Mono<String> handleConsumerErrorResponse(Throwable t) {
+        logger.warn("error from CONSUMER {}", t.getMessage());
+        return Mono.empty();
+    }
+
     protected Mono<String> getFromMessageRouter(String topicUrl) {
         logger.trace("getFromMessageRouter {}", topicUrl);
         return restClient.get(topicUrl) //
-                .onErrorResume(this::handleErrorResponse);
+                .filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away.
+                .doOnNext(message -> logger.debug("Message from DMAAP topic: {} : {}", topicUrl, message)) //
+                .onErrorResume(this::handleDmaapErrorResponse); //
     }
 
     protected Flux<String> handleReceivedMessage(String body) {
@@ -130,7 +130,7 @@ public class DmaapMessageConsumer {
         return Flux.fromIterable(this.jobs.getJobsForType(this.type)) //
                 .doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl()))
                 .flatMap(job -> restClient.post(job.getCallbackUrl(), body), CONCURRENCY) //
-                .onErrorResume(this::handleErrorResponse);
+                .onErrorResume(this::handleConsumerErrorResponse);
     }
 
 }
index 8216769..4a68ab0 100644 (file)
@@ -111,8 +111,8 @@ public class ProducerRegstrationTask {
 
     private Mono<String> registerTypesAndProducer() {
         final int CONCURRENCY = 20;
-        final String producerUrl = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/"
-                + PRODUCER_ID;
+        final String producerUrl =
+                applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
 
         return Flux.fromIterable(this.types.getAll()) //
                 .doOnNext(type -> logger.info("Registering type {}", type.getId())) //
index b1c1780..1ca4fac 100644 (file)
@@ -49,7 +49,6 @@ import org.oran.dmaapadapter.r1.ProducerJobInfo;
 import org.oran.dmaapadapter.repository.InfoType;
 import org.oran.dmaapadapter.repository.InfoTypes;
 import org.oran.dmaapadapter.repository.Jobs;
-import org.oran.dmaapadapter.tasks.ProducerRegstrationTask;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
@@ -80,9 +79,6 @@ class ApplicationTest {
     @Autowired
     private ApplicationConfig applicationConfig;
 
-    @Autowired
-    private ProducerRegstrationTask producerRegstrationTask;
-
     @Autowired
     private Jobs jobs;