Code and documentation updated.
Change-Id: I04d4ef56d8a4953e32e0a1ac71b19449bc4aeb68
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-768
Introduction
************
-This is a generic information producer using the Information Coordination Service (ICS) Data Producer API. It can get information from DMaaP (ONAP) or directly from Kafka topics and deliver the
-information to data consumers using REST calls (POST).
+This is a generic information producer using the Information Coordination Service (ICS) Data Producer API. It can get information from DMaaP (ONAP) or directly from Kafka topics.
+The information can be filtered, transformed, aggregated and then delivered to data consumers using REST calls (POST) or via Kafka.
The DMaaP Adapter registers itself as an information producer, along with its information types, in the Information Coordination Service (ICS).
The information types are defined in a configuration file.
.. image:: ./Architecture.png
:width: 500pt
+*************
+Data Delivery
+*************
+When a data consumer creates an Information Job, either a URL for REST callbacks or a Kafka topic can be given as the output of the job.
+After filtering, aggregation and data transformation, the data will be delivered to that output. Several data consumers can receive data from one Kafka topic.
+
+.. image:: ./DataDelivery.png
+ :width: 500pt
+
+The output will be the same regardless of whether the information is received from DMaaP or from Kafka. If the data is not buffered/aggregated
+and the output is a Kafka stream, both the keys and the values are forwarded (after filtering/transformation).
+If the output is HTTP, only the values are forwarded (in the HTTP body).
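+
+The choice between Kafka and HTTP delivery is made per job from the job parameters. A condensed sketch of the selection
+logic, mirroring the consumer creation in TopicListeners (the surrounding class and fields are assumed):
+
+.. code-block:: java
+
+    // If the job parameters contain a kafkaOutputTopic, data is produced to that topic;
+    // otherwise the data is POSTed to the job's callback URL.
+    private DataConsumer createConsumer(Job job) {
+        String outputTopic = job.getParameters().getKafkaOutputTopic();
+        return (outputTopic != null && !outputTopic.isEmpty())
+                ? new KafkaDataConsumer(job, appConfig) // delivery via Kafka
+                : new HttpDataConsumer(job);            // delivery via HTTP POST
+    }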
+
+****************
+Data Aggregation
+****************
+When an Information Job is created, a bufferTimeout can be defined for aggregation of information.
+If this feature is used, the subscribed data will be buffered and delivered in chunks.
+
+The data will then be wrapped in a JSON array, in a manner similar to DMaaP. The type configuration defines whether the received data is JSON.
+If it is not, each object is quoted and the output is an array of strings. If the data type is JSON, the output is an array of JSON objects.
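+
+Internally the buffering uses a Reactor ``bufferTimeout`` on the filtered message stream; a condensed sketch, where
+``filtered`` stands for the filtered values and ``quoteNonJson`` is the adapter's helper that quotes non-JSON payloads:
+
+.. code-block:: java
+
+    // Collect up to maxSize messages, or wait at most maxTime (whichever happens first),
+    // and deliver each batch as one JSON array.
+    Flux<String> buffered = filtered
+            .map(value -> quoteNonJson(value, job))
+            .bufferTimeout(job.getParameters().getBufferTimeout().getMaxSize(),
+                    job.getParameters().getBufferTimeout().getMaxTime())
+            .map(List::toString); // "[item1, item2, ...]"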
+
******************
Configuration File
******************
===============
This schema will be registered for the type by default. The following properties are defined:
+* kafkaOutputTopic, an optional parameter which directs the output of the information job to a Kafka topic instead of a direct REST call to the data consumer. The output of a job can be directed to HTTP or to Kafka regardless of whether the input is retrieved from DMaaP or from Kafka (see the parameter construction sketch at the end of this section).
+
* filterType, selects the type of filtering that will be done. This can be one of: "regexp", "json-path", "jslt".
* regexp is for standard regexp matching of text. Objects that contain a match of the expression will be pushed to the consumer.
"ManagedElement=RNC-Gbg-1"
]
}
- }
\ No newline at end of file
+ }
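+
+The job parameters above correspond to the adapter's Job.Parameters class. A sketch of constructing them
+programmatically, with illustrative values (the topic name "myOutputTopic" is just an example):
+
+.. code-block:: java
+
+    // regexp filter, buffering of at most 123 messages or 456 ms, one concurrent delivery,
+    // and output to a Kafka topic instead of an HTTP callback.
+    Job.Parameters param = new Job.Parameters(
+            "ManagedElement=RNC-Gbg-1",      // filter
+            Job.Parameters.REGEXP_TYPE,      // filterType
+            new Job.BufferTimeout(123, 456), // bufferTimeout (maxSize, maxTimeMilliseconds)
+            1,                               // maxConcurrency
+            "myOutputTopic");                // kafkaOutputTopic (null for HTTP delivery)
+
+Note that a job cannot specify both a Kafka output topic and an HTTP callback URL; such a job is rejected with 400 BAD_REQUEST.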
private Integer maxConcurrency;
+ @Getter
+ private String kafkaOutputTopic;
+
public Parameters() {}
- public Parameters(Object filter, String filterType, BufferTimeout bufferTimeout, Integer maxConcurrency) {
+ public Parameters(Object filter, String filterType, BufferTimeout bufferTimeout, Integer maxConcurrency,
+ String kafkaOutputTopic) {
this.filter = filter;
this.bufferTimeout = bufferTimeout;
this.maxConcurrency = maxConcurrency;
this.filterType = filterType;
+ this.kafkaOutputTopic = kafkaOutputTopic;
}
public int getMaxConcurrency() {
this.parameters = parameters;
filter = createFilter(parameters);
this.consumerRestClient = consumerRestClient;
-
}
private static Filter createFilter(Parameters parameters) {
package org.oran.dmaapadapter.repository;
+import com.google.common.base.Strings;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
}
public void addJob(String id, String callbackUrl, InfoType type, String owner, String lastUpdated,
- Parameters parameters) {
+ Parameters parameters) throws ServiceException {
+
+ if (!Strings.isNullOrEmpty(parameters.getKafkaOutputTopic()) && !Strings.isNullOrEmpty(callbackUrl)) {
+ throw new ServiceException("Cannot deliver to both Kafka and HTTP in the same job", HttpStatus.BAD_REQUEST);
+ }
AsyncRestClient consumerRestClient = type.isUseHttpProxy() //
? restclientFactory.createRestClientUseHttpProxy(callbackUrl) //
: restclientFactory.createRestClientNoHttpProxy(callbackUrl);
import org.oran.dmaapadapter.repository.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.Disposable;
* owner via REST calls.
*/
@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
-public class JobDataConsumer {
- private static final Logger logger = LoggerFactory.getLogger(JobDataConsumer.class);
+public abstract class DataConsumer {
+ private static final Logger logger = LoggerFactory.getLogger(DataConsumer.class);
@Getter
private final Job job;
private Disposable subscription;
}
}
- public JobDataConsumer(Job job) {
+ protected DataConsumer(Job job) {
this.job = job;
}
- public synchronized void start(Flux<String> input) {
+ public synchronized void start(Flux<TopicListener.Output> input) {
stop();
this.errorStats.resetIrrecoverableErrors();
this.subscription = handleReceivedMessage(input, job) //
- .flatMap(this::postToClient, job.getParameters().getMaxConcurrency()) //
+ .flatMap(this::sendToClient, job.getParameters().getMaxConcurrency()) //
.onErrorResume(this::handleError) //
.subscribe(this::handleConsumerSentOk, //
this::handleExceptionInStream, //
- () -> logger.warn("JobDataConsumer stopped jobId: {}", job.getId()));
+ () -> logger.warn("DataConsumer stopped jobId: {}", job.getId()));
}
private void handleExceptionInStream(Throwable t) {
- logger.warn("JobDataConsumer exception: {}, jobId: {}", t.getMessage(), job.getId());
+ logger.warn("HttpDataConsumer exception: {}, jobId: {}", t.getMessage(), job.getId());
stop();
}
- private Mono<String> postToClient(String body) {
- logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), body);
- MediaType contentType =
- this.job.isBuffered() || this.job.getType().isJson() ? MediaType.APPLICATION_JSON : null;
- return job.getConsumerRestClient().post("", body, contentType);
- }
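+ /** Delivers one (possibly filtered and buffered) message to the job output, i.e. an HTTP callback or a Kafka topic. */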
+ protected abstract Mono<String> sendToClient(TopicListener.Output output);
public synchronized void stop() {
if (this.subscription != null) {
return this.subscription != null;
}
- private Flux<String> handleReceivedMessage(Flux<String> input, Job job) {
- Flux<String> result = input.map(job::filter) //
- .filter(t -> !t.isEmpty()); //
+ private Flux<TopicListener.Output> handleReceivedMessage(Flux<TopicListener.Output> inputFlux, Job job) {
+ Flux<TopicListener.Output> result =
+ inputFlux.map(input -> new TopicListener.Output(input.key, job.filter(input.value))) //
+ .filter(t -> !t.value.isEmpty()); //
if (job.isBuffered()) {
- result = result.map(str -> quoteNonJson(str, job)) //
+ result = result.map(input -> quoteNonJson(input.value, job)) //
.bufferTimeout( //
job.getParameters().getBufferTimeout().getMaxSize(), //
job.getParameters().getBufferTimeout().getMaxTime()) //
- .map(Object::toString);
+ .map(buffered -> new TopicListener.Output("", buffered.toString()));
}
return result;
}
private final ApplicationConfig applicationConfig;
private final InfoType type;
private final com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
- private Many<String> output;
+ private Many<Output> output;
private Disposable topicReceiverTask;
public DmaapTopicListener(ApplicationConfig applicationConfig, InfoType type, SecurityContext securityContext) {
}
@Override
- public Many<String> getOutput() {
+ public Many<Output> getOutput() {
return this.output;
}
private void onReceivedData(String input) {
logger.debug("Received from DMAAP topic: {} :{}", this.type.getDmaapTopicUrl(), input);
- output.emitNext(input, Sinks.EmitFailureHandler.FAIL_FAST);
+ output.emitNext(new Output("", input), Sinks.EmitFailureHandler.FAIL_FAST);
}
private String getDmaapUrl() {
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2022 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.tasks;
+
+import org.oran.dmaapadapter.repository.Job;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.MediaType;
+import reactor.core.publisher.Mono;
+
+/**
+ * The class streams data from a multicast sink and sends the data to the Job
+ * owner via REST calls.
+ */
+@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
+public class HttpDataConsumer extends DataConsumer {
+ private static final Logger logger = LoggerFactory.getLogger(HttpDataConsumer.class);
+
+ public HttpDataConsumer(Job job) {
+ super(job);
+ }
+
+ @Override
+ protected Mono<String> sendToClient(TopicListener.Output output) {
+ Job job = this.getJob();
+ logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), output);
+ MediaType contentType = job.isBuffered() || job.getType().isJson() ? MediaType.APPLICATION_JSON : null;
+ return job.getConsumerRestClient().post("", output.value, contentType);
+ }
+
+}
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2022 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.tasks;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
+import org.oran.dmaapadapter.repository.Job;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.kafka.sender.KafkaSender;
+import reactor.kafka.sender.SenderOptions;
+import reactor.kafka.sender.SenderRecord;
+
+/**
+ * The class streams data from a multicast sink and sends the data to a Kafka
+ * topic given by the job parameter kafkaOutputTopic.
+ */
+@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
+public class KafkaDataConsumer extends DataConsumer {
+ private static final Logger logger = LoggerFactory.getLogger(KafkaDataConsumer.class);
+
+ private KafkaSender<String, String> sender;
+ private final ApplicationConfig appConfig;
+
+ public KafkaDataConsumer(Job job, ApplicationConfig appConfig) {
+ super(job);
+ this.appConfig = appConfig;
+ }
+
+ @Override
+ protected Mono<String> sendToClient(TopicListener.Output data) {
+ Job job = this.getJob();
+
+ logger.debug("Sending data '{}' to Kafka topic: {}", data, this.getJob().getParameters().getKafkaOutputTopic());
+
+ SenderRecord<String, String, Integer> senderRecord = senderRecord(data, job);
+
+ return this.sender.send(Mono.just(senderRecord)) //
+ .collectList() //
+ .map(x -> data.value);
+ }
+
+ @Override
+ public synchronized void start(Flux<TopicListener.Output> input) {
+ // Create the Kafka sender before subscribing to the input, so that sendToClient
+ // never runs with a null sender.
+ SenderOptions<String, String> senderOptions = senderOptions(appConfig);
+ this.sender = KafkaSender.create(senderOptions);
+ super.start(input);
+ }
+
+ @Override
+ public synchronized void stop() {
+ super.stop();
+ if (sender != null) {
+ sender.close();
+ sender = null;
+ }
+ }
+
+ private static SenderOptions<String, String> senderOptions(ApplicationConfig config) {
+ String bootstrapServers = config.getKafkaBootStrapServers();
+
+ Map<String, Object> props = new HashMap<>();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producerx");
+ props.put(ProducerConfig.ACKS_CONFIG, "all");
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ return SenderOptions.create(props);
+ }
+
+ private SenderRecord<String, String, Integer> senderRecord(TopicListener.Output output, Job infoJob) {
+ int correlationMetadata = 2;
+ String topic = infoJob.getParameters().getKafkaOutputTopic();
+ return SenderRecord.create(new ProducerRecord<>(topic, output.key, output.value), correlationMetadata);
+ }
+
+}
private static final Logger logger = LoggerFactory.getLogger(KafkaTopicListener.class);
private final ApplicationConfig applicationConfig;
private final InfoType type;
- private Many<String> output;
+ private Many<Output> output;
private Disposable topicReceiverTask;
public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type) {
}
@Override
- public Many<String> getOutput() {
+ public Many<Output> getOutput() {
return this.output;
}
private void onReceivedData(ConsumerRecord<String, String> input) {
logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(), input.value());
- output.emitNext(input.value(), Sinks.EmitFailureHandler.FAIL_FAST);
+ output.emitNext(new Output(input.key(), input.value()), Sinks.EmitFailureHandler.FAIL_FAST);
}
private void onReceivedError(Throwable t) {
package org.oran.dmaapadapter.tasks;
+import lombok.ToString;
import reactor.core.publisher.Sinks.Many;
public interface TopicListener {
+
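+ /**
+ * One message read from a topic: the Kafka key (empty when the source is DMaaP) and the value.
+ */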
+ @ToString
+ public static class Output {
+ public final String key;
+ public final String value;
+
+ public Output(String key, String value) {
+ this.key = key;
+ this.value = value;
+ }
+ }
+
public void start();
public void stop();
- public Many<String> getOutput();
+ public Many<Output> getOutput();
}
import lombok.Getter;
+import org.apache.logging.log4j.util.Strings;
import org.oran.dmaapadapter.clients.SecurityContext;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.repository.InfoType;
private final Map<String, TopicListener> dmaapTopicListeners = new HashMap<>(); // Key is typeId
@Getter
- private final MultiMap<JobDataConsumer> kafkaConsumers = new MultiMap<>(); // Key is typeId, jobId
- private final MultiMap<JobDataConsumer> dmaapConsumers = new MultiMap<>(); // Key is typeId, jobId
+ private final MultiMap<DataConsumer> dataConsumers = new MultiMap<>(); // Key is typeId, jobId
+
+ private final ApplicationConfig appConfig;
private static final int CONSUMER_SUPERVISION_INTERVAL_MS = 1000 * 60 * 3;
public TopicListeners(@Autowired ApplicationConfig appConfig, @Autowired InfoTypes types, @Autowired Jobs jobs,
@Autowired SecurityContext securityContext) {
+ this.appConfig = appConfig;
for (InfoType type : types.getAll()) {
if (type.isKafkaTopicDefined()) {
removeJob(job);
logger.debug("Job added {}", job.getId());
if (job.getType().isKafkaTopicDefined()) {
- addJob(job, kafkaConsumers, kafkaTopicListeners);
+ addConsumer(job, dataConsumers, kafkaTopicListeners);
}
if (job.getType().isDmaapTopicDefined()) {
- addJob(job, dmaapConsumers, dmaapTopicListeners);
+ addConsumer(job, dataConsumers, dmaapTopicListeners);
}
}
- private static void addJob(Job job, MultiMap<JobDataConsumer> consumers,
- Map<String, TopicListener> topicListeners) {
+ private DataConsumer createConsumer(Job job) {
+ return !Strings.isEmpty(job.getParameters().getKafkaOutputTopic()) ? new KafkaDataConsumer(job, appConfig)
+ : new HttpDataConsumer(job);
+ }
+
+ private void addConsumer(Job job, MultiMap<DataConsumer> consumers, Map<String, TopicListener> topicListeners) {
TopicListener topicListener = topicListeners.get(job.getType().getId());
if (consumers.get(job.getType().getId()).isEmpty()) {
topicListener.start();
}
- JobDataConsumer subscription = new JobDataConsumer(job);
- subscription.start(topicListener.getOutput().asFlux());
- consumers.put(job.getType().getId(), job.getId(), subscription);
+ DataConsumer consumer = createConsumer(job);
+ consumer.start(topicListener.getOutput().asFlux());
+ consumers.put(job.getType().getId(), job.getId(), consumer);
}
public synchronized void removeJob(Job job) {
- removeJob(job, kafkaConsumers);
- removeJob(job, dmaapConsumers);
+ removeJob(job, dataConsumers);
}
- private static void removeJob(Job job, MultiMap<JobDataConsumer> consumers) {
- JobDataConsumer consumer = consumers.remove(job.getType().getId(), job.getId());
+ private static void removeJob(Job job, MultiMap<DataConsumer> consumers) {
+ DataConsumer consumer = consumers.remove(job.getType().getId(), job.getId());
if (consumer != null) {
logger.debug("Job removed {}", job.getId());
consumer.stop();
@Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS)
public synchronized void restartNonRunningKafkaTopics() {
- for (String typeId : this.kafkaConsumers.keySet()) {
- for (JobDataConsumer consumer : this.kafkaConsumers.get(typeId)) {
- if (!consumer.isRunning()) {
- restartTopicAndConsumers(this.kafkaTopicListeners, this.kafkaConsumers, consumer);
- }
+ for (DataConsumer consumer : this.dataConsumers.values()) {
+ // Only Kafka-fed consumers are restarted here; kafkaTopicListeners has no entry for DMaaP types.
+ if (!consumer.isRunning() && consumer.getJob().getType().isKafkaTopicDefined()) {
+ restartTopicAndConsumers(this.kafkaTopicListeners, this.dataConsumers, consumer);
}
}
+
}
private static void restartTopicAndConsumers(Map<String, TopicListener> topicListeners,
- MultiMap<JobDataConsumer> consumers, JobDataConsumer consumer) {
+ MultiMap<DataConsumer> consumers, DataConsumer consumer) {
InfoType type = consumer.getJob().getType();
TopicListener topic = topicListeners.get(type.getId());
topic.start();
restartConsumersOfType(consumers, topic, type);
}
- private static void restartConsumersOfType(MultiMap<JobDataConsumer> consumers, TopicListener topic,
- InfoType type) {
+ private static void restartConsumersOfType(MultiMap<DataConsumer> consumers, TopicListener topic, InfoType type) {
consumers.get(type.getId()).forEach(consumer -> consumer.start(topic.getOutput().asFlux()));
}
}
"type": "integer",
"minimum": 1
},
+ "kafkaOutputTopic" : {
+ "type": "string"
+ },
"bufferTimeout": {
"type": "object",
"properties": {
}
},
"additionalProperties": false
-}
\ No newline at end of file
+}
"type": "integer",
"minimum": 1
},
+ "kafkaOutputTopic" : {
+ "type": "string"
+ },
"bufferTimeout": {
"type": "object",
"additionalProperties": false,
]
}
}
-}
\ No newline at end of file
+}
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
import org.oran.dmaapadapter.clients.AsyncRestClient;
import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
import org.oran.dmaapadapter.clients.SecurityContext;
import org.oran.dmaapadapter.repository.Jobs;
import org.oran.dmaapadapter.repository.filters.PmReport;
import org.oran.dmaapadapter.repository.filters.PmReportFilter;
-import org.oran.dmaapadapter.tasks.JobDataConsumer;
+import org.oran.dmaapadapter.tasks.DataConsumer;
import org.oran.dmaapadapter.tasks.ProducerRegstrationTask;
+import org.oran.dmaapadapter.tasks.TopicListener;
import org.oran.dmaapadapter.tasks.TopicListeners;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.test.context.TestPropertySource;
-import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
-@ExtendWith(SpringExtension.class)
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
@TestPropertySource(properties = { //
"server.ssl.key-store=./config/keystore.jks", //
waitForRegistration();
// Create a job
- Job.Parameters param = new Job.Parameters(null, null, new Job.BufferTimeout(123, 456), 1);
+ Job.Parameters param = new Job.Parameters(null, null, new Job.BufferTimeout(123, 456), 1, null);
String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL;
ConsumerJobInfo kafkaJobInfo = new ConsumerJobInfo(TYPE_ID, toJson(gson.toJson(param)), "owner", targetUri, "");
this.icsSimulatorController.addJob(kafkaJobInfo, JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
- JobDataConsumer kafkaConsumer = this.topicListeners.getKafkaConsumers().get(TYPE_ID, JOB_ID);
+ DataConsumer kafkaConsumer = this.topicListeners.getDataConsumers().get(TYPE_ID, JOB_ID);
// Handle received data from Kafka, check that it has been posted to the
// consumer
- kafkaConsumer.start(Flux.just("data"));
+ kafkaConsumer.start(Flux.just(new TopicListener.Output("key", "data")));
ConsumerController.TestResults consumer = this.consumerController.testResults;
await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
waitForRegistration();
// Create a job
- Job.Parameters param = new Job.Parameters(null, null, new Job.BufferTimeout(123, 456), 1);
+ Job.Parameters param = new Job.Parameters(null, null, new Job.BufferTimeout(123, 456), 1, null);
ConsumerJobInfo jobInfo = consumerJobInfo("DmaapInformationType", JOB_ID, toJson(gson.toJson(param)));
this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
waitForRegistration();
// Create a job
- Job.Parameters param = new Job.Parameters(null, null, null, 1);
+ Job.Parameters param = new Job.Parameters(null, null, null, 1, null);
ConsumerJobInfo jobInfo = consumerJobInfo("DmaapInformationType", JOB_ID, toJson(gson.toJson(param)));
this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
filterData.getSourceNames().add("O-DU-1122");
filterData.getMeasuredEntityDns().add("ManagedElement=RNC-Gbg-1");
Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE,
- new Job.BufferTimeout(123, 456), null);
+ new Job.BufferTimeout(123, 456), null, null);
String paramJson = gson.toJson(param);
ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationType", "EI_PM_JOB_ID", toJson(paramJson));
// Create a job with a PM filter
String expresssion = "if(.event.commonEventHeader.sourceName == \"O-DU-1122\")" //
+ ".";
- Job.Parameters param = new Job.Parameters(expresssion, Job.Parameters.JSLT_FILTER_TYPE, null, null);
+ Job.Parameters param = new Job.Parameters(expresssion, Job.Parameters.JSLT_FILTER_TYPE, null, null, null);
String paramJson = gson.toJson(param);
ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationType", JOB_ID, toJson(paramJson));
// Create a job with a PM filter
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
filterData.getMeasTypes().add("succImmediateAssignProcs");
- Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, null, null);
+ Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, null, null, null);
String paramJson = gson.toJson(param);
ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationTypeKafka", "EI_PM_JOB_ID", toJson(paramJson));
final String TYPE_ID = "KafkaInformationType";
Job.Parameters param =
- new Job.Parameters("filter", Job.Parameters.REGEXP_TYPE, new Job.BufferTimeout(123, 456), 1);
+ new Job.Parameters("filter", Job.Parameters.REGEXP_TYPE, new Job.BufferTimeout(123, 456), 1, null);
ConsumerJobInfo jobInfo =
new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", consumerUri(), "");
await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue());
final String TYPE_ID = "KafkaInformationType";
- Job.Parameters param =
- new Job.Parameters("filter", Job.Parameters.REGEXP_TYPE, new Job.BufferTimeout(123, 170 * 1000), 1);
+ Job.Parameters param = new Job.Parameters("filter", Job.Parameters.REGEXP_TYPE,
+ new Job.BufferTimeout(123, 170 * 1000), 1, null);
ConsumerJobInfo jobInfo =
new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", consumerUri(), "");
import com.google.gson.JsonParser;
import java.time.Duration;
+import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
import org.oran.dmaapadapter.clients.AsyncRestClient;
import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
import org.oran.dmaapadapter.clients.SecurityContext;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.configuration.WebClientConfig;
import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig;
+import org.oran.dmaapadapter.exceptions.ServiceException;
import org.oran.dmaapadapter.r1.ConsumerJobInfo;
import org.oran.dmaapadapter.repository.InfoType;
import org.oran.dmaapadapter.repository.InfoTypes;
import org.oran.dmaapadapter.repository.Job;
import org.oran.dmaapadapter.repository.Jobs;
-import org.oran.dmaapadapter.tasks.JobDataConsumer;
+import org.oran.dmaapadapter.tasks.DataConsumer;
+import org.oran.dmaapadapter.tasks.KafkaTopicListener;
+import org.oran.dmaapadapter.tasks.TopicListener;
import org.oran.dmaapadapter.tasks.TopicListeners;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.web.servlet.server.ServletWebServerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.test.context.TestPropertySource;
-import org.springframework.test.context.junit.jupiter.SpringExtension;
+import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
@SuppressWarnings("java:S3577") // Rename class
-@ExtendWith(SpringExtension.class)
@SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
@TestPropertySource(properties = { //
"server.ssl.key-store=./config/keystore.jks", //
private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
- private static final Logger logger = LoggerFactory.getLogger(IntegrationWithKafka.class);
+ private final Logger logger = LoggerFactory.getLogger(IntegrationWithKafka.class);
@LocalServerPort
int localServerHttpPort;
private static Object jobParametersAsJsonObject(String filter, long maxTimeMiliseconds, int maxSize,
int maxConcurrency) {
- Job.Parameters param = new Job.Parameters(filter, Job.Parameters.REGEXP_TYPE,
- new Job.BufferTimeout(maxSize, maxTimeMiliseconds), maxConcurrency);
+ Job.BufferTimeout buffer = maxSize > 0 ? new Job.BufferTimeout(maxSize, maxTimeMiliseconds) : null;
+ Job.Parameters param = new Job.Parameters(filter, Job.Parameters.REGEXP_TYPE, buffer, maxConcurrency, null);
String str = gson.toJson(param);
return jsonObject(str);
}
}
}
- private SenderOptions<Integer, String> senderOptions() {
+ ConsumerJobInfo consumerJobInfoKafka(String topic) {
+ try {
+ Job.Parameters param = new Job.Parameters(null, null, null, 1, topic);
+ String str = gson.toJson(param);
+ Object parametersObj = jsonObject(str);
+
+ return new ConsumerJobInfo(TYPE_ID, parametersObj, "owner", null, "");
+ } catch (Exception e) {
+ return null;
+ }
+ }
+
+ private SenderOptions<String, String> senderOptions() {
String bootstrapServers = this.applicationConfig.getKafkaBootStrapServers();
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producerx");
props.put(ProducerConfig.ACKS_CONFIG, "all");
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return SenderOptions.create(props);
}
- private SenderRecord<Integer, String, Integer> senderRecord(String data) {
+ private SenderRecord<String, String, Integer> senderRecord(String data) {
+ return senderRecord(data, "");
+ }
+
+ private SenderRecord<String, String, Integer> senderRecord(String data, String key) {
final InfoType infoType = this.types.get(TYPE_ID);
- int key = 1;
int correlationMetadata = 2;
return SenderRecord.create(new ProducerRecord<>(infoType.getKafkaInputTopic(), key, data), correlationMetadata);
}
- private void sendDataToStream(Flux<SenderRecord<Integer, String, Integer>> dataToSend) {
- final KafkaSender<Integer, String> sender = KafkaSender.create(senderOptions());
+ private void sendDataToStream(Flux<SenderRecord<String, String, Integer>> dataToSend) {
+ final KafkaSender<String, String> sender = KafkaSender.create(senderOptions());
sender.send(dataToSend) //
.doOnError(e -> logger.error("Send failed", e)) //
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
sleep(4000);
- var dataToSend = Flux.just(senderRecord("Message"));
+ var dataToSend = Flux.just(senderRecord("Message", ""));
sendDataToStream(dataToSend);
verifiedReceivedByConsumer("Message");
this.icsSimulatorController.deleteJob(JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
- await().untilAsserted(() -> assertThat(this.topicListeners.getKafkaConsumers().keySet()).isEmpty());
+ await().untilAsserted(() -> assertThat(this.topicListeners.getDataConsumers().keySet()).isEmpty());
+ }
+
+ TopicListener.Output receivedKafkaOutput = new TopicListener.Output("", "");
+
+ @Test
+ void sendToKafkaConsumer() throws ServiceException, InterruptedException {
+ final String JOB_ID = "ID";
+
+ // Register producer, Register types
+ await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+
+ final String OUTPUT_TOPIC = "outputTopic";
+
+ this.icsSimulatorController.addJob(consumerJobInfoKafka(OUTPUT_TOPIC), JOB_ID, restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+ // Create a listener to the output topic. The KafkaTopicListener happens to be
+ // suitable for that.
+ InfoType type = new InfoType("id", null, false, OUTPUT_TOPIC, "dataType", false);
+ KafkaTopicListener receiver = new KafkaTopicListener(this.applicationConfig, type);
+ receiver.start();
+
+ Disposable disposable = receiver.getOutput().asFlux() //
+ .doOnNext(output -> {
+ receivedKafkaOutput = output;
+ logger.info("*** recived {}, {}", OUTPUT_TOPIC, output);
+ }) //
+ .doFinally(sig -> logger.info("Finally " + sig)) //
+ .subscribe();
+
+ String sendString = "testData " + Instant.now();
+ String sendKey = "key " + Instant.now();
+ var dataToSend = Flux.just(senderRecord(sendString, sendKey));
+ sleep(4000);
+ sendDataToStream(dataToSend);
+
+ await().untilAsserted(() -> assertThat(this.receivedKafkaOutput.value).isEqualTo(sendString));
+ assertThat(this.receivedKafkaOutput.key).isEqualTo(sendKey);
+
+ disposable.dispose();
+ receiver.stop();
}
@Test
this.icsSimulatorController.deleteJob(JOB_ID2, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
- await().untilAsserted(() -> assertThat(this.topicListeners.getKafkaConsumers().keySet()).isEmpty());
+ await().untilAsserted(() -> assertThat(this.topicListeners.getDataConsumers().keySet()).isEmpty());
}
@Test
var dataToSend = Flux.range(1, 1000000).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc.
sendDataToStream(dataToSend); // this should overflow
- JobDataConsumer consumer = topicListeners.getKafkaConsumers().get(TYPE_ID).iterator().next();
+ DataConsumer consumer = topicListeners.getDataConsumers().get(TYPE_ID).iterator().next();
await().untilAsserted(() -> assertThat(consumer.isRunning()).isFalse());
this.consumerController.testResults.reset();
this.icsSimulatorController.deleteJob(JOB_ID2, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
- await().untilAsserted(() -> assertThat(this.topicListeners.getKafkaConsumers().keySet()).isEmpty());
+ await().untilAsserted(() -> assertThat(this.topicListeners.getDataConsumers().keySet()).isEmpty());
}
}