From b2d6339441c650962e34502e7527ca0835fa342f Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Fri, 5 Nov 2021 14:56:19 +0100 Subject: [PATCH] NONRTRIC - Implement DMaaP mediator producer service in Java Fixed so that an information type can receive data from a Kafka stream. This can also filter the data (using regexp matchning). The received data can be buffered to minimize the number of REST calls to deliver the data to the consumer. Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-597 Change-Id: Ie3740898bd919908a7ec5753f7d6050c652cebe4 --- dmaap-adaptor-java/README.md | 2 +- dmaap-adaptor-java/config/application.yaml | 5 +- dmaap-adaptor-java/pom.xml | 10 + .../java/org/oran/dmaapadapter/BeanFactory.java | 22 +- .../configuration/ApplicationConfig.java | 4 + .../controllers/ProducerCallbacksController.java | 7 +- .../org/oran/dmaapadapter/repository/InfoType.java | 16 +- .../oran/dmaapadapter/repository/InfoTypes.java | 1 - .../java/org/oran/dmaapadapter/repository/Job.java | 58 ++++- .../org/oran/dmaapadapter/repository/Jobs.java | 11 +- ...essageConsumer.java => DmaapTopicConsumer.java} | 48 ++-- .../dmaapadapter/tasks/KafkaTopicConsumer.java | 130 +++++++++++ .../dmaapadapter/tasks/KafkaTopicConsumers.java | 79 +++++++ .../tasks/ProducerRegstrationTask.java | 54 +++-- .../src/main/resources/typeSchemaKafka.json | 26 +++ .../oran/dmaapadapter/EcsSimulatorController.java | 1 - .../org/oran/dmaapadapter/IntegrationWithEcs.java | 1 + .../oran/dmaapadapter/IntegrationWithKafka.java | 257 +++++++++++++++++++++ .../resources/test_application_configuration.json | 2 +- .../test_application_configuration_kafka.json | 9 + 20 files changed, 689 insertions(+), 54 deletions(-) rename dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/{DmaapMessageConsumer.java => DmaapTopicConsumer.java} (85%) create mode 100644 dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumer.java create mode 100644 dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java create mode 100644 dmaap-adaptor-java/src/main/resources/typeSchemaKafka.json create mode 100644 dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java create mode 100644 dmaap-adaptor-java/src/test/resources/test_application_configuration_kafka.json diff --git a/dmaap-adaptor-java/README.md b/dmaap-adaptor-java/README.md index 0378bc79..9b35fe54 100644 --- a/dmaap-adaptor-java/README.md +++ b/dmaap-adaptor-java/README.md @@ -15,7 +15,7 @@ The file `config/application_configuration.json` contains the configuration of j [ { "id": "STD_Fault_Messages", - "dmaapTopicUrl": events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD_Fault_Messages", + "dmaapTopicUrl": events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD-Fault-Messages_1.0.0", "useHttpProxy": false } ] diff --git a/dmaap-adaptor-java/config/application.yaml b/dmaap-adaptor-java/config/application.yaml index 5733ea77..6a2d68a2 100644 --- a/dmaap-adaptor-java/config/application.yaml +++ b/dmaap-adaptor-java/config/application.yaml @@ -51,6 +51,9 @@ app: # configuration from the Consul will override the file. configuration-filepath: /opt/app/dmaap-adaptor-service/data/application_configuration.json dmaap-base-url: http://dradmin:dradmin@localhost:2222 - # The url used to adress this component. This is used as a callback url sent to other components. + # The url used to adress this component. This is used as a callback url sent to other components. dmaap-adapter-base-url: https://localhost:8435 + # KAFKA boostrap server. This is only needed if there are Information Types that uses a kafkaInputTopic + kafka: + bootstrap-servers: localhost:9092 diff --git a/dmaap-adaptor-java/pom.xml b/dmaap-adaptor-java/pom.xml index 1fbd83c3..411b27c5 100644 --- a/dmaap-adaptor-java/pom.xml +++ b/dmaap-adaptor-java/pom.xml @@ -205,6 +205,16 @@ mockwebserver test + + io.projectreactor.kafka + reactor-kafka + 1.3.7 + + + com.google.guava + guava + 31.0.1-jre + diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/BeanFactory.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/BeanFactory.java index c9ba93fc..faf57426 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/BeanFactory.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/BeanFactory.java @@ -27,7 +27,8 @@ import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.repository.InfoType; import org.oran.dmaapadapter.repository.InfoTypes; import org.oran.dmaapadapter.repository.Jobs; -import org.oran.dmaapadapter.tasks.DmaapMessageConsumer; +import org.oran.dmaapadapter.tasks.DmaapTopicConsumer; +import org.oran.dmaapadapter.tasks.KafkaTopicConsumers; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory; @@ -37,6 +38,7 @@ import org.springframework.context.annotation.Configuration; @Configuration public class BeanFactory { + private InfoTypes infoTypes; @Value("${server.http-port}") private int httpPort = 0; @@ -47,16 +49,24 @@ public class BeanFactory { } @Bean - public InfoTypes types(@Autowired ApplicationConfig appConfig, @Autowired Jobs jobs) { + public InfoTypes types(@Autowired ApplicationConfig appConfig, @Autowired Jobs jobs, + @Autowired KafkaTopicConsumers kafkaConsumers) { + if (infoTypes != null) { + return infoTypes; + } + Collection types = appConfig.getTypes(); // Start a consumer for each type for (InfoType type : types) { - DmaapMessageConsumer topicConsumer = new DmaapMessageConsumer(appConfig, type, jobs); - topicConsumer.start(); + if (type.isDmaapTopicDefined()) { + DmaapTopicConsumer topicConsumer = new DmaapTopicConsumer(appConfig, type, jobs); + topicConsumer.start(); + } } - - return new InfoTypes(types); + infoTypes = new InfoTypes(types); + kafkaConsumers.start(infoTypes); + return infoTypes; } @Bean diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java index e26fd46f..f17a9c03 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java @@ -88,6 +88,10 @@ public class ApplicationConfig { @Value("${app.dmaap-base-url}") private String dmaapBaseUrl; + @Getter + @Value("${app.kafka.bootstrap-servers:}") + private String kafkaBootStrapServers; + private WebClientConfig webClientConfig = null; public WebClientConfig getWebClientConfig() { diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java index ca7c96cd..e4dca5b8 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java @@ -85,7 +85,7 @@ public class ProducerCallbacksController { logger.info("Job started callback {}", request.id); Job job = new Job(request.id, request.targetUri, types.getType(request.typeId), request.owner, - request.lastUpdated); + request.lastUpdated, toJobParameters(request.jobData)); this.jobs.put(job); return new ResponseEntity<>(HttpStatus.OK); } catch (Exception e) { @@ -93,6 +93,11 @@ public class ProducerCallbacksController { } } + private Job.Parameters toJobParameters(Object jobData) { + String json = gson.toJson(jobData); + return gson.fromJson(json, Job.Parameters.class); + } + @GetMapping(path = JOB_URL, produces = MediaType.APPLICATION_JSON_VALUE) @Operation(summary = "Get all jobs", description = "Returns all info jobs, can be used for trouble shooting") @ApiResponse(responseCode = "200", // diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoType.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoType.java index 9dda1e61..27b527d2 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoType.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoType.java @@ -22,6 +22,8 @@ package org.oran.dmaapadapter.repository; import lombok.Getter; +import org.springframework.util.StringUtils; + public class InfoType { @Getter @@ -33,10 +35,22 @@ public class InfoType { @Getter private final boolean useHttpProxy; - public InfoType(String id, String dmaapTopicUrl, boolean useHttpProxy) { + @Getter + private final String kafkaInputTopic; + + public InfoType(String id, String dmaapTopicUrl, boolean useHttpProxy, String kafkaInputTopic) { this.id = id; this.dmaapTopicUrl = dmaapTopicUrl; this.useHttpProxy = useHttpProxy; + this.kafkaInputTopic = kafkaInputTopic; + } + + public boolean isKafkaTopicDefined() { + return StringUtils.hasLength(kafkaInputTopic); + } + + public boolean isDmaapTopicDefined() { + return StringUtils.hasLength(dmaapTopicUrl); } } diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoTypes.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoTypes.java index b8677a37..558fc465 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoTypes.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoTypes.java @@ -35,7 +35,6 @@ public class InfoTypes { private Map allTypes = new HashMap<>(); public InfoTypes(Collection types) { - for (InfoType type : types) { put(type); } diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Job.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Job.java index 0da94a62..d1697e96 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Job.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Job.java @@ -20,10 +20,42 @@ package org.oran.dmaapadapter.repository; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + import lombok.Getter; +import org.immutables.gson.Gson; + public class Job { + @Gson.TypeAdapters + public static class Parameters { + public String filter; + public BufferTimeout bufferTimeout; + + public Parameters() { + } + + public Parameters(String filter, BufferTimeout bufferTimeout) { + this.filter = filter; + this.bufferTimeout = bufferTimeout; + } + + public static class BufferTimeout { + public BufferTimeout(int maxSize, int maxTimeMiliseconds) { + this.maxSize = maxSize; + this.maxTimeMiliseconds = maxTimeMiliseconds; + } + + public BufferTimeout() { + } + + public int maxSize; + public int maxTimeMiliseconds; + } + } + @Getter private final String id; @@ -36,15 +68,39 @@ public class Job { @Getter private final String owner; + @Getter + private final Parameters parameters; + @Getter private final String lastUpdated; - public Job(String id, String callbackUrl, InfoType type, String owner, String lastUpdated) { + private final Pattern jobDataFilter; + + public Job(String id, String callbackUrl, InfoType type, String owner, String lastUpdated, Parameters parameters) { this.id = id; this.callbackUrl = callbackUrl; this.type = type; this.owner = owner; this.lastUpdated = lastUpdated; + this.parameters = parameters; + if (parameters != null && parameters.filter != null) { + jobDataFilter = Pattern.compile(parameters.filter); + } else { + jobDataFilter = null; + } + } + + public boolean isFilterMatch(String data) { + if (jobDataFilter == null) { + return true; + } + Matcher matcher = jobDataFilter.matcher(data); + return matcher.find(); + } + + public boolean isBuffered() { + return parameters != null && parameters.bufferTimeout != null && parameters.bufferTimeout.maxSize > 0 + && parameters.bufferTimeout.maxTimeMiliseconds > 0; } } diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java index 6e2b3265..8a388248 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java @@ -26,8 +26,10 @@ import java.util.Map; import java.util.Vector; import org.oran.dmaapadapter.exceptions.ServiceException; +import org.oran.dmaapadapter.tasks.KafkaTopicConsumers; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component @@ -36,8 +38,11 @@ public class Jobs { private Map allJobs = new HashMap<>(); private MultiMap jobsByType = new MultiMap<>(); + private final KafkaTopicConsumers kafkaConsumers; - public Jobs() {} + public Jobs(@Autowired KafkaTopicConsumers kafkaConsumers) { + this.kafkaConsumers = kafkaConsumers; + } public synchronized Job getJob(String id) throws ServiceException { Job job = allJobs.get(id); @@ -52,9 +57,10 @@ public class Jobs { } public synchronized void put(Job job) { - logger.debug("Put service: {}", job.getId()); + logger.debug("Put job: {}", job.getId()); allJobs.put(job.getId(), job); jobsByType.put(job.getType().getId(), job.getId(), job); + kafkaConsumers.addJob(job); } public synchronized Iterable getAll() { @@ -72,6 +78,7 @@ public class Jobs { public synchronized void remove(Job job) { this.allJobs.remove(job.getId()); jobsByType.remove(job.getType().getId(), job.getId()); + kafkaConsumers.removeJob(job); } public synchronized int size() { diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java similarity index 85% rename from dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java rename to dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java index b7c4ec66..7d557585 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java @@ -39,15 +39,16 @@ import reactor.core.publisher.Mono; * consumers that has a job for this InformationType. */ -public class DmaapMessageConsumer { +public class DmaapTopicConsumer { private static final Duration TIME_BETWEEN_DMAAP_RETRIES = Duration.ofSeconds(10); - private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumer.class); - private final ApplicationConfig applicationConfig; + private static final Logger logger = LoggerFactory.getLogger(DmaapTopicConsumer.class); + private final AsyncRestClient dmaapRestClient; - private final AsyncRestClient consumerRestClient; - private final InfoType type; - private final Jobs jobs; private final InfiniteFlux infiniteSubmitter = new InfiniteFlux(); + private final AsyncRestClient consumerRestClient; + protected final ApplicationConfig applicationConfig; + protected final InfoType type; + protected final Jobs jobs; /** Submits new elements until stopped */ private static class InfiniteFlux { @@ -80,10 +81,10 @@ public class DmaapMessageConsumer { } } - public DmaapMessageConsumer(ApplicationConfig applicationConfig, InfoType type, Jobs jobs) { - this.applicationConfig = applicationConfig; + public DmaapTopicConsumer(ApplicationConfig applicationConfig, InfoType type, Jobs jobs) { AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig()); this.dmaapRestClient = restclientFactory.createRestClientNoHttpProxy(""); + this.applicationConfig = applicationConfig; this.consumerRestClient = type.isUseHttpProxy() ? restclientFactory.createRestClientUseHttpProxy("") : restclientFactory.createRestClientNoHttpProxy(""); this.type = type; @@ -93,31 +94,24 @@ public class DmaapMessageConsumer { public void start() { infiniteSubmitter.start() // .flatMap(notUsed -> getFromMessageRouter(getDmaapUrl()), 1) // - .flatMap(this::handleReceivedMessage, 5) // + .flatMap(this::pushDataToConsumers) // .subscribe(// - value -> logger.debug("DmaapMessageConsumer next: {} {}", value, type.getId()), // + null, // throwable -> logger.error("DmaapMessageConsumer error: {}", throwable.getMessage()), // - () -> logger.warn("DmaapMessageConsumer stopped {}", type.getId()) // - ); + () -> logger.warn("DmaapMessageConsumer stopped {}", type.getId())); // + } private String getDmaapUrl() { - return this.applicationConfig.getDmaapBaseUrl() + type.getDmaapTopicUrl(); } private Mono handleDmaapErrorResponse(Throwable t) { logger.debug("error from DMAAP {} {}", t.getMessage(), type.getDmaapTopicUrl()); - return Mono.delay(TIME_BETWEEN_DMAAP_RETRIES) // - .flatMap(notUsed -> Mono.empty()); - } - - private Mono handleConsumerErrorResponse(Throwable t) { - logger.warn("error from CONSUMER {}", t.getMessage()); - return Mono.empty(); + return Mono.delay(TIME_BETWEEN_DMAAP_RETRIES).flatMap(notUsed -> Mono.empty()); } - protected Mono getFromMessageRouter(String topicUrl) { + private Mono getFromMessageRouter(String topicUrl) { logger.trace("getFromMessageRouter {}", topicUrl); return dmaapRestClient.get(topicUrl) // .filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away. @@ -125,9 +119,14 @@ public class DmaapMessageConsumer { .onErrorResume(this::handleDmaapErrorResponse); // } - protected Flux handleReceivedMessage(String body) { - logger.debug("Received from DMAAP {}", body); - final int CONCURRENCY = 5; + private Mono handleConsumerErrorResponse(Throwable t) { + logger.warn("error from CONSUMER {}", t.getMessage()); + return Mono.empty(); + } + + protected Flux pushDataToConsumers(String body) { + logger.debug("Received data {}", body); + final int CONCURRENCY = 50; // Distibute the body to all jobs for this type return Flux.fromIterable(this.jobs.getJobsForType(this.type)) // @@ -135,5 +134,4 @@ public class DmaapMessageConsumer { .flatMap(job -> consumerRestClient.post(job.getCallbackUrl(), body), CONCURRENCY) // .onErrorResume(this::handleConsumerErrorResponse); } - } diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumer.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumer.java new file mode 100644 index 00000000..6079edfb --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumer.java @@ -0,0 +1,130 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.tasks; + +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.oran.dmaapadapter.clients.AsyncRestClient; +import org.oran.dmaapadapter.clients.AsyncRestClientFactory; +import org.oran.dmaapadapter.configuration.ApplicationConfig; +import org.oran.dmaapadapter.repository.InfoType; +import org.oran.dmaapadapter.repository.Job; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.Many; +import reactor.kafka.receiver.KafkaReceiver; +import reactor.kafka.receiver.ReceiverOptions; + +/** + * The class fetches incoming requests from DMAAP and sends them further to the + * consumers that has a job for this InformationType. + */ +@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally +public class KafkaTopicConsumer { + private static final Logger logger = LoggerFactory.getLogger(KafkaTopicConsumer.class); + private final AsyncRestClient consumerRestClient; + private final ApplicationConfig applicationConfig; + private final InfoType type; + private final Many consumerDistributor; + + public KafkaTopicConsumer(ApplicationConfig applicationConfig, InfoType type) { + this.applicationConfig = applicationConfig; + + final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 10; + this.consumerDistributor = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE); + + AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig()); + this.consumerRestClient = type.isUseHttpProxy() ? restclientFactory.createRestClientUseHttpProxy("") + : restclientFactory.createRestClientNoHttpProxy(""); + this.type = type; + startKafkaTopicReceiver(); + } + + private Disposable startKafkaTopicReceiver() { + return KafkaReceiver.create(kafkaInputProperties()) // + .receive() // + .flatMap(this::onReceivedData) // + .subscribe(null, // + throwable -> logger.error("KafkaMessageConsumer error: {}", throwable.getMessage()), // + () -> logger.warn("KafkaMessageConsumer stopped")); + } + + private Flux onReceivedData(ConsumerRecord input) { + logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(), input.value()); + consumerDistributor.emitNext(input.value(), Sinks.EmitFailureHandler.FAIL_FAST); + return consumerDistributor.asFlux(); + } + + public Disposable startDistributeToConsumer(Job job) { + return getMessagesFromKafka(job) // + .doOnNext(data -> logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), data)) + .flatMap(body -> consumerRestClient.post(job.getCallbackUrl(), body)) // + .onErrorResume(this::handleConsumerErrorResponse) // + .subscribe(null, // + throwable -> logger.error("KafkaMessageConsumer error: {}", throwable.getMessage()), // + () -> logger.warn("KafkaMessageConsumer stopped {}", job.getType().getId())); + } + + private Flux getMessagesFromKafka(Job job) { + if (job.isBuffered()) { + return consumerDistributor.asFlux() // + .filter(job::isFilterMatch) // + .bufferTimeout(job.getParameters().bufferTimeout.maxSize, + Duration.ofMillis(job.getParameters().bufferTimeout.maxTimeMiliseconds)) // + .flatMap(o -> Flux.just(o.toString())); + } else { + return consumerDistributor.asFlux() // + .filter(job::isFilterMatch); + } + } + + private Mono handleConsumerErrorResponse(Throwable t) { + logger.warn("error from CONSUMER {}", t.getMessage()); + return Mono.empty(); + } + + private ReceiverOptions kafkaInputProperties() { + Map consumerProps = new HashMap<>(); + if (this.applicationConfig.getKafkaBootStrapServers().isEmpty()) { + logger.error("No kafka boostrap server is setup"); + } + consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers()); + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "osc-dmaap-adaptor"); + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); + consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + + return ReceiverOptions.create(consumerProps) + .subscription(Collections.singleton(this.type.getKafkaInputTopic())); + } + +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java new file mode 100644 index 00000000..23d9da2c --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java @@ -0,0 +1,79 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.tasks; + +import java.util.HashMap; +import java.util.Map; + +import org.oran.dmaapadapter.configuration.ApplicationConfig; +import org.oran.dmaapadapter.repository.InfoType; +import org.oran.dmaapadapter.repository.InfoTypes; +import org.oran.dmaapadapter.repository.Job; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import reactor.core.Disposable; + +/** + * The class fetches incoming requests from DMAAP and sends them further to the + * consumers that has a job for this InformationType. + */ +@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally +@Component +public class KafkaTopicConsumers { + private static final Logger logger = LoggerFactory.getLogger(KafkaTopicConsumers.class); + + private final Map topicConsumers = new HashMap<>(); + private final Map activeSubscriptions = new HashMap<>(); + private final ApplicationConfig appConfig; + + public KafkaTopicConsumers(@Autowired ApplicationConfig appConfig) { + this.appConfig = appConfig; + } + + public void start(InfoTypes types) { + for (InfoType type : types.getAll()) { + if (type.isKafkaTopicDefined()) { + KafkaTopicConsumer topicConsumer = new KafkaTopicConsumer(appConfig, type); + topicConsumers.put(type.getId(), topicConsumer); + } + } + } + + public synchronized void addJob(Job job) { + if (this.activeSubscriptions.get(job.getId()) == null && job.getType().isKafkaTopicDefined()) { + logger.debug("Kafka job added {}", job.getId()); + KafkaTopicConsumer topicConsumer = topicConsumers.get(job.getType().getId()); + Disposable subscription = topicConsumer.startDistributeToConsumer(job); + activeSubscriptions.put(job.getId(), subscription); + } + } + + public synchronized void removeJob(Job job) { + Disposable d = activeSubscriptions.remove(job.getId()); + if (d != null) { + logger.debug("Kafka job removed {}", job.getId()); + d.dispose(); + } + } + +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java index 4a68ab0e..e8b236c9 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java @@ -20,14 +20,21 @@ package org.oran.dmaapadapter.tasks; +import com.google.common.io.CharStreams; import com.google.gson.JsonParser; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; + import lombok.Getter; import org.oran.dmaapadapter.clients.AsyncRestClient; import org.oran.dmaapadapter.clients.AsyncRestClientFactory; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.controllers.ProducerCallbacksController; +import org.oran.dmaapadapter.exceptions.ServiceException; import org.oran.dmaapadapter.r1.ProducerInfoTypeInfo; import org.oran.dmaapadapter.r1.ProducerRegistrationInfo; import org.oran.dmaapadapter.repository.InfoType; @@ -111,12 +118,12 @@ public class ProducerRegstrationTask { private Mono registerTypesAndProducer() { final int CONCURRENCY = 20; - final String producerUrl = - applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID; + final String producerUrl = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + + PRODUCER_ID; return Flux.fromIterable(this.types.getAll()) // .doOnNext(type -> logger.info("Registering type {}", type.getId())) // - .flatMap(type -> restClient.put(registerTypeUrl(type), gson.toJson(typeRegistrationInfo())), + .flatMap(type -> restClient.put(registerTypeUrl(type), gson.toJson(typeRegistrationInfo(type))), CONCURRENCY) // .collectList() // .doOnNext(type -> logger.info("Registering producer")) // @@ -127,18 +134,39 @@ public class ProducerRegstrationTask { return jsonObject("{}"); } - private ProducerInfoTypeInfo typeRegistrationInfo() { - return new ProducerInfoTypeInfo(jsonSchemaObject(), typeSpecifcInfoObject()); + private ProducerInfoTypeInfo typeRegistrationInfo(InfoType type) { + try { + return new ProducerInfoTypeInfo(jsonSchemaObject(type), typeSpecifcInfoObject()); + } catch (Exception e) { + logger.error("Fatal error {}", e.getMessage()); + return null; + } + } + + private Object jsonSchemaObject(InfoType type) throws IOException, ServiceException { + + if (type.isKafkaTopicDefined()) { + String schemaStrKafka = readSchemaFile("/typeSchemaKafka.json"); + return jsonObject(schemaStrKafka); + } else { + // An object with no properties + String schemaStr = "{" // + + "\"type\": \"object\"," // + + "\"properties\": {}," // + + "\"additionalProperties\": false" // + + "}"; // + + return jsonObject(schemaStr); + } } - private Object jsonSchemaObject() { - // An object with no properties - String schemaStr = "{" // - + "\"type\": \"object\"," // - + "\"properties\": {}," // - + "\"additionalProperties\": false" // - + "}"; // - return jsonObject(schemaStr); + private String readSchemaFile(String filePath) throws IOException, ServiceException { + InputStream in = getClass().getResourceAsStream(filePath); + logger.debug("Reading application schema file from: {} with: {}", filePath, in); + if (in == null) { + throw new ServiceException("Could not readfile: " + filePath); + } + return CharStreams.toString(new InputStreamReader(in, StandardCharsets.UTF_8)); } private Object jsonObject(String json) { diff --git a/dmaap-adaptor-java/src/main/resources/typeSchemaKafka.json b/dmaap-adaptor-java/src/main/resources/typeSchemaKafka.json new file mode 100644 index 00000000..0ff7c80e --- /dev/null +++ b/dmaap-adaptor-java/src/main/resources/typeSchemaKafka.json @@ -0,0 +1,26 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + "type": "object", + "properties": { + "filter": { + "type": "string" + }, + "bufferTimeout": { + "type": "object", + "properties": { + "maxSize": { + "type": "integer" + }, + "maxTimeMiliseconds": { + "type": "integer" + } + }, + "required": [ + "maxSize", + "maxTimeMiliseconds" + ] + } + }, + "required": [ + ] +} diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/EcsSimulatorController.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/EcsSimulatorController.java index 828b027d..8d1dda66 100644 --- a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/EcsSimulatorController.java +++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/EcsSimulatorController.java @@ -79,7 +79,6 @@ public class EcsSimulatorController { } else { return new ResponseEntity<>(HttpStatus.NOT_FOUND); } - } @PutMapping(path = API_ROOT + "/info-producers/{infoProducerId}", // diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithEcs.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithEcs.java index 1cceef08..376d23e5 100644 --- a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithEcs.java +++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithEcs.java @@ -52,6 +52,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit.jupiter.SpringExtension; +@SuppressWarnings("java:S3577") // Rename class @ExtendWith(SpringExtension.class) @SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT) @TestPropertySource(properties = { // diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java new file mode 100644 index 00000000..31ef970f --- /dev/null +++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java @@ -0,0 +1,257 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import com.google.gson.JsonParser; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.oran.dmaapadapter.clients.AsyncRestClient; +import org.oran.dmaapadapter.clients.AsyncRestClientFactory; +import org.oran.dmaapadapter.configuration.ApplicationConfig; +import org.oran.dmaapadapter.configuration.ImmutableHttpProxyConfig; +import org.oran.dmaapadapter.configuration.ImmutableWebClientConfig; +import org.oran.dmaapadapter.configuration.WebClientConfig; +import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig; +import org.oran.dmaapadapter.r1.ConsumerJobInfo; +import org.oran.dmaapadapter.repository.InfoType; +import org.oran.dmaapadapter.repository.InfoTypes; +import org.oran.dmaapadapter.repository.Job; +import org.oran.dmaapadapter.repository.Jobs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory; +import org.springframework.boot.web.server.LocalServerPort; +import org.springframework.boot.web.servlet.server.ServletWebServerFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +import reactor.core.publisher.Flux; +import reactor.kafka.sender.KafkaSender; +import reactor.kafka.sender.SenderOptions; +import reactor.kafka.sender.SenderRecord; + +@SuppressWarnings("java:S3577") // Rename class +@ExtendWith(SpringExtension.class) +@SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT) +@TestPropertySource(properties = { // + "server.ssl.key-store=./config/keystore.jks", // + "app.webclient.trust-store=./config/truststore.jks", // + "app.configuration-filepath=./src/test/resources/test_application_configuration_kafka.json"// +}) +class IntegrationWithKafka { + + @Autowired + private ApplicationConfig applicationConfig; + + @Autowired + private Jobs jobs; + + @Autowired + private InfoTypes types; + + @Autowired + private ConsumerController consumerController; + + @Autowired + private EcsSimulatorController ecsSimulatorController; + + private com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create(); + + private static final Logger logger = LoggerFactory.getLogger(IntegrationWithKafka.class); + + @LocalServerPort + int localServerHttpPort; + + static class TestApplicationConfig extends ApplicationConfig { + @Override + public String getEcsBaseUrl() { + return thisProcessUrl(); + } + + @Override + public String getDmaapBaseUrl() { + return thisProcessUrl(); + } + + @Override + public String getSelfUrl() { + return thisProcessUrl(); + } + + private String thisProcessUrl() { + final String url = "https://localhost:" + getLocalServerHttpPort(); + return url; + } + } + + /** + * Overrides the BeanFactory. + */ + @TestConfiguration + static class TestBeanFactory extends BeanFactory { + + @Override + @Bean + public ServletWebServerFactory servletContainer() { + return new TomcatServletWebServerFactory(); + } + + @Override + @Bean + public ApplicationConfig getApplicationConfig() { + TestApplicationConfig cfg = new TestApplicationConfig(); + return cfg; + } + } + + @AfterEach + void reset() { + this.consumerController.testResults.reset(); + this.ecsSimulatorController.testResults.reset(); + this.jobs.clear(); + } + + private AsyncRestClient restClient(boolean useTrustValidation) { + WebClientConfig config = this.applicationConfig.getWebClientConfig(); + HttpProxyConfig httpProxyConfig = ImmutableHttpProxyConfig.builder() // + .httpProxyHost("") // + .httpProxyPort(0) // + .build(); + config = ImmutableWebClientConfig.builder() // + .keyStoreType(config.keyStoreType()) // + .keyStorePassword(config.keyStorePassword()) // + .keyStore(config.keyStore()) // + .keyPassword(config.keyPassword()) // + .isTrustStoreUsed(useTrustValidation) // + .trustStore(config.trustStore()) // + .trustStorePassword(config.trustStorePassword()) // + .httpProxyConfig(httpProxyConfig).build(); + + AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config); + return restClientFactory.createRestClientNoHttpProxy(baseUrl()); + } + + private AsyncRestClient restClient() { + return restClient(false); + } + + private String baseUrl() { + return "https://localhost:" + this.applicationConfig.getLocalServerHttpPort(); + } + + private Object jobParametersAsJsonObject(String filter, int maxTimeMiliseconds, int maxSize) { + Job.Parameters param = new Job.Parameters(filter, + new Job.Parameters.BufferTimeout(maxSize, maxTimeMiliseconds)); + String str = gson.toJson(param); + return jsonObject(str); + } + + private Object jsonObject(String json) { + try { + return JsonParser.parseString(json).getAsJsonObject(); + } catch (Exception e) { + throw new NullPointerException(e.toString()); + } + } + + private ConsumerJobInfo consumerJobInfo(String filter, int maxTimeMiliseconds, int maxSize) { + try { + InfoType type = this.types.getAll().iterator().next(); + String typeId = type.getId(); + String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL; + return new ConsumerJobInfo(typeId, jobParametersAsJsonObject(filter, maxTimeMiliseconds, maxSize), "owner", + targetUri, ""); + } catch (Exception e) { + return null; + } + } + + private SenderOptions senderOptions() { + String bootstrapServers = this.applicationConfig.getKafkaBootStrapServers(); + + Map props = new HashMap<>(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producer"); + props.put(ProducerConfig.ACKS_CONFIG, "all"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + return SenderOptions.create(props); + } + + private SenderRecord senderRecord(String data, int i) { + final InfoType infoType = this.types.getAll().iterator().next(); + return SenderRecord.create(new ProducerRecord<>(infoType.getKafkaInputTopic(), i, data + i), i); + } + + @Test + void kafkaIntegrationTest() throws InterruptedException { + final String JOB_ID1 = "ID1"; + final String JOB_ID2 = "ID2"; + + // Register producer, Register types + await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull()); + assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1); + + // Create a job + this.ecsSimulatorController.addJob(consumerJobInfo(".*", 10, 1000), JOB_ID1, restClient()); + this.ecsSimulatorController.addJob(consumerJobInfo(".*Message_1.*", 0, 0), JOB_ID2, restClient()); + await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2)); + + final KafkaSender sender = KafkaSender.create(senderOptions()); + + var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_", i)); // Message_1, Message_2 etc. + + sender.send(dataToSend) // + .doOnError(e -> logger.error("Send failed", e)) // + .doOnNext(senderResult -> logger.debug("Sent {}", senderResult)) // + .doOnError(t -> logger.error("Error {}", t)) // + .blockLast(); + + ConsumerController.TestResults consumer = this.consumerController.testResults; + await().untilAsserted(() -> assertThat(consumer.receivedBodies.size()).isEqualTo(2)); + assertThat(consumer.receivedBodies.get(0)).isEqualTo("Message_1"); + assertThat(consumer.receivedBodies.get(1)).isEqualTo("[Message_1, Message_2, Message_3]"); + + // Delete the job + this.ecsSimulatorController.deleteJob(JOB_ID1, restClient()); + this.ecsSimulatorController.deleteJob(JOB_ID2, restClient()); + + await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); + } + +} diff --git a/dmaap-adaptor-java/src/test/resources/test_application_configuration.json b/dmaap-adaptor-java/src/test/resources/test_application_configuration.json index 8d211b89..794eb8ec 100644 --- a/dmaap-adaptor-java/src/test/resources/test_application_configuration.json +++ b/dmaap-adaptor-java/src/test/resources/test_application_configuration.json @@ -3,7 +3,7 @@ { "id": "ExampleInformationType", "dmaapTopicUrl": "/dmaap-topic-1", - "useHttpProxy": true + "useHttpProxy": false } ] } \ No newline at end of file diff --git a/dmaap-adaptor-java/src/test/resources/test_application_configuration_kafka.json b/dmaap-adaptor-java/src/test/resources/test_application_configuration_kafka.json new file mode 100644 index 00000000..e2ea5256 --- /dev/null +++ b/dmaap-adaptor-java/src/test/resources/test_application_configuration_kafka.json @@ -0,0 +1,9 @@ +{ + "types": [ + { + "id": "ExampleInformationType", + "kafkaInputTopic": "TutorialTopic", + "useHttpProxy": false + } + ] +} \ No newline at end of file -- 2.16.6