From ee3e6557e9578cc3f01035b84b6f26ce917fe499 Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Mon, 1 Nov 2021 12:02:55 +0100 Subject: [PATCH] NONRTRIC - Implement DMaaP mediator producer service in Java Added support for using HTTP Proxy for data delivery. Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-630 Change-Id: Ic85f718b4b0f7e8ea343095f7234b6265eeb29dc --- dmaap-adaptor-java/README.md | 33 +++++++++++----------- .../config/application_configuration.json | 5 ++-- .../org/oran/dmaapadapter/repository/InfoType.java | 6 +++- .../dmaapadapter/tasks/DmaapMessageConsumer.java | 11 +++++--- .../resources/test_application_configuration.json | 3 +- 5 files changed, 33 insertions(+), 25 deletions(-) diff --git a/dmaap-adaptor-java/README.md b/dmaap-adaptor-java/README.md index d9826ad6..0378bc79 100644 --- a/dmaap-adaptor-java/README.md +++ b/dmaap-adaptor-java/README.md @@ -1,5 +1,4 @@ # O-RAN-SC Non-RealTime RIC DMaaP Information Producer - This product is a generic information producer (as defined by the Information Coordinator Service (ICS)). It can produce any information that can be retrieved from DMaaP. Its main tasks is to register information types and itself as a producer using the ICS Data Producer API. A data consumer may create information jobs through the ICS Data Producer API. @@ -8,33 +7,33 @@ This service will retrieve data from the DMaaP Message Router (MR) and distribut The component is a springboot service and is configured as any springboot service through the file `config/application.yaml`. The component log can be retrieved and logging can be controled by means of REST call. See the API documentation (api/api.yaml). -The file `config/application_configuration.json` contains the configuration of job types that the producer will support. +The file `config/application_configuration.json` contains the configuration of job types that the producer will support. Here follows an example with one type: +```sh { "types": [ { - "id": The ID of the job type, e.g. "STD_Fault_Messages", - "dmaapTopicUrl": The topic URL to poll from DMaaP Message Router, e.g. "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD_Fault_Messages" + "id": "STD_Fault_Messages", + "dmaapTopicUrl": events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD_Fault_Messages", + "useHttpProxy": false } - ] + ] } +``` + +Each information has the following properties: + - id the information type identity as exposed in the Information Coordination Service data consumer API + - dmaapTopicUrl the URL to for fetching information from DMaaP + - useHttpProxy if true, the received information will be delivered using a HTTP proxy (provided that one is setup in the application.yaml file). This might for instance be needed if the data consumer is in the RAN or outside the cluster. The service producer will constantly poll MR for all configured job types. When receiving messages for a type, it will distribute these messages to all jobs registered for the type. If no jobs for that type are registered, the messages will be discarded. If a consumer is unavailable for distribution, the messages will be discarded for that consumer. ## License -Copyright (C) 2021 Nordix Foundation. -Licensed under the Apache License, Version 2.0 (the "License") -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 +Copyright (C) 2021 Nordix Foundation. Licensed under the Apache License, Version 2.0 (the "License") you may not use this file except in compliance with the License. You may obtain a copy of the License at -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. +http://www.apache.org/licenses/LICENSE-2.0 -For more information about license please see the [LICENSE](LICENSE.txt) file for details. +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. +For more information about license please see the [LICENSE](LICENSE.txt) file for details. \ No newline at end of file diff --git a/dmaap-adaptor-java/config/application_configuration.json b/dmaap-adaptor-java/config/application_configuration.json index a8967d8b..ae34c56f 100644 --- a/dmaap-adaptor-java/config/application_configuration.json +++ b/dmaap-adaptor-java/config/application_configuration.json @@ -2,7 +2,8 @@ "types": [ { "id": "ExampleInformationType", - "dmaapTopicUrl": "/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12" + "dmaapTopicUrl": "/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12", + "useHttpProxy": true } ] -} +} \ No newline at end of file diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoType.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoType.java index d19577db..9dda1e61 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoType.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoType.java @@ -30,9 +30,13 @@ public class InfoType { @Getter private final String dmaapTopicUrl; - public InfoType(String id, String dmaapTopicUrl) { + @Getter + private final boolean useHttpProxy; + + public InfoType(String id, String dmaapTopicUrl, boolean useHttpProxy) { this.id = id; this.dmaapTopicUrl = dmaapTopicUrl; + this.useHttpProxy = useHttpProxy; } } diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java index f82d7f68..b7c4ec66 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java @@ -43,7 +43,8 @@ public class DmaapMessageConsumer { private static final Duration TIME_BETWEEN_DMAAP_RETRIES = Duration.ofSeconds(10); private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumer.class); private final ApplicationConfig applicationConfig; - private final AsyncRestClient restClient; + private final AsyncRestClient dmaapRestClient; + private final AsyncRestClient consumerRestClient; private final InfoType type; private final Jobs jobs; private final InfiniteFlux infiniteSubmitter = new InfiniteFlux(); @@ -82,7 +83,9 @@ public class DmaapMessageConsumer { public DmaapMessageConsumer(ApplicationConfig applicationConfig, InfoType type, Jobs jobs) { this.applicationConfig = applicationConfig; AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig()); - this.restClient = restclientFactory.createRestClientNoHttpProxy(""); + this.dmaapRestClient = restclientFactory.createRestClientNoHttpProxy(""); + this.consumerRestClient = type.isUseHttpProxy() ? restclientFactory.createRestClientUseHttpProxy("") + : restclientFactory.createRestClientNoHttpProxy(""); this.type = type; this.jobs = jobs; } @@ -116,7 +119,7 @@ public class DmaapMessageConsumer { protected Mono getFromMessageRouter(String topicUrl) { logger.trace("getFromMessageRouter {}", topicUrl); - return restClient.get(topicUrl) // + return dmaapRestClient.get(topicUrl) // .filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away. .doOnNext(message -> logger.debug("Message from DMAAP topic: {} : {}", topicUrl, message)) // .onErrorResume(this::handleDmaapErrorResponse); // @@ -129,7 +132,7 @@ public class DmaapMessageConsumer { // Distibute the body to all jobs for this type return Flux.fromIterable(this.jobs.getJobsForType(this.type)) // .doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl())) - .flatMap(job -> restClient.post(job.getCallbackUrl(), body), CONCURRENCY) // + .flatMap(job -> consumerRestClient.post(job.getCallbackUrl(), body), CONCURRENCY) // .onErrorResume(this::handleConsumerErrorResponse); } diff --git a/dmaap-adaptor-java/src/test/resources/test_application_configuration.json b/dmaap-adaptor-java/src/test/resources/test_application_configuration.json index 0db20cb3..8d211b89 100644 --- a/dmaap-adaptor-java/src/test/resources/test_application_configuration.json +++ b/dmaap-adaptor-java/src/test/resources/test_application_configuration.json @@ -2,7 +2,8 @@ "types": [ { "id": "ExampleInformationType", - "dmaapTopicUrl": "/dmaap-topic-1" + "dmaapTopicUrl": "/dmaap-topic-1", + "useHttpProxy": true } ] } \ No newline at end of file -- 2.16.6