From: PatrikBuhr Date: Mon, 20 Jun 2022 11:16:38 +0000 (+0200) Subject: dmaapadapter, deliver data over Kafka X-Git-Tag: 1.1.0^0 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=4bfffa671f94f2f3591f78918de408ec363d6616;p=nonrtric%2Fplt%2Fdmaapadapter.git dmaapadapter, deliver data over Kafka Code and documentation updated. Change-Id: I04d4ef56d8a4953e32e0a1ac71b19449bc4aeb68 Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-768 --- diff --git a/docs/Architecture.png b/docs/Architecture.png index 8f4c3ec..4b952b3 100644 Binary files a/docs/Architecture.png and b/docs/Architecture.png differ diff --git a/docs/Architecture.pptx b/docs/Architecture.pptx deleted file mode 100644 index 8c8d76f..0000000 Binary files a/docs/Architecture.pptx and /dev/null differ diff --git a/docs/DataDelivery.png b/docs/DataDelivery.png new file mode 100644 index 0000000..9207e81 Binary files /dev/null and b/docs/DataDelivery.png differ diff --git a/docs/Pictures.pptx b/docs/Pictures.pptx new file mode 100644 index 0000000..e2cf47e Binary files /dev/null and b/docs/Pictures.pptx differ diff --git a/docs/overview.rst b/docs/overview.rst index db78630..d7084a9 100644 --- a/docs/overview.rst +++ b/docs/overview.rst @@ -10,8 +10,8 @@ DMaaP Adapter Introduction ************ -This is a generic information producer using the Information Coordination Service (ICS) Data Producer API. It can get information from DMaaP (ONAP) or directly from Kafka topics and deliver the -information to data consumers using REST calls (POST). +This is a generic information producer using the Information Coordination Service (ICS) Data Producer API. It can get information from DMaaP (ONAP) or directly from Kafka topics. +The information can be filtered, transformed, aggregated and then delivered to data consumers using REST calls (POST) or via Kafka. The DMaaP Adapter registers itself as an information producer along with its information types in Information Coordination Service (ICS). The information types are defined in a configuration file. @@ -26,6 +26,28 @@ The service is implemented in Java Spring Boot (DMaaP Adapter Service). .. image:: ./Architecture.png :width: 500pt +************* +Data Delivery +************* +When a data consumer creates a an Information Job, either a URL for REST callbacks, or a Kafka Topic can be given as output for the job. +After filtering, aggregation and data transformation the data will be delivered to the output. Several data consumers can receive data from one Kafka Topic. + +.. image:: ./DataDelivery.png + :width: 500pt + +The output will be the same regardless if the information is received from DMaaP of from Kafka. If the data is not buffered/aggregated, +and the output is a Kafka Stream, both the keys and the values are forwarded (after filtering/transformation). +If the output is HTTP,only the the values are forwarded (in the HTTP body). + +**************** +Data Aggregation +**************** +When an Information Job is created, a bufferTimeout can be defined for aggregation of information. +If this feature is used, the subscribed data will be buffered and will be delivered in chunks. + +The data will then be wrapped in a JSON array in a manner similar to DMaaP. The type configuration can define if the received data is Json. +If not, each object is quoted (the output will then be an array of strings). If the data type is Json, the output will be an array of Json objects. + ****************** Configuration File ****************** @@ -92,6 +114,8 @@ typeSchema.json =============== This schema will by default be registerred for the type. The following properties are defined: +* kafkaOutputTopic, optional parameter which enables the information job to output data to a Kafka topic instead of a direct call to a data consumer. The output of a job can be directed to HTTP or to Kafka regardless if the input is retrieved from DMaaP or from Kafka. + * filterType, selects the type of filtering that will be done. This can be one of: "regexp", "json-path", "jslt". * regexp is for standard regexp matching of text. Objects that contains a match of the expression will be pushed to the consumer. @@ -184,4 +208,4 @@ Below follows an example on a PM filter. "ManagedElement=RNC-Gbg-1" ] } - } \ No newline at end of file + } diff --git a/src/main/java/org/oran/dmaapadapter/repository/Job.java b/src/main/java/org/oran/dmaapadapter/repository/Job.java index 776a7ce..90827da 100644 --- a/src/main/java/org/oran/dmaapadapter/repository/Job.java +++ b/src/main/java/org/oran/dmaapadapter/repository/Job.java @@ -58,13 +58,18 @@ public class Job { private Integer maxConcurrency; + @Getter + private String kafkaOutputTopic; + public Parameters() {} - public Parameters(Object filter, String filterType, BufferTimeout bufferTimeout, Integer maxConcurrency) { + public Parameters(Object filter, String filterType, BufferTimeout bufferTimeout, Integer maxConcurrency, + String kafkaOutputTopic) { this.filter = filter; this.bufferTimeout = bufferTimeout; this.maxConcurrency = maxConcurrency; this.filterType = filterType; + this.kafkaOutputTopic = kafkaOutputTopic; } public int getMaxConcurrency() { @@ -153,7 +158,6 @@ public class Job { this.parameters = parameters; filter = createFilter(parameters); this.consumerRestClient = consumerRestClient; - } private static Filter createFilter(Parameters parameters) { diff --git a/src/main/java/org/oran/dmaapadapter/repository/Jobs.java b/src/main/java/org/oran/dmaapadapter/repository/Jobs.java index 3479720..825673a 100644 --- a/src/main/java/org/oran/dmaapadapter/repository/Jobs.java +++ b/src/main/java/org/oran/dmaapadapter/repository/Jobs.java @@ -20,6 +20,8 @@ package org.oran.dmaapadapter.repository; +import com.google.common.base.Strings; + import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -71,7 +73,11 @@ public class Jobs { } public void addJob(String id, String callbackUrl, InfoType type, String owner, String lastUpdated, - Parameters parameters) { + Parameters parameters) throws ServiceException { + + if (!Strings.isNullOrEmpty(parameters.getKafkaOutputTopic()) && !Strings.isNullOrEmpty(callbackUrl)) { + throw new ServiceException("Cannot deliver to both Kafka and HTTP in the same job", HttpStatus.BAD_REQUEST); + } AsyncRestClient consumerRestClient = type.isUseHttpProxy() // ? restclientFactory.createRestClientUseHttpProxy(callbackUrl) // : restclientFactory.createRestClientNoHttpProxy(callbackUrl); diff --git a/src/main/java/org/oran/dmaapadapter/tasks/JobDataConsumer.java b/src/main/java/org/oran/dmaapadapter/tasks/DataConsumer.java similarity index 78% rename from src/main/java/org/oran/dmaapadapter/tasks/JobDataConsumer.java rename to src/main/java/org/oran/dmaapadapter/tasks/DataConsumer.java index 3a13c76..a412370 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/JobDataConsumer.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/DataConsumer.java @@ -25,7 +25,6 @@ import lombok.Getter; import org.oran.dmaapadapter.repository.Job; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.http.MediaType; import org.springframework.web.reactive.function.client.WebClientResponseException; import reactor.core.Disposable; @@ -37,8 +36,8 @@ import reactor.core.publisher.Mono; * owner via REST calls. */ @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally -public class JobDataConsumer { - private static final Logger logger = LoggerFactory.getLogger(JobDataConsumer.class); +public abstract class DataConsumer { + private static final Logger logger = LoggerFactory.getLogger(DataConsumer.class); @Getter private final Job job; private Disposable subscription; @@ -70,32 +69,27 @@ public class JobDataConsumer { } } - public JobDataConsumer(Job job) { + protected DataConsumer(Job job) { this.job = job; } - public synchronized void start(Flux input) { + public synchronized void start(Flux input) { stop(); this.errorStats.resetIrrecoverableErrors(); this.subscription = handleReceivedMessage(input, job) // - .flatMap(this::postToClient, job.getParameters().getMaxConcurrency()) // + .flatMap(this::sendToClient, job.getParameters().getMaxConcurrency()) // .onErrorResume(this::handleError) // .subscribe(this::handleConsumerSentOk, // this::handleExceptionInStream, // - () -> logger.warn("JobDataConsumer stopped jobId: {}", job.getId())); + () -> logger.warn("HttpDataConsumer stopped jobId: {}", job.getId())); } private void handleExceptionInStream(Throwable t) { - logger.warn("JobDataConsumer exception: {}, jobId: {}", t.getMessage(), job.getId()); + logger.warn("HttpDataConsumer exception: {}, jobId: {}", t.getMessage(), job.getId()); stop(); } - private Mono postToClient(String body) { - logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), body); - MediaType contentType = - this.job.isBuffered() || this.job.getType().isJson() ? MediaType.APPLICATION_JSON : null; - return job.getConsumerRestClient().post("", body, contentType); - } + protected abstract Mono sendToClient(TopicListener.Output output); public synchronized void stop() { if (this.subscription != null) { @@ -108,16 +102,17 @@ public class JobDataConsumer { return this.subscription != null; } - private Flux handleReceivedMessage(Flux input, Job job) { - Flux result = input.map(job::filter) // - .filter(t -> !t.isEmpty()); // + private Flux handleReceivedMessage(Flux inputFlux, Job job) { + Flux result = + inputFlux.map(input -> new TopicListener.Output(input.key, job.filter(input.value))) // + .filter(t -> !t.value.isEmpty()); // if (job.isBuffered()) { - result = result.map(str -> quoteNonJson(str, job)) // + result = result.map(input -> quoteNonJson(input.value, job)) // .bufferTimeout( // job.getParameters().getBufferTimeout().getMaxSize(), // job.getParameters().getBufferTimeout().getMaxTime()) // - .map(Object::toString); + .map(buffered -> new TopicListener.Output("", buffered.toString())); } return result; } diff --git a/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java index dbb6059..3aa97fe 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java @@ -48,7 +48,7 @@ public class DmaapTopicListener implements TopicListener { private final ApplicationConfig applicationConfig; private final InfoType type; private final com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create(); - private Many output; + private Many output; private Disposable topicReceiverTask; public DmaapTopicListener(ApplicationConfig applicationConfig, InfoType type, SecurityContext securityContext) { @@ -60,7 +60,7 @@ public class DmaapTopicListener implements TopicListener { } @Override - public Many getOutput() { + public Many getOutput() { return this.output; } @@ -95,7 +95,7 @@ public class DmaapTopicListener implements TopicListener { private void onReceivedData(String input) { logger.debug("Received from DMAAP topic: {} :{}", this.type.getDmaapTopicUrl(), input); - output.emitNext(input, Sinks.EmitFailureHandler.FAIL_FAST); + output.emitNext(new Output("", input), Sinks.EmitFailureHandler.FAIL_FAST); } private String getDmaapUrl() { diff --git a/src/main/java/org/oran/dmaapadapter/tasks/HttpDataConsumer.java b/src/main/java/org/oran/dmaapadapter/tasks/HttpDataConsumer.java new file mode 100644 index 0000000..87a6b67 --- /dev/null +++ b/src/main/java/org/oran/dmaapadapter/tasks/HttpDataConsumer.java @@ -0,0 +1,49 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2022 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.tasks; + +import org.oran.dmaapadapter.repository.Job; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.MediaType; +import reactor.core.publisher.Mono; + +/** + * The class streams data from a multi cast sink and sends the data to the Job + * owner via REST calls. + */ +@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally +public class HttpDataConsumer extends DataConsumer { + private static final Logger logger = LoggerFactory.getLogger(HttpDataConsumer.class); + + public HttpDataConsumer(Job job) { + super(job); + } + + @Override + protected Mono sendToClient(TopicListener.Output output) { + Job job = this.getJob(); + logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), output); + MediaType contentType = job.isBuffered() || job.getType().isJson() ? MediaType.APPLICATION_JSON : null; + return job.getConsumerRestClient().post("", output.value, contentType); + } + +} diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaDataConsumer.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaDataConsumer.java new file mode 100644 index 0000000..2b0b7a4 --- /dev/null +++ b/src/main/java/org/oran/dmaapadapter/tasks/KafkaDataConsumer.java @@ -0,0 +1,103 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2022 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.tasks; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.oran.dmaapadapter.configuration.ApplicationConfig; +import org.oran.dmaapadapter.repository.Job; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.kafka.sender.KafkaSender; +import reactor.kafka.sender.SenderOptions; +import reactor.kafka.sender.SenderRecord; + +/** + * The class streams data from a multi cast sink and sends the data to the Job + * owner via REST calls. + */ +@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally +public class KafkaDataConsumer extends DataConsumer { + private static final Logger logger = LoggerFactory.getLogger(KafkaDataConsumer.class); + + private KafkaSender sender; + private final ApplicationConfig appConfig; + + public KafkaDataConsumer(Job job, ApplicationConfig appConfig) { + super(job); + this.appConfig = appConfig; + } + + @Override + protected Mono sendToClient(TopicListener.Output data) { + Job job = this.getJob(); + + logger.debug("Sending data '{}' to Kafka topic: {}", data, this.getJob().getParameters().getKafkaOutputTopic()); + + SenderRecord senderRecord = senderRecord(data, job); + + return this.sender.send(Mono.just(senderRecord)) // + .collectList() // + .map(x -> data.value); + } + + @Override + public synchronized void start(Flux input) { + super.start(input); + SenderOptions senderOptions = senderOptions(appConfig); + this.sender = KafkaSender.create(senderOptions); + } + + @Override + public synchronized void stop() { + super.stop(); + if (sender != null) { + sender.close(); + sender = null; + } + } + + private static SenderOptions senderOptions(ApplicationConfig config) { + String bootstrapServers = config.getKafkaBootStrapServers(); + + Map props = new HashMap<>(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producerx"); + props.put(ProducerConfig.ACKS_CONFIG, "all"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + return SenderOptions.create(props); + } + + private SenderRecord senderRecord(TopicListener.Output output, Job infoJob) { + int correlationMetadata = 2; + String topic = infoJob.getParameters().getKafkaOutputTopic(); + return SenderRecord.create(new ProducerRecord<>(topic, output.key, output.value), correlationMetadata); + } + +} diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java index 4b58013..4a7f269 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java @@ -47,7 +47,7 @@ public class KafkaTopicListener implements TopicListener { private static final Logger logger = LoggerFactory.getLogger(KafkaTopicListener.class); private final ApplicationConfig applicationConfig; private final InfoType type; - private Many output; + private Many output; private Disposable topicReceiverTask; public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type) { @@ -56,7 +56,7 @@ public class KafkaTopicListener implements TopicListener { } @Override - public Many getOutput() { + public Many getOutput() { return this.output; } @@ -84,7 +84,7 @@ public class KafkaTopicListener implements TopicListener { private void onReceivedData(ConsumerRecord input) { logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(), input.value()); - output.emitNext(input.value(), Sinks.EmitFailureHandler.FAIL_FAST); + output.emitNext(new Output(input.key(), input.value()), Sinks.EmitFailureHandler.FAIL_FAST); } private void onReceivedError(Throwable t) { diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java index 503f113..e32cfa5 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java @@ -20,12 +20,25 @@ package org.oran.dmaapadapter.tasks; +import lombok.ToString; import reactor.core.publisher.Sinks.Many; public interface TopicListener { + + @ToString + public static class Output { + public final String key; + public final String value; + + public Output(String key, String value) { + this.key = key; + this.value = value; + } + } + public void start(); public void stop(); - public Many getOutput(); + public Many getOutput(); } diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java index 685379c..df70b9f 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java @@ -25,6 +25,7 @@ import java.util.Map; import lombok.Getter; +import org.apache.logging.log4j.util.Strings; import org.oran.dmaapadapter.clients.SecurityContext; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.repository.InfoType; @@ -49,13 +50,15 @@ public class TopicListeners { private final Map dmaapTopicListeners = new HashMap<>(); // Key is typeId @Getter - private final MultiMap kafkaConsumers = new MultiMap<>(); // Key is typeId, jobId - private final MultiMap dmaapConsumers = new MultiMap<>(); // Key is typeId, jobId + private final MultiMap dataConsumers = new MultiMap<>(); // Key is typeId, jobId + + private final ApplicationConfig appConfig; private static final int CONSUMER_SUPERVISION_INTERVAL_MS = 1000 * 60 * 3; public TopicListeners(@Autowired ApplicationConfig appConfig, @Autowired InfoTypes types, @Autowired Jobs jobs, @Autowired SecurityContext securityContext) { + this.appConfig = appConfig; for (InfoType type : types.getAll()) { if (type.isKafkaTopicDefined()) { @@ -85,32 +88,35 @@ public class TopicListeners { removeJob(job); logger.debug("Job added {}", job.getId()); if (job.getType().isKafkaTopicDefined()) { - addJob(job, kafkaConsumers, kafkaTopicListeners); + addConsumer(job, dataConsumers, kafkaTopicListeners); } if (job.getType().isDmaapTopicDefined()) { - addJob(job, dmaapConsumers, dmaapTopicListeners); + addConsumer(job, dataConsumers, dmaapTopicListeners); } } - private static void addJob(Job job, MultiMap consumers, - Map topicListeners) { + private DataConsumer createConsumer(Job job) { + return !Strings.isEmpty(job.getParameters().getKafkaOutputTopic()) ? new KafkaDataConsumer(job, appConfig) + : new HttpDataConsumer(job); + } + + private void addConsumer(Job job, MultiMap consumers, Map topicListeners) { TopicListener topicListener = topicListeners.get(job.getType().getId()); if (consumers.get(job.getType().getId()).isEmpty()) { topicListener.start(); } - JobDataConsumer subscription = new JobDataConsumer(job); - subscription.start(topicListener.getOutput().asFlux()); - consumers.put(job.getType().getId(), job.getId(), subscription); + DataConsumer consumer = createConsumer(job); + consumer.start(topicListener.getOutput().asFlux()); + consumers.put(job.getType().getId(), job.getId(), consumer); } public synchronized void removeJob(Job job) { - removeJob(job, kafkaConsumers); - removeJob(job, dmaapConsumers); + removeJob(job, dataConsumers); } - private static void removeJob(Job job, MultiMap consumers) { - JobDataConsumer consumer = consumers.remove(job.getType().getId(), job.getId()); + private static void removeJob(Job job, MultiMap consumers) { + DataConsumer consumer = consumers.remove(job.getType().getId(), job.getId()); if (consumer != null) { logger.debug("Job removed {}", job.getId()); consumer.stop(); @@ -119,25 +125,23 @@ public class TopicListeners { @Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS) public synchronized void restartNonRunningKafkaTopics() { - for (String typeId : this.kafkaConsumers.keySet()) { - for (JobDataConsumer consumer : this.kafkaConsumers.get(typeId)) { - if (!consumer.isRunning()) { - restartTopicAndConsumers(this.kafkaTopicListeners, this.kafkaConsumers, consumer); - } + for (DataConsumer consumer : this.dataConsumers.values()) { + if (!consumer.isRunning()) { + restartTopicAndConsumers(this.kafkaTopicListeners, this.dataConsumers, consumer); } } + } private static void restartTopicAndConsumers(Map topicListeners, - MultiMap consumers, JobDataConsumer consumer) { + MultiMap consumers, DataConsumer consumer) { InfoType type = consumer.getJob().getType(); TopicListener topic = topicListeners.get(type.getId()); topic.start(); restartConsumersOfType(consumers, topic, type); } - private static void restartConsumersOfType(MultiMap consumers, TopicListener topic, - InfoType type) { + private static void restartConsumersOfType(MultiMap consumers, TopicListener topic, InfoType type) { consumers.get(type.getId()).forEach(consumer -> consumer.start(topic.getOutput().asFlux())); } } diff --git a/src/main/resources/typeSchema.json b/src/main/resources/typeSchema.json index 99b8647..be2829f 100644 --- a/src/main/resources/typeSchema.json +++ b/src/main/resources/typeSchema.json @@ -17,6 +17,9 @@ "type": "integer", "minimum": 1 }, + "kafkaOutputTopic" : { + "type": "string" + }, "bufferTimeout": { "type": "object", "properties": { @@ -38,4 +41,4 @@ } }, "additionalProperties": false -} \ No newline at end of file +} diff --git a/src/main/resources/typeSchemaPmData.json b/src/main/resources/typeSchemaPmData.json index 84d21f8..10c7662 100644 --- a/src/main/resources/typeSchemaPmData.json +++ b/src/main/resources/typeSchemaPmData.json @@ -61,6 +61,9 @@ "type": "integer", "minimum": 1 }, + "kafkaOutputTopic" : { + "type": "string" + }, "bufferTimeout": { "type": "object", "additionalProperties": false, @@ -81,4 +84,4 @@ ] } } -} \ No newline at end of file +} diff --git a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java index 6c6ceda..c4b5ece 100644 --- a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java +++ b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java @@ -40,7 +40,6 @@ import org.json.JSONObject; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; import org.oran.dmaapadapter.clients.AsyncRestClient; import org.oran.dmaapadapter.clients.AsyncRestClientFactory; import org.oran.dmaapadapter.clients.SecurityContext; @@ -55,8 +54,9 @@ import org.oran.dmaapadapter.repository.Job; import org.oran.dmaapadapter.repository.Jobs; import org.oran.dmaapadapter.repository.filters.PmReport; import org.oran.dmaapadapter.repository.filters.PmReportFilter; -import org.oran.dmaapadapter.tasks.JobDataConsumer; +import org.oran.dmaapadapter.tasks.DataConsumer; import org.oran.dmaapadapter.tasks.ProducerRegstrationTask; +import org.oran.dmaapadapter.tasks.TopicListener; import org.oran.dmaapadapter.tasks.TopicListeners; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @@ -70,14 +70,12 @@ import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.test.context.TestPropertySource; -import org.springframework.test.context.junit.jupiter.SpringExtension; import org.springframework.web.reactive.function.client.WebClientResponseException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; -@ExtendWith(SpringExtension.class) @SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT) @TestPropertySource(properties = { // "server.ssl.key-store=./config/keystore.jks", // @@ -283,18 +281,18 @@ class ApplicationTest { waitForRegistration(); // Create a job - Job.Parameters param = new Job.Parameters(null, null, new Job.BufferTimeout(123, 456), 1); + Job.Parameters param = new Job.Parameters(null, null, new Job.BufferTimeout(123, 456), 1, null); String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL; ConsumerJobInfo kafkaJobInfo = new ConsumerJobInfo(TYPE_ID, toJson(gson.toJson(param)), "owner", targetUri, ""); this.icsSimulatorController.addJob(kafkaJobInfo, JOB_ID, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); - JobDataConsumer kafkaConsumer = this.topicListeners.getKafkaConsumers().get(TYPE_ID, JOB_ID); + DataConsumer kafkaConsumer = this.topicListeners.getDataConsumers().get(TYPE_ID, JOB_ID); // Handle received data from Kafka, check that it has been posted to the // consumer - kafkaConsumer.start(Flux.just("data")); + kafkaConsumer.start(Flux.just(new TopicListener.Output("key", "data"))); ConsumerController.TestResults consumer = this.consumerController.testResults; await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1)); @@ -318,7 +316,7 @@ class ApplicationTest { waitForRegistration(); // Create a job - Job.Parameters param = new Job.Parameters(null, null, new Job.BufferTimeout(123, 456), 1); + Job.Parameters param = new Job.Parameters(null, null, new Job.BufferTimeout(123, 456), 1, null); ConsumerJobInfo jobInfo = consumerJobInfo("DmaapInformationType", JOB_ID, toJson(gson.toJson(param))); this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); @@ -348,7 +346,7 @@ class ApplicationTest { waitForRegistration(); // Create a job - Job.Parameters param = new Job.Parameters(null, null, null, 1); + Job.Parameters param = new Job.Parameters(null, null, null, 1, null); ConsumerJobInfo jobInfo = consumerJobInfo("DmaapInformationType", JOB_ID, toJson(gson.toJson(param))); this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); @@ -386,7 +384,7 @@ class ApplicationTest { filterData.getSourceNames().add("O-DU-1122"); filterData.getMeasuredEntityDns().add("ManagedElement=RNC-Gbg-1"); Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, - new Job.BufferTimeout(123, 456), null); + new Job.BufferTimeout(123, 456), null, null); String paramJson = gson.toJson(param); ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationType", "EI_PM_JOB_ID", toJson(paramJson)); @@ -420,7 +418,7 @@ class ApplicationTest { // Create a job with a PM filter String expresssion = "if(.event.commonEventHeader.sourceName == \"O-DU-1122\")" // + "."; - Job.Parameters param = new Job.Parameters(expresssion, Job.Parameters.JSLT_FILTER_TYPE, null, null); + Job.Parameters param = new Job.Parameters(expresssion, Job.Parameters.JSLT_FILTER_TYPE, null, null, null); String paramJson = gson.toJson(param); ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationType", JOB_ID, toJson(paramJson)); @@ -517,7 +515,7 @@ class ApplicationTest { // Create a job with a PM filter PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); filterData.getMeasTypes().add("succImmediateAssignProcs"); - Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, null, null); + Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, null, null, null); String paramJson = gson.toJson(param); ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationTypeKafka", "EI_PM_JOB_ID", toJson(paramJson)); diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java index 603cea7..287c20b 100644 --- a/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java +++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java @@ -234,7 +234,7 @@ class IntegrationWithIcs { final String TYPE_ID = "KafkaInformationType"; Job.Parameters param = - new Job.Parameters("filter", Job.Parameters.REGEXP_TYPE, new Job.BufferTimeout(123, 456), 1); + new Job.Parameters("filter", Job.Parameters.REGEXP_TYPE, new Job.BufferTimeout(123, 456), 1, null); ConsumerJobInfo jobInfo = new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", consumerUri(), ""); @@ -253,8 +253,8 @@ class IntegrationWithIcs { await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue()); final String TYPE_ID = "KafkaInformationType"; - Job.Parameters param = - new Job.Parameters("filter", Job.Parameters.REGEXP_TYPE, new Job.BufferTimeout(123, 170 * 1000), 1); + Job.Parameters param = new Job.Parameters("filter", Job.Parameters.REGEXP_TYPE, + new Job.BufferTimeout(123, 170 * 1000), 1, null); ConsumerJobInfo jobInfo = new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", consumerUri(), ""); diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java index 7bbf26c..330eb6b 100644 --- a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java +++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java @@ -27,28 +27,30 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import com.google.gson.JsonParser; import java.time.Duration; +import java.time.Instant; import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; import org.oran.dmaapadapter.clients.AsyncRestClient; import org.oran.dmaapadapter.clients.AsyncRestClientFactory; import org.oran.dmaapadapter.clients.SecurityContext; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.configuration.WebClientConfig; import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig; +import org.oran.dmaapadapter.exceptions.ServiceException; import org.oran.dmaapadapter.r1.ConsumerJobInfo; import org.oran.dmaapadapter.repository.InfoType; import org.oran.dmaapadapter.repository.InfoTypes; import org.oran.dmaapadapter.repository.Job; import org.oran.dmaapadapter.repository.Jobs; -import org.oran.dmaapadapter.tasks.JobDataConsumer; +import org.oran.dmaapadapter.tasks.DataConsumer; +import org.oran.dmaapadapter.tasks.KafkaTopicListener; +import org.oran.dmaapadapter.tasks.TopicListener; import org.oran.dmaapadapter.tasks.TopicListeners; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,15 +63,14 @@ import org.springframework.boot.web.server.LocalServerPort; import org.springframework.boot.web.servlet.server.ServletWebServerFactory; import org.springframework.context.annotation.Bean; import org.springframework.test.context.TestPropertySource; -import org.springframework.test.context.junit.jupiter.SpringExtension; +import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.kafka.sender.KafkaSender; import reactor.kafka.sender.SenderOptions; import reactor.kafka.sender.SenderRecord; @SuppressWarnings("java:S3577") // Rename class -@ExtendWith(SpringExtension.class) @SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT) @TestPropertySource(properties = { // "server.ssl.key-store=./config/keystore.jks", // @@ -103,7 +104,7 @@ class IntegrationWithKafka { private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create(); - private static final Logger logger = LoggerFactory.getLogger(IntegrationWithKafka.class); + private final Logger logger = LoggerFactory.getLogger(IntegrationWithKafka.class); @LocalServerPort int localServerHttpPort; @@ -187,8 +188,8 @@ class IntegrationWithKafka { private static Object jobParametersAsJsonObject(String filter, long maxTimeMiliseconds, int maxSize, int maxConcurrency) { - Job.Parameters param = new Job.Parameters(filter, Job.Parameters.REGEXP_TYPE, - new Job.BufferTimeout(maxSize, maxTimeMiliseconds), maxConcurrency); + Job.BufferTimeout buffer = maxSize > 0 ? new Job.BufferTimeout(maxSize, maxTimeMiliseconds) : null; + Job.Parameters param = new Job.Parameters(filter, Job.Parameters.REGEXP_TYPE, buffer, maxConcurrency, null); String str = gson.toJson(param); return jsonObject(str); } @@ -212,27 +213,42 @@ class IntegrationWithKafka { } } - private SenderOptions senderOptions() { + ConsumerJobInfo consumerJobInfoKafka(String topic) { + try { + Job.Parameters param = new Job.Parameters(null, null, null, 1, topic); + String str = gson.toJson(param); + Object parametersObj = jsonObject(str); + + return new ConsumerJobInfo(TYPE_ID, parametersObj, "owner", null, ""); + } catch (Exception e) { + return null; + } + } + + private SenderOptions senderOptions() { String bootstrapServers = this.applicationConfig.getKafkaBootStrapServers(); Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producerx"); props.put(ProducerConfig.ACKS_CONFIG, "all"); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return SenderOptions.create(props); } - private SenderRecord senderRecord(String data) { + private SenderRecord senderRecord(String data) { + return senderRecord(data, ""); + } + + private SenderRecord senderRecord(String data, String key) { final InfoType infoType = this.types.get(TYPE_ID); - int key = 1; int correlationMetadata = 2; return SenderRecord.create(new ProducerRecord<>(infoType.getKafkaInputTopic(), key, data), correlationMetadata); } - private void sendDataToStream(Flux> dataToSend) { - final KafkaSender sender = KafkaSender.create(senderOptions()); + private void sendDataToStream(Flux> dataToSend) { + final KafkaSender sender = KafkaSender.create(senderOptions()); sender.send(dataToSend) // .doOnError(e -> logger.error("Send failed", e)) // @@ -266,7 +282,7 @@ class IntegrationWithKafka { await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); sleep(4000); - var dataToSend = Flux.just(senderRecord("Message")); + var dataToSend = Flux.just(senderRecord("Message", "")); sendDataToStream(dataToSend); verifiedReceivedByConsumer("Message"); @@ -274,7 +290,49 @@ class IntegrationWithKafka { this.icsSimulatorController.deleteJob(JOB_ID, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); - await().untilAsserted(() -> assertThat(this.topicListeners.getKafkaConsumers().keySet()).isEmpty()); + await().untilAsserted(() -> assertThat(this.topicListeners.getDataConsumers().keySet()).isEmpty()); + } + + TopicListener.Output receivedKafkaOutput = new TopicListener.Output("", ""); + + @Test + void sendToKafkaConsumer() throws ServiceException, InterruptedException { + final String JOB_ID = "ID"; + + // Register producer, Register types + await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); + assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); + + final String OUTPUT_TOPIC = "outputTopic"; + + this.icsSimulatorController.addJob(consumerJobInfoKafka(OUTPUT_TOPIC), JOB_ID, restClient()); + await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); + + // Create a listener to the output topic. The KafkaTopicListener happens to be + // suitable for that, + InfoType type = new InfoType("id", null, false, OUTPUT_TOPIC, "dataType", false); + KafkaTopicListener receiver = new KafkaTopicListener(this.applicationConfig, type); + receiver.start(); + + Disposable disponsable = receiver.getOutput().asFlux() // + .doOnNext(output -> { + receivedKafkaOutput = output; + logger.info("*** recived {}, {}", OUTPUT_TOPIC, output); + }) // + .doFinally(sig -> logger.info("Finally " + sig)) // + .subscribe(); + + String sendString = "testData " + Instant.now(); + String sendKey = "key " + Instant.now(); + var dataToSend = Flux.just(senderRecord(sendString, sendKey)); + sleep(4000); + sendDataToStream(dataToSend); + + await().untilAsserted(() -> assertThat(this.receivedKafkaOutput.value).isEqualTo(sendString)); + assertThat(this.receivedKafkaOutput.key).isEqualTo(sendKey); + + disponsable.dispose(); + receiver.stop(); } @Test @@ -304,7 +362,7 @@ class IntegrationWithKafka { this.icsSimulatorController.deleteJob(JOB_ID2, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); - await().untilAsserted(() -> assertThat(this.topicListeners.getKafkaConsumers().keySet()).isEmpty()); + await().untilAsserted(() -> assertThat(this.topicListeners.getDataConsumers().keySet()).isEmpty()); } @Test @@ -326,7 +384,7 @@ class IntegrationWithKafka { var dataToSend = Flux.range(1, 1000000).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc. sendDataToStream(dataToSend); // this should overflow - JobDataConsumer consumer = topicListeners.getKafkaConsumers().get(TYPE_ID).iterator().next(); + DataConsumer consumer = topicListeners.getDataConsumers().get(TYPE_ID).iterator().next(); await().untilAsserted(() -> assertThat(consumer.isRunning()).isFalse()); this.consumerController.testResults.reset(); @@ -344,7 +402,7 @@ class IntegrationWithKafka { this.icsSimulatorController.deleteJob(JOB_ID2, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); - await().untilAsserted(() -> assertThat(this.topicListeners.getKafkaConsumers().keySet()).isEmpty()); + await().untilAsserted(() -> assertThat(this.topicListeners.getDataConsumers().keySet()).isEmpty()); } }