dmaapadapter, deliver data over Kafka 00/8600/4 1.1.0
author PatrikBuhr <patrik.buhr@est.tech>
Mon, 20 Jun 2022 11:16:38 +0000 (13:16 +0200)
committer PatrikBuhr <patrik.buhr@est.tech>
Tue, 21 Jun 2022 12:11:31 +0000 (14:11 +0200)
Code and documentation updated.

Change-Id: I04d4ef56d8a4953e32e0a1ac71b19449bc4aeb68
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-768

19 files changed:
docs/Architecture.png
docs/Architecture.pptx [deleted file]
docs/DataDelivery.png [new file with mode: 0644]
docs/Pictures.pptx [new file with mode: 0644]
docs/overview.rst
src/main/java/org/oran/dmaapadapter/repository/Job.java
src/main/java/org/oran/dmaapadapter/repository/Jobs.java
src/main/java/org/oran/dmaapadapter/tasks/DataConsumer.java [moved from src/main/java/org/oran/dmaapadapter/tasks/JobDataConsumer.java with 78% similarity]
src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java
src/main/java/org/oran/dmaapadapter/tasks/HttpDataConsumer.java [new file with mode: 0644]
src/main/java/org/oran/dmaapadapter/tasks/KafkaDataConsumer.java [new file with mode: 0644]
src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java
src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java
src/main/resources/typeSchema.json
src/main/resources/typeSchemaPmData.json
src/test/java/org/oran/dmaapadapter/ApplicationTest.java
src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java
src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java

index 8f4c3ec..4b952b3 100644 (file)
Binary files a/docs/Architecture.png and b/docs/Architecture.png differ
diff --git a/docs/Architecture.pptx b/docs/Architecture.pptx
deleted file mode 100644 (file)
index 8c8d76f..0000000
Binary files a/docs/Architecture.pptx and /dev/null differ
diff --git a/docs/DataDelivery.png b/docs/DataDelivery.png
new file mode 100644 (file)
index 0000000..9207e81
Binary files /dev/null and b/docs/DataDelivery.png differ
diff --git a/docs/Pictures.pptx b/docs/Pictures.pptx
new file mode 100644 (file)
index 0000000..e2cf47e
Binary files /dev/null and b/docs/Pictures.pptx differ
index db78630..d7084a9 100644 (file)
@@ -10,8 +10,8 @@ DMaaP Adapter
 Introduction
 ************
 
-This is a generic information producer using the Information Coordination Service (ICS) Data Producer API. It can get information from DMaaP (ONAP) or directly from Kafka topics and deliver the
-information to data consumers using REST calls (POST).
+This is a generic information producer using the Information Coordination Service (ICS) Data Producer API. It can get information from DMaaP (ONAP) or directly from Kafka topics.
+The information can be filtered, transformed, aggregated and then delivered to data consumers using REST calls (POST) or via Kafka.
 
 The DMaaP Adapter registers itself as an information producer along with its information types in Information Coordination Service (ICS).
 The information types are defined in a configuration file.
@@ -26,6 +26,28 @@ The service is implemented in Java Spring Boot (DMaaP Adapter Service).
 .. image:: ./Architecture.png
    :width: 500pt
 
+*************
+Data Delivery
+*************
+When a data consumer creates an Information Job, either a URL for REST callbacks or a Kafka Topic can be given as output for the job.
+After filtering, aggregation and data transformation, the data will be delivered to the output. Several data consumers can receive data from one Kafka Topic.
+
+.. image:: ./DataDelivery.png
+   :width: 500pt
+
+The output will be the same regardless of whether the information is received from DMaaP or from Kafka. If the data is not buffered/aggregated
+and the output is a Kafka topic, both the keys and the values are forwarded (after filtering/transformation).
+If the output is HTTP, only the values are forwarded (in the HTTP body).
+
+****************
+Data Aggregation
+****************
+When an Information Job is created, a bufferTimeout can be defined for aggregation of information.
+If this feature is used, the subscribed data will be buffered and will be delivered in chunks.
+
+The data will then be wrapped in a JSON array in a manner similar to DMaaP. The type configuration can define whether the received data is JSON.
+If not, each object is quoted (the output will then be an array of strings). If the data type is JSON, the output will be an array of JSON objects.
+
 ******************
 Configuration File
 ******************
@@ -92,6 +114,8 @@ typeSchema.json
 ===============
 This schema will by default be registered for the type. The following properties are defined:
 
+* kafkaOutputTopic, an optional parameter that directs the output of the information job to a Kafka topic instead of a direct call to a data consumer. The output of a job can be directed to HTTP or to Kafka regardless of whether the input is retrieved from DMaaP or from Kafka.
+
 * filterType, selects the type of filtering that will be done. This can be one of: "regexp", "json-path", "jslt".
 
   * regexp is for standard regexp matching of text. Objects that contain a match of the expression will be pushed to the consumer.
@@ -184,4 +208,4 @@ Below follows an example on a PM filter.
            "ManagedElement=RNC-Gbg-1"
         ]
       }
-    }
\ No newline at end of file
+    }
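The new kafkaOutputTopic property and the bufferTimeout aggregation described above map onto the extended Job.Parameters constructor introduced by this change (argument order: filter, filterType, bufferTimeout, maxConcurrency, kafkaOutputTopic). A minimal illustrative sketch, mirroring the parameter values used in the updated tests rather than anything added by the patch itself:

    import org.oran.dmaapadapter.repository.Job;

    // Illustrative sketch only; the parameter values mirror ApplicationTest and
    // IntegrationWithKafka in this change.
    public class JobParametersSketch {

        // Deliver the (filtered) data to a Kafka topic; no REST callback is used.
        public static Job.Parameters kafkaDelivery(String outputTopic) {
            return new Job.Parameters(null, null, null, 1, outputTopic);
        }

        // Buffer up to 123 messages or 456 ms per chunk and POST each chunk to the
        // consumer callback as a JSON array; kafkaOutputTopic is left null.
        public static Job.Parameters bufferedHttpDelivery() {
            return new Job.Parameters(null, null, new Job.BufferTimeout(123, 456), 1, null);
        }
    }

Specifying both a callback URL and a kafkaOutputTopic in the same job is rejected with HTTP 400, as shown in the Jobs.java hunk below.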
index 776a7ce..90827da 100644 (file)
@@ -58,13 +58,18 @@ public class Job {
 
         private Integer maxConcurrency;
 
+        @Getter
+        private String kafkaOutputTopic;
+
         public Parameters() {}
 
-        public Parameters(Object filter, String filterType, BufferTimeout bufferTimeout, Integer maxConcurrency) {
+        public Parameters(Object filter, String filterType, BufferTimeout bufferTimeout, Integer maxConcurrency,
+                String kafkaOutputTopic) {
             this.filter = filter;
             this.bufferTimeout = bufferTimeout;
             this.maxConcurrency = maxConcurrency;
             this.filterType = filterType;
+            this.kafkaOutputTopic = kafkaOutputTopic;
         }
 
         public int getMaxConcurrency() {
@@ -153,7 +158,6 @@ public class Job {
         this.parameters = parameters;
         filter = createFilter(parameters);
         this.consumerRestClient = consumerRestClient;
-
     }
 
     private static Filter createFilter(Parameters parameters) {
index 3479720..825673a 100644 (file)
@@ -20,6 +20,8 @@
 
 package org.oran.dmaapadapter.repository;
 
+import com.google.common.base.Strings;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -71,7 +73,11 @@ public class Jobs {
     }
 
     public void addJob(String id, String callbackUrl, InfoType type, String owner, String lastUpdated,
-            Parameters parameters) {
+            Parameters parameters) throws ServiceException {
+
+        if (!Strings.isNullOrEmpty(parameters.getKafkaOutputTopic()) && !Strings.isNullOrEmpty(callbackUrl)) {
+            throw new ServiceException("Cannot deliver to both Kafka and HTTP in the same job", HttpStatus.BAD_REQUEST);
+        }
         AsyncRestClient consumerRestClient = type.isUseHttpProxy() //
                 ? restclientFactory.createRestClientUseHttpProxy(callbackUrl) //
                 : restclientFactory.createRestClientNoHttpProxy(callbackUrl);
@@ -25,7 +25,6 @@ import lombok.Getter;
 import org.oran.dmaapadapter.repository.Job;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.http.MediaType;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 
 import reactor.core.Disposable;
@@ -37,8 +36,8 @@ import reactor.core.publisher.Mono;
  * owner via REST calls.
  */
 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
-public class JobDataConsumer {
-    private static final Logger logger = LoggerFactory.getLogger(JobDataConsumer.class);
+public abstract class DataConsumer {
+    private static final Logger logger = LoggerFactory.getLogger(DataConsumer.class);
     @Getter
     private final Job job;
     private Disposable subscription;
@@ -70,32 +69,27 @@ public class JobDataConsumer {
         }
     }
 
-    public JobDataConsumer(Job job) {
+    protected DataConsumer(Job job) {
         this.job = job;
     }
 
-    public synchronized void start(Flux<String> input) {
+    public synchronized void start(Flux<TopicListener.Output> input) {
         stop();
         this.errorStats.resetIrrecoverableErrors();
         this.subscription = handleReceivedMessage(input, job) //
-                .flatMap(this::postToClient, job.getParameters().getMaxConcurrency()) //
+                .flatMap(this::sendToClient, job.getParameters().getMaxConcurrency()) //
                 .onErrorResume(this::handleError) //
                 .subscribe(this::handleConsumerSentOk, //
                         this::handleExceptionInStream, //
-                        () -> logger.warn("JobDataConsumer stopped jobId: {}", job.getId()));
+                        () -> logger.warn("DataConsumer stopped jobId: {}", job.getId()));
     }
 
     private void handleExceptionInStream(Throwable t) {
-        logger.warn("JobDataConsumer exception: {}, jobId: {}", t.getMessage(), job.getId());
+        logger.warn("DataConsumer exception: {}, jobId: {}", t.getMessage(), job.getId());
         stop();
     }
 
-    private Mono<String> postToClient(String body) {
-        logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), body);
-        MediaType contentType =
-                this.job.isBuffered() || this.job.getType().isJson() ? MediaType.APPLICATION_JSON : null;
-        return job.getConsumerRestClient().post("", body, contentType);
-    }
+    protected abstract Mono<String> sendToClient(TopicListener.Output output);
 
     public synchronized void stop() {
         if (this.subscription != null) {
@@ -108,16 +102,17 @@ public class JobDataConsumer {
         return this.subscription != null;
     }
 
-    private Flux<String> handleReceivedMessage(Flux<String> input, Job job) {
-        Flux<String> result = input.map(job::filter) //
-                .filter(t -> !t.isEmpty()); //
+    private Flux<TopicListener.Output> handleReceivedMessage(Flux<TopicListener.Output> inputFlux, Job job) {
+        Flux<TopicListener.Output> result =
+                inputFlux.map(input -> new TopicListener.Output(input.key, job.filter(input.value))) //
+                        .filter(t -> !t.value.isEmpty()); //
 
         if (job.isBuffered()) {
-            result = result.map(str -> quoteNonJson(str, job)) //
+            result = result.map(input -> quoteNonJson(input.value, job)) //
                     .bufferTimeout( //
                             job.getParameters().getBufferTimeout().getMaxSize(), //
                             job.getParameters().getBufferTimeout().getMaxTime()) //
-                    .map(Object::toString);
+                    .map(buffered -> new TopicListener.Output("", buffered.toString()));
         }
         return result;
     }
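The buffering branch above relies on Reactor's Flux.bufferTimeout for the chunked delivery described in the Data Aggregation documentation. A small standalone sketch (class and values assumed, not part of the change) of what that operator does to already-quoted, non-JSON data:

    import java.time.Duration;
    import java.util.List;

    import reactor.core.publisher.Flux;

    // Standalone sketch: bufferTimeout emits a List once maxSize items have
    // arrived or maxTime has elapsed; List.toString on the quoted strings yields
    // the JSON-array style output produced for buffered jobs.
    public class BufferTimeoutSketch {
        public static void main(String[] args) {
            Flux.just("\"a\"", "\"b\"", "\"c\"", "\"d\"")
                    .bufferTimeout(3, Duration.ofMillis(100)) // maxSize=3, maxTime=100 ms
                    .map(List::toString)                      // e.g. ["a", "b", "c"]
                    .doOnNext(System.out::println)
                    .blockLast();
        }
    }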
index dbb6059..3aa97fe 100644 (file)
@@ -48,7 +48,7 @@ public class DmaapTopicListener implements TopicListener {
     private final ApplicationConfig applicationConfig;
     private final InfoType type;
     private final com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
-    private Many<String> output;
+    private Many<Output> output;
     private Disposable topicReceiverTask;
 
     public DmaapTopicListener(ApplicationConfig applicationConfig, InfoType type, SecurityContext securityContext) {
@@ -60,7 +60,7 @@ public class DmaapTopicListener implements TopicListener {
     }
 
     @Override
-    public Many<String> getOutput() {
+    public Many<Output> getOutput() {
         return this.output;
     }
 
@@ -95,7 +95,7 @@ public class DmaapTopicListener implements TopicListener {
 
     private void onReceivedData(String input) {
         logger.debug("Received from DMAAP topic: {} :{}", this.type.getDmaapTopicUrl(), input);
-        output.emitNext(input, Sinks.EmitFailureHandler.FAIL_FAST);
+        output.emitNext(new Output("", input), Sinks.EmitFailureHandler.FAIL_FAST);
     }
 
     private String getDmaapUrl() {
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/HttpDataConsumer.java b/src/main/java/org/oran/dmaapadapter/tasks/HttpDataConsumer.java
new file mode 100644 (file)
index 0000000..87a6b67
--- /dev/null
@@ -0,0 +1,49 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2022 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.tasks;
+
+import org.oran.dmaapadapter.repository.Job;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.MediaType;
+import reactor.core.publisher.Mono;
+
+/**
+ * The class streams data from a multicast sink and sends the data to the Job
+ * owner via REST calls.
+ */
+@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
+public class HttpDataConsumer extends DataConsumer {
+    private static final Logger logger = LoggerFactory.getLogger(HttpDataConsumer.class);
+
+    public HttpDataConsumer(Job job) {
+        super(job);
+    }
+
+    @Override
+    protected Mono<String> sendToClient(TopicListener.Output output) {
+        Job job = this.getJob();
+        logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), output);
+        MediaType contentType = job.isBuffered() || job.getType().isJson() ? MediaType.APPLICATION_JSON : null;
+        return job.getConsumerRestClient().post("", output.value, contentType);
+    }
+
+}
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaDataConsumer.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaDataConsumer.java
new file mode 100644 (file)
index 0000000..2b0b7a4
--- /dev/null
@@ -0,0 +1,103 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2022 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.tasks;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
+import org.oran.dmaapadapter.repository.Job;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.kafka.sender.KafkaSender;
+import reactor.kafka.sender.SenderOptions;
+import reactor.kafka.sender.SenderRecord;
+
+/**
+ * The class streams data from a multicast sink and sends the data to the Kafka
+ * topic given by the Job parameter kafkaOutputTopic.
+ */
+@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
+public class KafkaDataConsumer extends DataConsumer {
+    private static final Logger logger = LoggerFactory.getLogger(KafkaDataConsumer.class);
+
+    private KafkaSender<String, String> sender;
+    private final ApplicationConfig appConfig;
+
+    public KafkaDataConsumer(Job job, ApplicationConfig appConfig) {
+        super(job);
+        this.appConfig = appConfig;
+    }
+
+    @Override
+    protected Mono<String> sendToClient(TopicListener.Output data) {
+        Job job = this.getJob();
+
+        logger.debug("Sending data '{}' to Kafka topic: {}", data, this.getJob().getParameters().getKafkaOutputTopic());
+
+        SenderRecord<String, String, Integer> senderRecord = senderRecord(data, job);
+
+        return this.sender.send(Mono.just(senderRecord)) //
+                .collectList() //
+                .map(x -> data.value);
+    }
+
+    @Override
+    public synchronized void start(Flux<TopicListener.Output> input) {
+        super.start(input);
+        SenderOptions<String, String> senderOptions = senderOptions(appConfig);
+        this.sender = KafkaSender.create(senderOptions);
+    }
+
+    @Override
+    public synchronized void stop() {
+        super.stop();
+        if (sender != null) {
+            sender.close();
+            sender = null;
+        }
+    }
+
+    private static SenderOptions<String, String> senderOptions(ApplicationConfig config) {
+        String bootstrapServers = config.getKafkaBootStrapServers();
+
+        Map<String, Object> props = new HashMap<>();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producerx");
+        props.put(ProducerConfig.ACKS_CONFIG, "all");
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        return SenderOptions.create(props);
+    }
+
+    private SenderRecord<String, String, Integer> senderRecord(TopicListener.Output output, Job infoJob) {
+        int correlationMetadata = 2;
+        String topic = infoJob.getParameters().getKafkaOutputTopic();
+        return SenderRecord.create(new ProducerRecord<>(topic, output.key, output.value), correlationMetadata);
+    }
+
+}
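On the receiving side, a data consumer that sets kafkaOutputTopic reads the forwarded keys and values from that topic. A hedged sketch using reactor-kafka (class name and group id are assumptions, not part of the change); the sendToKafkaConsumer integration test below achieves the same thing by reusing KafkaTopicListener:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import reactor.core.Disposable;
    import reactor.kafka.receiver.KafkaReceiver;
    import reactor.kafka.receiver.ReceiverOptions;

    // Sketch of a consumer-side reader for the job's kafkaOutputTopic.
    public class KafkaOutputReaderSketch {

        public static Disposable subscribe(String bootstrapServers, String kafkaOutputTopic) {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "data-consumer"); // assumed group id
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

            ReceiverOptions<String, String> options = ReceiverOptions.<String, String>create(props)
                    .subscription(Collections.singleton(kafkaOutputTopic));

            // Both the Kafka key and the value are forwarded by KafkaDataConsumer.
            return KafkaReceiver.create(options).receive()
                    .doOnNext(record -> System.out.println(record.key() + " : " + record.value()))
                    .subscribe();
        }
    }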
index 4b58013..4a7f269 100644 (file)
@@ -47,7 +47,7 @@ public class KafkaTopicListener implements TopicListener {
     private static final Logger logger = LoggerFactory.getLogger(KafkaTopicListener.class);
     private final ApplicationConfig applicationConfig;
     private final InfoType type;
-    private Many<String> output;
+    private Many<Output> output;
     private Disposable topicReceiverTask;
 
     public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type) {
@@ -56,7 +56,7 @@ public class KafkaTopicListener implements TopicListener {
     }
 
     @Override
-    public Many<String> getOutput() {
+    public Many<Output> getOutput() {
         return this.output;
     }
 
@@ -84,7 +84,7 @@ public class KafkaTopicListener implements TopicListener {
 
     private void onReceivedData(ConsumerRecord<String, String> input) {
         logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(), input.value());
-        output.emitNext(input.value(), Sinks.EmitFailureHandler.FAIL_FAST);
+        output.emitNext(new Output(input.key(), input.value()), Sinks.EmitFailureHandler.FAIL_FAST);
     }
 
     private void onReceivedError(Throwable t) {
index 503f113..e32cfa5 100644 (file)
 
 package org.oran.dmaapadapter.tasks;
 
+import lombok.ToString;
 import reactor.core.publisher.Sinks.Many;
 
 public interface TopicListener {
+
+    @ToString
+    public static class Output {
+        public final String key;
+        public final String value;
+
+        public Output(String key, String value) {
+            this.key = key;
+            this.value = value;
+        }
+    }
+
     public void start();
 
     public void stop();
 
-    public Many<String> getOutput();
+    public Many<Output> getOutput();
 }
index 685379c..df70b9f 100644 (file)
@@ -25,6 +25,7 @@ import java.util.Map;
 
 import lombok.Getter;
 
+import org.apache.logging.log4j.util.Strings;
 import org.oran.dmaapadapter.clients.SecurityContext;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.repository.InfoType;
@@ -49,13 +50,15 @@ public class TopicListeners {
     private final Map<String, TopicListener> dmaapTopicListeners = new HashMap<>(); // Key is typeId
 
     @Getter
-    private final MultiMap<JobDataConsumer> kafkaConsumers = new MultiMap<>(); // Key is typeId, jobId
-    private final MultiMap<JobDataConsumer> dmaapConsumers = new MultiMap<>(); // Key is typeId, jobId
+    private final MultiMap<DataConsumer> dataConsumers = new MultiMap<>(); // Key is typeId, jobId
+
+    private final ApplicationConfig appConfig;
 
     private static final int CONSUMER_SUPERVISION_INTERVAL_MS = 1000 * 60 * 3;
 
     public TopicListeners(@Autowired ApplicationConfig appConfig, @Autowired InfoTypes types, @Autowired Jobs jobs,
             @Autowired SecurityContext securityContext) {
+        this.appConfig = appConfig;
 
         for (InfoType type : types.getAll()) {
             if (type.isKafkaTopicDefined()) {
@@ -85,32 +88,35 @@ public class TopicListeners {
         removeJob(job);
         logger.debug("Job added {}", job.getId());
         if (job.getType().isKafkaTopicDefined()) {
-            addJob(job, kafkaConsumers, kafkaTopicListeners);
+            addConsumer(job, dataConsumers, kafkaTopicListeners);
         }
 
         if (job.getType().isDmaapTopicDefined()) {
-            addJob(job, dmaapConsumers, dmaapTopicListeners);
+            addConsumer(job, dataConsumers, dmaapTopicListeners);
         }
     }
 
-    private static void addJob(Job job, MultiMap<JobDataConsumer> consumers,
-            Map<String, TopicListener> topicListeners) {
+    private DataConsumer createConsumer(Job job) {
+        return !Strings.isEmpty(job.getParameters().getKafkaOutputTopic()) ? new KafkaDataConsumer(job, appConfig)
+                : new HttpDataConsumer(job);
+    }
+
+    private void addConsumer(Job job, MultiMap<DataConsumer> consumers, Map<String, TopicListener> topicListeners) {
         TopicListener topicListener = topicListeners.get(job.getType().getId());
         if (consumers.get(job.getType().getId()).isEmpty()) {
             topicListener.start();
         }
-        JobDataConsumer subscription = new JobDataConsumer(job);
-        subscription.start(topicListener.getOutput().asFlux());
-        consumers.put(job.getType().getId(), job.getId(), subscription);
+        DataConsumer consumer = createConsumer(job);
+        consumer.start(topicListener.getOutput().asFlux());
+        consumers.put(job.getType().getId(), job.getId(), consumer);
     }
 
     public synchronized void removeJob(Job job) {
-        removeJob(job, kafkaConsumers);
-        removeJob(job, dmaapConsumers);
+        removeJob(job, dataConsumers);
     }
 
-    private static void removeJob(Job job, MultiMap<JobDataConsumer> consumers) {
-        JobDataConsumer consumer = consumers.remove(job.getType().getId(), job.getId());
+    private static void removeJob(Job job, MultiMap<DataConsumer> consumers) {
+        DataConsumer consumer = consumers.remove(job.getType().getId(), job.getId());
         if (consumer != null) {
             logger.debug("Job removed {}", job.getId());
             consumer.stop();
@@ -119,25 +125,23 @@ public class TopicListeners {
 
     @Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS)
     public synchronized void restartNonRunningKafkaTopics() {
-        for (String typeId : this.kafkaConsumers.keySet()) {
-            for (JobDataConsumer consumer : this.kafkaConsumers.get(typeId)) {
-                if (!consumer.isRunning()) {
-                    restartTopicAndConsumers(this.kafkaTopicListeners, this.kafkaConsumers, consumer);
-                }
+        for (DataConsumer consumer : this.dataConsumers.values()) {
+            if (!consumer.isRunning()) {
+                restartTopicAndConsumers(this.kafkaTopicListeners, this.dataConsumers, consumer);
             }
         }
+
     }
 
     private static void restartTopicAndConsumers(Map<String, TopicListener> topicListeners,
-            MultiMap<JobDataConsumer> consumers, JobDataConsumer consumer) {
+            MultiMap<DataConsumer> consumers, DataConsumer consumer) {
         InfoType type = consumer.getJob().getType();
         TopicListener topic = topicListeners.get(type.getId());
         topic.start();
         restartConsumersOfType(consumers, topic, type);
     }
 
-    private static void restartConsumersOfType(MultiMap<JobDataConsumer> consumers, TopicListener topic,
-            InfoType type) {
+    private static void restartConsumersOfType(MultiMap<DataConsumer> consumers, TopicListener topic, InfoType type) {
         consumers.get(type.getId()).forEach(consumer -> consumer.start(topic.getOutput().asFlux()));
     }
 }
index 99b8647..be2829f 100644 (file)
@@ -17,6 +17,9 @@
       "type": "integer",
       "minimum": 1
     },
+    "kafkaOutputTopic" : {
+      "type": "string"
+    },
     "bufferTimeout": {
       "type": "object",
       "properties": {
@@ -38,4 +41,4 @@
     }
   },
   "additionalProperties": false
-}
\ No newline at end of file
+}
index 84d21f8..10c7662 100644 (file)
@@ -61,6 +61,9 @@
          "type": "integer",
          "minimum": 1
       },
+      "kafkaOutputTopic" : {
+         "type": "string"
+      },
       "bufferTimeout": {
          "type": "object",
          "additionalProperties": false,
@@ -81,4 +84,4 @@
          ]
       }
    }
-}
\ No newline at end of file
+}
index 6c6ceda..c4b5ece 100644 (file)
@@ -40,7 +40,6 @@ import org.json.JSONObject;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
 import org.oran.dmaapadapter.clients.AsyncRestClient;
 import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
 import org.oran.dmaapadapter.clients.SecurityContext;
@@ -55,8 +54,9 @@ import org.oran.dmaapadapter.repository.Job;
 import org.oran.dmaapadapter.repository.Jobs;
 import org.oran.dmaapadapter.repository.filters.PmReport;
 import org.oran.dmaapadapter.repository.filters.PmReportFilter;
-import org.oran.dmaapadapter.tasks.JobDataConsumer;
+import org.oran.dmaapadapter.tasks.DataConsumer;
 import org.oran.dmaapadapter.tasks.ProducerRegstrationTask;
+import org.oran.dmaapadapter.tasks.TopicListener;
 import org.oran.dmaapadapter.tasks.TopicListeners;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
@@ -70,14 +70,12 @@ import org.springframework.http.HttpStatus;
 import org.springframework.http.MediaType;
 import org.springframework.http.ResponseEntity;
 import org.springframework.test.context.TestPropertySource;
-import org.springframework.test.context.junit.jupiter.SpringExtension;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
-@ExtendWith(SpringExtension.class)
 @SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
 @TestPropertySource(properties = { //
         "server.ssl.key-store=./config/keystore.jks", //
@@ -283,18 +281,18 @@ class ApplicationTest {
         waitForRegistration();
 
         // Create a job
-        Job.Parameters param = new Job.Parameters(null, null, new Job.BufferTimeout(123, 456), 1);
+        Job.Parameters param = new Job.Parameters(null, null, new Job.BufferTimeout(123, 456), 1, null);
         String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL;
         ConsumerJobInfo kafkaJobInfo = new ConsumerJobInfo(TYPE_ID, toJson(gson.toJson(param)), "owner", targetUri, "");
 
         this.icsSimulatorController.addJob(kafkaJobInfo, JOB_ID, restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
 
-        JobDataConsumer kafkaConsumer = this.topicListeners.getKafkaConsumers().get(TYPE_ID, JOB_ID);
+        DataConsumer kafkaConsumer = this.topicListeners.getDataConsumers().get(TYPE_ID, JOB_ID);
 
         // Handle received data from Kafka, check that it has been posted to the
         // consumer
-        kafkaConsumer.start(Flux.just("data"));
+        kafkaConsumer.start(Flux.just(new TopicListener.Output("key", "data")));
 
         ConsumerController.TestResults consumer = this.consumerController.testResults;
         await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
@@ -318,7 +316,7 @@ class ApplicationTest {
         waitForRegistration();
 
         // Create a job
-        Job.Parameters param = new Job.Parameters(null, null, new Job.BufferTimeout(123, 456), 1);
+        Job.Parameters param = new Job.Parameters(null, null, new Job.BufferTimeout(123, 456), 1, null);
         ConsumerJobInfo jobInfo = consumerJobInfo("DmaapInformationType", JOB_ID, toJson(gson.toJson(param)));
         this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
@@ -348,7 +346,7 @@ class ApplicationTest {
         waitForRegistration();
 
         // Create a job
-        Job.Parameters param = new Job.Parameters(null, null, null, 1);
+        Job.Parameters param = new Job.Parameters(null, null, null, 1, null);
         ConsumerJobInfo jobInfo = consumerJobInfo("DmaapInformationType", JOB_ID, toJson(gson.toJson(param)));
         this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
@@ -386,7 +384,7 @@ class ApplicationTest {
         filterData.getSourceNames().add("O-DU-1122");
         filterData.getMeasuredEntityDns().add("ManagedElement=RNC-Gbg-1");
         Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE,
-                new Job.BufferTimeout(123, 456), null);
+                new Job.BufferTimeout(123, 456), null, null);
         String paramJson = gson.toJson(param);
         ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationType", "EI_PM_JOB_ID", toJson(paramJson));
 
@@ -420,7 +418,7 @@ class ApplicationTest {
         // Create a job with a PM filter
         String expresssion = "if(.event.commonEventHeader.sourceName == \"O-DU-1122\")" //
                 + ".";
-        Job.Parameters param = new Job.Parameters(expresssion, Job.Parameters.JSLT_FILTER_TYPE, null, null);
+        Job.Parameters param = new Job.Parameters(expresssion, Job.Parameters.JSLT_FILTER_TYPE, null, null, null);
         String paramJson = gson.toJson(param);
         ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationType", JOB_ID, toJson(paramJson));
 
@@ -517,7 +515,7 @@ class ApplicationTest {
         // Create a job with a PM filter
         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
         filterData.getMeasTypes().add("succImmediateAssignProcs");
-        Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, null, null);
+        Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, null, null, null);
         String paramJson = gson.toJson(param);
 
         ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationTypeKafka", "EI_PM_JOB_ID", toJson(paramJson));
index 603cea7..287c20b 100644 (file)
@@ -234,7 +234,7 @@ class IntegrationWithIcs {
         final String TYPE_ID = "KafkaInformationType";
 
         Job.Parameters param =
-                new Job.Parameters("filter", Job.Parameters.REGEXP_TYPE, new Job.BufferTimeout(123, 456), 1);
+                new Job.Parameters("filter", Job.Parameters.REGEXP_TYPE, new Job.BufferTimeout(123, 456), 1, null);
 
         ConsumerJobInfo jobInfo =
                 new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", consumerUri(), "");
@@ -253,8 +253,8 @@ class IntegrationWithIcs {
         await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue());
         final String TYPE_ID = "KafkaInformationType";
 
-        Job.Parameters param =
-                new Job.Parameters("filter", Job.Parameters.REGEXP_TYPE, new Job.BufferTimeout(123, 170 * 1000), 1);
+        Job.Parameters param = new Job.Parameters("filter", Job.Parameters.REGEXP_TYPE,
+                new Job.BufferTimeout(123, 170 * 1000), 1, null);
 
         ConsumerJobInfo jobInfo =
                 new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", consumerUri(), "");
index 7bbf26c..330eb6b 100644 (file)
@@ -27,28 +27,30 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import com.google.gson.JsonParser;
 
 import java.time.Duration;
+import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
 import org.oran.dmaapadapter.clients.AsyncRestClient;
 import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
 import org.oran.dmaapadapter.clients.SecurityContext;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.configuration.WebClientConfig;
 import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig;
+import org.oran.dmaapadapter.exceptions.ServiceException;
 import org.oran.dmaapadapter.r1.ConsumerJobInfo;
 import org.oran.dmaapadapter.repository.InfoType;
 import org.oran.dmaapadapter.repository.InfoTypes;
 import org.oran.dmaapadapter.repository.Job;
 import org.oran.dmaapadapter.repository.Jobs;
-import org.oran.dmaapadapter.tasks.JobDataConsumer;
+import org.oran.dmaapadapter.tasks.DataConsumer;
+import org.oran.dmaapadapter.tasks.KafkaTopicListener;
+import org.oran.dmaapadapter.tasks.TopicListener;
 import org.oran.dmaapadapter.tasks.TopicListeners;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,15 +63,14 @@ import org.springframework.boot.web.server.LocalServerPort;
 import org.springframework.boot.web.servlet.server.ServletWebServerFactory;
 import org.springframework.context.annotation.Bean;
 import org.springframework.test.context.TestPropertySource;
-import org.springframework.test.context.junit.jupiter.SpringExtension;
 
+import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.kafka.sender.KafkaSender;
 import reactor.kafka.sender.SenderOptions;
 import reactor.kafka.sender.SenderRecord;
 
 @SuppressWarnings("java:S3577") // Rename class
-@ExtendWith(SpringExtension.class)
 @SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
 @TestPropertySource(properties = { //
         "server.ssl.key-store=./config/keystore.jks", //
@@ -103,7 +104,7 @@ class IntegrationWithKafka {
 
     private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
 
-    private static final Logger logger = LoggerFactory.getLogger(IntegrationWithKafka.class);
+    private final Logger logger = LoggerFactory.getLogger(IntegrationWithKafka.class);
 
     @LocalServerPort
     int localServerHttpPort;
@@ -187,8 +188,8 @@ class IntegrationWithKafka {
 
     private static Object jobParametersAsJsonObject(String filter, long maxTimeMiliseconds, int maxSize,
             int maxConcurrency) {
-        Job.Parameters param = new Job.Parameters(filter, Job.Parameters.REGEXP_TYPE,
-                new Job.BufferTimeout(maxSize, maxTimeMiliseconds), maxConcurrency);
+        Job.BufferTimeout buffer = maxSize > 0 ? new Job.BufferTimeout(maxSize, maxTimeMiliseconds) : null;
+        Job.Parameters param = new Job.Parameters(filter, Job.Parameters.REGEXP_TYPE, buffer, maxConcurrency, null);
         String str = gson.toJson(param);
         return jsonObject(str);
     }
@@ -212,27 +213,42 @@ class IntegrationWithKafka {
         }
     }
 
-    private SenderOptions<Integer, String> senderOptions() {
+    ConsumerJobInfo consumerJobInfoKafka(String topic) {
+        try {
+            Job.Parameters param = new Job.Parameters(null, null, null, 1, topic);
+            String str = gson.toJson(param);
+            Object parametersObj = jsonObject(str);
+
+            return new ConsumerJobInfo(TYPE_ID, parametersObj, "owner", null, "");
+        } catch (Exception e) {
+            return null;
+        }
+    }
+
+    private SenderOptions<String, String> senderOptions() {
         String bootstrapServers = this.applicationConfig.getKafkaBootStrapServers();
 
         Map<String, Object> props = new HashMap<>();
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
         props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producerx");
         props.put(ProducerConfig.ACKS_CONFIG, "all");
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         return SenderOptions.create(props);
     }
 
-    private SenderRecord<Integer, String, Integer> senderRecord(String data) {
+    private SenderRecord<String, String, Integer> senderRecord(String data) {
+        return senderRecord(data, "");
+    }
+
+    private SenderRecord<String, String, Integer> senderRecord(String data, String key) {
         final InfoType infoType = this.types.get(TYPE_ID);
-        int key = 1;
         int correlationMetadata = 2;
         return SenderRecord.create(new ProducerRecord<>(infoType.getKafkaInputTopic(), key, data), correlationMetadata);
     }
 
-    private void sendDataToStream(Flux<SenderRecord<Integer, String, Integer>> dataToSend) {
-        final KafkaSender<Integer, String> sender = KafkaSender.create(senderOptions());
+    private void sendDataToStream(Flux<SenderRecord<String, String, Integer>> dataToSend) {
+        final KafkaSender<String, String> sender = KafkaSender.create(senderOptions());
 
         sender.send(dataToSend) //
                 .doOnError(e -> logger.error("Send failed", e)) //
@@ -266,7 +282,7 @@ class IntegrationWithKafka {
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
 
         sleep(4000);
-        var dataToSend = Flux.just(senderRecord("Message"));
+        var dataToSend = Flux.just(senderRecord("Message", ""));
         sendDataToStream(dataToSend);
 
         verifiedReceivedByConsumer("Message");
@@ -274,7 +290,49 @@ class IntegrationWithKafka {
         this.icsSimulatorController.deleteJob(JOB_ID, restClient());
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
-        await().untilAsserted(() -> assertThat(this.topicListeners.getKafkaConsumers().keySet()).isEmpty());
+        await().untilAsserted(() -> assertThat(this.topicListeners.getDataConsumers().keySet()).isEmpty());
+    }
+
+    TopicListener.Output receivedKafkaOutput = new TopicListener.Output("", "");
+
+    @Test
+    void sendToKafkaConsumer() throws ServiceException, InterruptedException {
+        final String JOB_ID = "ID";
+
+        // Register producer, Register types
+        await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+        assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+
+        final String OUTPUT_TOPIC = "outputTopic";
+
+        this.icsSimulatorController.addJob(consumerJobInfoKafka(OUTPUT_TOPIC), JOB_ID, restClient());
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+        // Create a listener to the output topic. The KafkaTopicListener happens to be
+        // suitable for that.
+        InfoType type = new InfoType("id", null, false, OUTPUT_TOPIC, "dataType", false);
+        KafkaTopicListener receiver = new KafkaTopicListener(this.applicationConfig, type);
+        receiver.start();
+
+        Disposable disposable = receiver.getOutput().asFlux() //
+                .doOnNext(output -> {
+                    receivedKafkaOutput = output;
+                    logger.info("*** received {}, {}", OUTPUT_TOPIC, output);
+                }) //
+                .doFinally(sig -> logger.info("Finally " + sig)) //
+                .subscribe();
+
+        String sendString = "testData " + Instant.now();
+        String sendKey = "key " + Instant.now();
+        var dataToSend = Flux.just(senderRecord(sendString, sendKey));
+        sleep(4000);
+        sendDataToStream(dataToSend);
+
+        await().untilAsserted(() -> assertThat(this.receivedKafkaOutput.value).isEqualTo(sendString));
+        assertThat(this.receivedKafkaOutput.key).isEqualTo(sendKey);
+
+        disposable.dispose();
+        receiver.stop();
     }
 
     @Test
@@ -304,7 +362,7 @@ class IntegrationWithKafka {
         this.icsSimulatorController.deleteJob(JOB_ID2, restClient());
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
-        await().untilAsserted(() -> assertThat(this.topicListeners.getKafkaConsumers().keySet()).isEmpty());
+        await().untilAsserted(() -> assertThat(this.topicListeners.getDataConsumers().keySet()).isEmpty());
     }
 
     @Test
@@ -326,7 +384,7 @@ class IntegrationWithKafka {
         var dataToSend = Flux.range(1, 1000000).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc.
         sendDataToStream(dataToSend); // this should overflow
 
-        JobDataConsumer consumer = topicListeners.getKafkaConsumers().get(TYPE_ID).iterator().next();
+        DataConsumer consumer = topicListeners.getDataConsumers().get(TYPE_ID).iterator().next();
         await().untilAsserted(() -> assertThat(consumer.isRunning()).isFalse());
         this.consumerController.testResults.reset();
 
@@ -344,7 +402,7 @@ class IntegrationWithKafka {
         this.icsSimulatorController.deleteJob(JOB_ID2, restClient());
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
-        await().untilAsserted(() -> assertThat(this.topicListeners.getKafkaConsumers().keySet()).isEmpty());
+        await().untilAsserted(() -> assertThat(this.topicListeners.getDataConsumers().keySet()).isEmpty());
     }
 
 }