Updated so that also info received from dmaap can be buffered into an array.
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-743
Change-Id: I550a5b0bca4311e4ac4167219c660d58ab91dcf1
* kafkaInputTopic, a Kafka topic to listen to. Defaults to not listen to any topic.
* useHttpProxy, indicates if a HTTP proxy shall be used for data delivery (if configured). Defaults to false.
- This parameter is only relevant if a HTTPproxy is configured in the application.yaml file.
+ This parameter is only relevant if a HTTPproxy is configured in the application.yaml file.
* dataType, this can be set to "pmData" which gives a possibility to perform a special filtering of PM data.
When an information consumer creates an information job,it can provide type specific parameters. The allowed parameters are defined by a Json Schema.
The following schemas can be used by the component (are located in dmaapadapter/src/main/resources):
-====================
-typeSchemaDmaap.json
-====================
-This schema will be registered when dmaapTopicUrl is defined for the type. You can provide two parameters when creating the job which are
-used for filtering of the data.
+===============
+typeSchema.json
+===============
+This schema will by default be registerred for the type. The following properties are defined:
* filterType, selects the type of filtering that will be done. This can be one of: "regexp", "json-path", "jslt".
* jslt, which is an open source language for JSON processing. It can be used both for selecting matching json objects and for extracting or even transforming of json data. This is very powerful.
* filter, the value of the filter expression.
+* bufferTimeout can be used to buffer several json objects received from Kafka when kafkaInputTopic is defined into one json array. If bufferTimeout is used, the delivered data will be a Json array of the objects received. If not, each received object will be delivered in a separate call. This contains:
+
+ * maxSize, the maximum number of objects to collect before delivery to the consumer
+ * maxTimeMiliseconds, the maximum time to delay delivery (to buffer).
+
+* maxConcurrency, defines max how many paralell REST calls the consumer wishes to receive. 1, which is default, means sequential. A higher values may increase throughput.
+
Below follows examples of a filters.
"filter": "$.event.perf3gppFields.measDataCollection.measInfoList[0].measTypes.sMeasTypesList[0]"
}
+Below follows an example of using bufferTimeout and maxConcurrency.
+.. code-block:: javascript
-==========================
-typeSchemaPmDataDmaap.json
-==========================
-This schema will be registered when dmaapTopicUrl is defined and the dataType is "pmData" for the type.
+ {
+ "bufferTimeout":{
+ "maxSize":123,
+ "maxTimeMiliseconds":456
+ },
+ "maxConcurrency":1
+ }
+
+
+
+=====================
+typeSchemaPmData.json
+=====================
+This schema will be registered when the configured dataType is "pmData".
This will extend the filtering capabilities so that a special filter for PM data can be used. Here it is possible to
-define which meas types to get from which resources.
+define which meas-types (counters) to get from which resources.
-The filterType parameter is extended to have value "pmdata" that can be used for PM data filtering.
+The filterType parameter is extended to allow value "pmdata" which can be used for PM data filtering.
* sourceNames an array of source names for wanted PM reports.
* measObjInstIds an array of meas object instances for wanted PM reports. If a the given filter value is contained in the filter definition, it will match (partial matching).
"ManagedElement=RNC-Gbg-1"
]
}
- }
-
-
-====================
-typeSchemaKafka.json
-====================
-This schema will be registered when kafkaInputTopic is defined for the type.
-
-* filterType, see above.
-* filter, see above.
-* bufferTimeout can be used to buffer several json objects received from Kafka when kafkaInputTopic is defined into one json array. This contains:
-
- * maxSize, the maximum number of objects to collect before delivery to the consumer
- * maxTimeMiliseconds, the maximum time to delay delivery (to buffer).
-
-* maxConcurrency, defines max how many paralell REST calls the consumer wishes to receive. 1, which is default, means sequential. A higher values may increase throughput.
-
-If bufferTimeout is used, the delivered data will be a Json array of the objects received. If not, each received object will be delivered in a separate call.
-
-Below follows an example.
-
-.. code-block:: javascript
-
- {
- "bufferTimeout":{
- "maxSize":123,
- "maxTimeMiliseconds":456
- },
- "maxConcurrency":1
- }
-
-
-==========================
-typeSchemaPmDataKafka.json
-==========================
-This schema will be registered when kafkaInputTopic is defined and the dataType is "pmData" for the type.
-
-This schema will allow all parameters above.
-
-* filterType (one of: "regexp", "json-path", "jslt" or "pmdata")
-* filter, see above.
-* bufferTimeout, see above.
-
-
+ }
\ No newline at end of file
this.allJobs.remove(job.getId());
jobsByType.remove(job.getType().getId(), job.getId());
}
- synchronized (observers) {
- this.observers.forEach(obs -> obs.onJobRemoved(job));
- }
+ notifyJobRemoved(job);
+ }
+
+ private synchronized void notifyJobRemoved(Job job) {
+ this.observers.forEach(obs -> obs.onJobRemoved(job));
}
public synchronized int size() {
return jobsByType.get(type.getId());
}
- public synchronized void clear() {
- allJobs.clear();
- jobsByType.clear();
+ public void clear() {
+
+ this.allJobs.forEach((id, job) -> notifyJobRemoved(job));
+
+ synchronized (this) {
+ allJobs.clear();
+ jobsByType.clear();
+ }
}
}
package org.oran.dmaapadapter.repository;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
return this.map.keySet();
}
+ public Collection<T> values() {
+ ArrayList<T> result = new ArrayList<>();
+ for (String key : keySet()) {
+ result.addAll(get(key));
+ }
+ return result;
+ }
+
public void clear() {
this.map.clear();
}
+++ /dev/null
-/*-
- * ========================LICENSE_START=================================
- * O-RAN-SC
- * %%
- * Copyright (C) 2021 Nordix Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================LICENSE_END===================================
- */
-
-package org.oran.dmaapadapter.tasks;
-
-import org.oran.dmaapadapter.configuration.ApplicationConfig;
-import org.oran.dmaapadapter.repository.InfoType;
-import org.oran.dmaapadapter.repository.InfoTypes;
-import org.oran.dmaapadapter.repository.Jobs;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-@Component
-public class DmaapTopicConsumers {
-
- DmaapTopicConsumers(@Autowired ApplicationConfig appConfig, @Autowired InfoTypes types, @Autowired Jobs jobs) {
- // Start a consumer for each type
- for (InfoType type : types.getAll()) {
- if (type.isDmaapTopicDefined()) {
- DmaapTopicConsumer topicConsumer = new DmaapTopicConsumer(appConfig, type, jobs);
- topicConsumer.start();
- }
- }
- }
-
-}
import org.oran.dmaapadapter.repository.Jobs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.http.MediaType;
+import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import reactor.util.function.Tuples;
+import reactor.core.publisher.Sinks;
+import reactor.core.publisher.Sinks.Many;
/**
* The class fetches incoming requests from DMAAP and sends them further to the
* consumers that has a job for this InformationType.
*/
-public class DmaapTopicConsumer {
- private static final Duration TIME_BETWEEN_DMAAP_RETRIES = Duration.ofSeconds(10);
- private static final Logger logger = LoggerFactory.getLogger(DmaapTopicConsumer.class);
+public class DmaapTopicListener {
+ private static final Duration TIME_BETWEEN_DMAAP_RETRIES = Duration.ofSeconds(3);
+ private static final Logger logger = LoggerFactory.getLogger(DmaapTopicListener.class);
private final AsyncRestClient dmaapRestClient;
protected final ApplicationConfig applicationConfig;
protected final InfoType type;
protected final Jobs jobs;
private final com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
+ private Many<String> output;
+ private Disposable topicReceiverTask;
- public DmaapTopicConsumer(ApplicationConfig applicationConfig, InfoType type, Jobs jobs) {
+ public DmaapTopicListener(ApplicationConfig applicationConfig, InfoType type, Jobs jobs) {
AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
this.dmaapRestClient = restclientFactory.createRestClientNoHttpProxy("");
this.applicationConfig = applicationConfig;
this.type = type;
this.jobs = jobs;
+
+ }
+
+ public Many<String> getOutput() {
+ return this.output;
}
public void start() {
- Flux.range(0, Integer.MAX_VALUE) //
+ stop();
+
+ final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 1024 * 10;
+ this.output = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE);
+
+ topicReceiverTask = Flux.range(0, Integer.MAX_VALUE) //
.flatMap(notUsed -> getFromMessageRouter(getDmaapUrl()), 1) //
- .flatMap(this::pushDataToConsumers) //
+ .doOnNext(this::onReceivedData) //
.subscribe(//
null, //
throwable -> logger.error("DmaapMessageConsumer error: {}", throwable.getMessage()), //
this::onComplete); //
}
+ public void stop() {
+ if (topicReceiverTask != null) {
+ topicReceiverTask.dispose();
+ topicReceiverTask = null;
+ }
+ }
+
private void onComplete() {
logger.warn("DmaapMessageConsumer completed {}", type.getId());
start();
}
+ private void onReceivedData(String input) {
+ logger.debug("Received from DMAAP topic: {} :{}", this.type.getDmaapTopicUrl(), input);
+ output.emitNext(input, Sinks.EmitFailureHandler.FAIL_FAST);
+ }
+
private String getDmaapUrl() {
return this.applicationConfig.getDmaapBaseUrl() + type.getDmaapTopicUrl();
}
logger.trace("getFromMessageRouter {}", topicUrl);
return dmaapRestClient.get(topicUrl) //
.filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away.
- .flatMapMany(body -> toMessages(body)) //
+ .flatMapMany(this::splitArray) //
.doOnNext(message -> logger.debug("Message from DMAAP topic: {} : {}", topicUrl, message)) //
.onErrorResume(this::handleDmaapErrorResponse); //
}
- private Flux<String> toMessages(String body) {
+ private Flux<String> splitArray(String body) {
Collection<String> messages = gson.fromJson(body, LinkedList.class);
return Flux.fromIterable(messages);
}
- private Mono<String> handleConsumerErrorResponse(Throwable t) {
- logger.warn("error from CONSUMER {}", t.getMessage());
- return Mono.empty();
- }
-
- protected Flux<String> pushDataToConsumers(String input) {
- logger.debug("Received data {}", input);
- final int CONCURRENCY = 50;
-
- // Distibute the body to all jobs for this type
- return Flux.fromIterable(this.jobs.getJobsForType(this.type)) //
- .map(job -> Tuples.of(job, job.filter(input))) //
- .filter(t -> !t.getT2().isEmpty()) //
- .doOnNext(touple -> logger.debug("Sending to consumer {}", touple.getT1().getCallbackUrl())) //
- .flatMap(touple -> touple.getT1().getConsumerRestClient().post("", touple.getT2(),
- MediaType.APPLICATION_JSON), CONCURRENCY) //
- .onErrorResume(this::handleConsumerErrorResponse);
- }
-
}
* owner via REST calls.
*/
@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
-public class KafkaJobDataConsumer {
- private static final Logger logger = LoggerFactory.getLogger(KafkaJobDataConsumer.class);
+public class JobDataConsumer {
+ private static final Logger logger = LoggerFactory.getLogger(JobDataConsumer.class);
@Getter
private final Job job;
private Disposable subscription;
private class ErrorStats {
private int consumerFaultCounter = 0;
- private boolean kafkaError = false; // eg. overflow
+ private boolean irrecoverableError = false; // eg. overflow
public void handleOkFromConsumer() {
this.consumerFaultCounter = 0;
if (t instanceof WebClientResponseException) {
++this.consumerFaultCounter;
} else {
- kafkaError = true;
+ irrecoverableError = true;
}
}
public boolean isItHopeless() {
final int STOP_AFTER_ERRORS = 5;
- return kafkaError || consumerFaultCounter > STOP_AFTER_ERRORS;
+ return irrecoverableError || consumerFaultCounter > STOP_AFTER_ERRORS;
}
- public void resetKafkaErrors() {
- kafkaError = false;
+ public void resetIrrecoverableErrors() {
+ irrecoverableError = false;
}
}
- public KafkaJobDataConsumer(Job job) {
+ public JobDataConsumer(Job job) {
this.job = job;
}
public synchronized void start(Flux<String> input) {
stop();
- this.errorStats.resetKafkaErrors();
- this.subscription = handleMessagesFromKafka(input, job) //
+ this.errorStats.resetIrrecoverableErrors();
+ this.subscription = handleReceivedMessages(input, job) //
.flatMap(this::postToClient, job.getParameters().getMaxConcurrency()) //
.onErrorResume(this::handleError) //
.subscribe(this::handleConsumerSentOk, //
this::handleExceptionInStream, //
- () -> logger.warn("KafkaMessageConsumer stopped jobId: {}", job.getId()));
+ () -> logger.warn("JobDataConsumer stopped jobId: {}", job.getId()));
}
private void handleExceptionInStream(Throwable t) {
- logger.warn("KafkaMessageConsumer exception: {}, jobId: {}", t.getMessage(), job.getId());
+ logger.warn("JobDataConsumer exception: {}, jobId: {}", t.getMessage(), job.getId());
stop();
}
return this.subscription != null;
}
- private Flux<String> handleMessagesFromKafka(Flux<String> input, Job job) {
+ private Flux<String> handleReceivedMessages(Flux<String> input, Job job) {
Flux<String> result = input.map(job::filter) //
.filter(t -> !t.isEmpty()); //
() -> logger.warn("KafkaTopicReceiver stopped"));
}
- private void stop() {
+ public void stop() {
if (topicReceiverTask != null) {
topicReceiverTask.dispose();
topicReceiverTask = null;
private Object jsonSchemaObject(InfoType type) throws IOException, ServiceException {
String schemaFile;
if (type.getDataType() == InfoType.DataType.PM_DATA) {
- schemaFile = type.isKafkaTopicDefined() ? "/typeSchemaPmDataKafka.json" : "/typeSchemaPmDataDmaap.json";
+ schemaFile = "/typeSchemaPmData.json";
} else {
- schemaFile = type.isKafkaTopicDefined() ? "/typeSchemaKafka.json" : "/typeSchemaDmaap.json";
+ schemaFile = "/typeSchema.json";
}
return jsonObject(readSchemaFile(schemaFile));
}
@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
@Component
@EnableScheduling
-public class KafkaTopicConsumers {
- private static final Logger logger = LoggerFactory.getLogger(KafkaTopicConsumers.class);
+public class TopicListeners {
+ private static final Logger logger = LoggerFactory.getLogger(TopicListeners.class);
- private final Map<String, KafkaTopicListener> topicListeners = new HashMap<>(); // Key is typeId
+ private final Map<String, KafkaTopicListener> kafkaTopicListeners = new HashMap<>(); // Key is typeId
+ private final Map<String, DmaapTopicListener> dmaapTopicListeners = new HashMap<>(); // Key is typeId
@Getter
- private final MultiMap<KafkaJobDataConsumer> consumers = new MultiMap<>(); // Key is typeId, jobId
+ private final MultiMap<JobDataConsumer> kafkaConsumers = new MultiMap<>(); // Key is typeId, jobId
+ private final MultiMap<JobDataConsumer> dmaapConsumers = new MultiMap<>(); // Key is typeId, jobId
private static final int CONSUMER_SUPERVISION_INTERVAL_MS = 1000 * 60 * 3;
- public KafkaTopicConsumers(@Autowired ApplicationConfig appConfig, @Autowired InfoTypes types,
- @Autowired Jobs jobs) {
+ public TopicListeners(@Autowired ApplicationConfig appConfig, @Autowired InfoTypes types, @Autowired Jobs jobs) {
for (InfoType type : types.getAll()) {
if (type.isKafkaTopicDefined()) {
KafkaTopicListener topicConsumer = new KafkaTopicListener(appConfig, type);
- topicListeners.put(type.getId(), topicConsumer);
+ kafkaTopicListeners.put(type.getId(), topicConsumer);
+ }
+ if (type.isDmaapTopicDefined()) {
+ DmaapTopicListener topicListener = new DmaapTopicListener(appConfig, type, jobs);
+ dmaapTopicListeners.put(type.getId(), topicListener);
}
}
}
public synchronized void addJob(Job job) {
+ removeJob(job);
+ logger.debug("Job added {}", job.getId());
if (job.getType().isKafkaTopicDefined()) {
- removeJob(job);
- logger.debug("Kafka job added {}", job.getId());
- KafkaTopicListener topicConsumer = topicListeners.get(job.getType().getId());
- if (consumers.get(job.getType().getId()).isEmpty()) {
- topicConsumer.start();
+ KafkaTopicListener topicListener = kafkaTopicListeners.get(job.getType().getId());
+ if (kafkaConsumers.get(job.getType().getId()).isEmpty()) {
+ topicListener.start();
}
- KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(job);
- subscription.start(topicConsumer.getOutput().asFlux());
- consumers.put(job.getType().getId(), job.getId(), subscription);
+ JobDataConsumer subscription = new JobDataConsumer(job);
+ subscription.start(topicListener.getOutput().asFlux());
+ kafkaConsumers.put(job.getType().getId(), job.getId(), subscription);
+ }
+
+ if (job.getType().isDmaapTopicDefined()) {
+ DmaapTopicListener topicListener = dmaapTopicListeners.get(job.getType().getId());
+ if (dmaapConsumers.get(job.getType().getId()).isEmpty()) {
+ topicListener.start();
+ }
+ JobDataConsumer subscription = new JobDataConsumer(job);
+ subscription.start(topicListener.getOutput().asFlux());
+ dmaapConsumers.put(job.getType().getId(), job.getId(), subscription);
}
}
public synchronized void removeJob(Job job) {
- KafkaJobDataConsumer d = consumers.remove(job.getType().getId(), job.getId());
- if (d != null) {
+ JobDataConsumer consumer = kafkaConsumers.remove(job.getType().getId(), job.getId());
+ if (consumer != null) {
logger.debug("Kafka job removed {}", job.getId());
- d.stop();
+ consumer.stop();
+ }
+ consumer = this.dmaapConsumers.remove(job.getType().getId(), job.getId());
+ if (consumer != null) {
+ logger.debug("DMAAP job removed {}", job.getId());
+ consumer.stop();
}
}
@Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS)
- public synchronized void restartNonRunningTopics() {
- for (String typeId : this.consumers.keySet()) {
- for (KafkaJobDataConsumer consumer : this.consumers.get(typeId)) {
+ public synchronized void restartNonRunningKafkaTopics() {
+ for (String typeId : this.kafkaConsumers.keySet()) {
+ for (JobDataConsumer consumer : this.kafkaConsumers.get(typeId)) {
if (!consumer.isRunning()) {
- restartTopic(consumer);
+ restartKafkaTopic(consumer);
}
}
}
}
- private void restartTopic(KafkaJobDataConsumer consumer) {
+ private void restartKafkaTopic(JobDataConsumer consumer) {
InfoType type = consumer.getJob().getType();
- KafkaTopicListener topic = this.topicListeners.get(type.getId());
+ KafkaTopicListener topic = this.kafkaTopicListeners.get(type.getId());
topic.start();
restartConsumersOfType(topic, type);
}
private void restartConsumersOfType(KafkaTopicListener topic, InfoType type) {
- this.consumers.get(type.getId()).forEach(consumer -> consumer.start(topic.getOutput().asFlux()));
+ this.kafkaConsumers.get(type.getId()).forEach(consumer -> consumer.start(topic.getOutput().asFlux()));
}
}
+++ /dev/null
-{
- "$schema": "http://json-schema.org/draft-04/schema#",
- "type": "object",
- "properties": {
- "filter": {
- "type": "string"
- },
- "filterType": {
- "type": "string",
- "enum": [
- "jslt",
- "regexp",
- "json-path"
- ]
- }
- },
- "additionalProperties": false
-}
\ No newline at end of file
+++ /dev/null
-{
- "$schema": "http://json-schema.org/draft-04/schema#",
- "type": "object",
- "additionalProperties": false,
- "properties": {
- "filter": {
- "anyOf": [
- {
- "type": "string"
- },
- {
- "type": "object",
- "additionalProperties": false,
- "properties": {
- "sourceNames": {
- "type": "array",
- "items": [
- {
- "type": "string"
- }
- ]
- },
- "measObjInstIds": {
- "type": "array",
- "items": [
- {
- "type": "string"
- }
- ]
- },
- "measTypes": {
- "type": "array",
- "items": [
- {
- "type": "string"
- }
- ]
- },
- "measuredEntityDns": {
- "type": "array",
- "items": [
- {
- "type": "string"
- }
- ]
- }
- }
- }
- ]
- },
- "filterType": {
- "type": "string",
- "enum": [
- "jslt",
- "regexp",
- "pmdata",
- "json-path"
- ]
- }
- }
-}
\ No newline at end of file
import org.oran.dmaapadapter.repository.Job;
import org.oran.dmaapadapter.repository.Jobs;
import org.oran.dmaapadapter.repository.filters.PmReportFilter;
-import org.oran.dmaapadapter.tasks.KafkaJobDataConsumer;
-import org.oran.dmaapadapter.tasks.KafkaTopicConsumers;
+import org.oran.dmaapadapter.tasks.JobDataConsumer;
import org.oran.dmaapadapter.tasks.ProducerRegstrationTask;
+import org.oran.dmaapadapter.tasks.TopicListeners;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
private IcsSimulatorController icsSimulatorController;
@Autowired
- KafkaTopicConsumers kafkaTopicConsumers;
+ TopicListeners topicListeners;
@Autowired
ProducerRegstrationTask producerRegistrationTask;
}
private String quote(String str) {
- return "\"" + str + "\"";
+ final String q = "\"";
+ return q + str.replace(q, "\\\"") + q;
}
private Object jsonObjectFilter(String filter, String filterType) {
this.icsSimulatorController.addJob(kafkaJobInfo, JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
- KafkaJobDataConsumer kafkaConsumer = this.kafkaTopicConsumers.getConsumers().get(TYPE_ID, JOB_ID);
+ JobDataConsumer kafkaConsumer = this.topicListeners.getKafkaConsumers().get(TYPE_ID, JOB_ID);
// Handle received data from Kafka, check that it has been posted to the
// consumer
// Test regular restart of stopped
kafkaConsumer.stop();
- this.kafkaTopicConsumers.restartNonRunningTopics();
+ this.topicListeners.restartNonRunningKafkaTopics();
await().untilAsserted(() -> assertThat(kafkaConsumer.isRunning()).isTrue());
}
@Test
void testReceiveAndPostDataFromDmaap() throws Exception {
- final String JOB_ID = "ID";
+ final String JOB_ID = "testReceiveAndPostDataFromDmaap";
// Register producer, Register types
waitForRegistration();
// Create a job
- this.icsSimulatorController.addJob(consumerJobInfo("DmaapInformationType", JOB_ID, jsonObjectRegexp()), JOB_ID,
- restClient());
+ Job.Parameters param = new Job.Parameters(null, null, new Job.BufferTimeout(123, 456), 1);
+ ConsumerJobInfo jobInfo = consumerJobInfo("DmaapInformationType", JOB_ID, toJson(gson.toJson(param)));
+ this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
// Return two messages from DMAAP and verify that these are sent to the owner of
DmaapSimulatorController.addResponse("DmaapResponse1");
DmaapSimulatorController.addResponse("DmaapResponse2");
ConsumerController.TestResults consumer = this.consumerController.testResults;
- await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(2));
- assertThat(consumer.receivedBodies.get(0)).isEqualTo("DmaapResponse1");
+ await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
+ assertThat(consumer.receivedBodies.get(0)).contains("[\"DmaapResponse1");
+ assertThat(consumer.receivedBodies.get(0)).contains("DmaapResponse2\"]");
String jobUrl = baseUrl() + ProducerCallbacksController.JOB_URL;
String jobs = restClient().get(jobUrl).block();
// Register producer, Register types
waitForRegistration();
- // Create a job with atestJsonPathFiltering JsonPath
+ // Create a job with JsonPath Filtering
ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationType", JOB_ID, this.jsonObjectJsonPath());
this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
import java.util.LinkedList;
import java.util.List;
-import org.oran.dmaapadapter.controllers.ErrorResponse;
import org.oran.dmaapadapter.controllers.VoidResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static List<String> dmaapPmResponses = Collections.synchronizedList(new LinkedList<String>());
public static void addPmResponse(String response) {
- response = response.replace("\"", "\\\"");
- dmaapPmResponses.add("[\"" + response + "\"]");
+ dmaapPmResponses.add("[" + quote(response) + "]");
}
public static void addResponse(String response) {
- dmaapResponses.add("[\"" + response + "\"]");
+ dmaapResponses.add("[" + quote(response) + "]");
+ }
+
+ private static String quote(String str) {
+ final String q = "\"";
+ return q + str.replace(q, "\\\"") + q;
}
@GetMapping(path = DMAAP_TOPIC_URL, produces = MediaType.APPLICATION_JSON_VALUE)
@ApiResponse(responseCode = "200", description = "OK", //
content = @Content(schema = @Schema(implementation = VoidResponse.class))) //
})
- public ResponseEntity<Object> getFromTopic() {
+ public ResponseEntity<Object> getFromTopic() throws InterruptedException {
if (dmaapResponses.isEmpty()) {
- return ErrorResponse.create("", HttpStatus.NOT_FOUND);
+ return nothing();
} else {
String resp = dmaapResponses.remove(0);
logger.info("DMAAP simulator returned: {}", resp);
@ApiResponse(responseCode = "200", description = "OK", //
content = @Content(schema = @Schema(implementation = VoidResponse.class))) //
})
- public ResponseEntity<Object> getFromPmTopic() {
+ public ResponseEntity<Object> getFromPmTopic() throws InterruptedException {
if (dmaapPmResponses.isEmpty()) {
- return ErrorResponse.create("", HttpStatus.NOT_FOUND);
+ return nothing();
} else {
String resp = dmaapPmResponses.remove(0);
return new ResponseEntity<>(resp, HttpStatus.OK);
}
}
+ @SuppressWarnings("java:S2925") // sleep
+ private ResponseEntity<Object> nothing() throws InterruptedException {
+ Thread.sleep(1000); // caller will retry immediately, make it take a rest
+ return new ResponseEntity<>("[]", HttpStatus.OK);
+ }
+
}
}
private String quote(String str) {
- return "\"" + str + "\"";
+ final String q = "\"";
+ return q + str.replace(q, "\\\"") + q;
}
private String reQuote(String str) {
import org.oran.dmaapadapter.repository.InfoTypes;
import org.oran.dmaapadapter.repository.Job;
import org.oran.dmaapadapter.repository.Jobs;
-import org.oran.dmaapadapter.tasks.KafkaJobDataConsumer;
-import org.oran.dmaapadapter.tasks.KafkaTopicConsumers;
+import org.oran.dmaapadapter.tasks.JobDataConsumer;
+import org.oran.dmaapadapter.tasks.TopicListeners;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
private IcsSimulatorController icsSimulatorController;
@Autowired
- private KafkaTopicConsumers kafkaTopicConsumers;
+ private TopicListeners topicListeners;
private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
this.icsSimulatorController.deleteJob(JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
- await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers().keySet()).isEmpty());
+ await().untilAsserted(() -> assertThat(this.topicListeners.getKafkaConsumers().keySet()).isEmpty());
}
@Test
this.icsSimulatorController.deleteJob(JOB_ID2, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
- await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers().keySet()).isEmpty());
+ await().untilAsserted(() -> assertThat(this.topicListeners.getKafkaConsumers().keySet()).isEmpty());
}
@Test
var dataToSend = Flux.range(1, 1000000).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc.
sendDataToStream(dataToSend); // this should overflow
- KafkaJobDataConsumer consumer = kafkaTopicConsumers.getConsumers().get(TYPE_ID).iterator().next();
+ JobDataConsumer consumer = topicListeners.getKafkaConsumers().get(TYPE_ID).iterator().next();
await().untilAsserted(() -> assertThat(consumer.isRunning()).isFalse());
this.consumerController.testResults.reset();
this.icsSimulatorController.deleteJob(JOB_ID2, restClient()); // Delete one job
- kafkaTopicConsumers.restartNonRunningTopics();
+ topicListeners.restartNonRunningKafkaTopics();
sleep(1000); // Restarting the input seems to take some asynch time
dataToSend = Flux.just(senderRecord("Howdy\""));
this.icsSimulatorController.deleteJob(JOB_ID2, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
- await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers().keySet()).isEmpty());
+ await().untilAsserted(() -> assertThat(this.topicListeners.getKafkaConsumers().keySet()).isEmpty());
}
}