PM Filter 43/8143/2
author    PatrikBuhr <patrik.buhr@est.tech>
Wed, 4 May 2022 12:30:28 +0000 (14:30 +0200)
committer PatrikBuhr <patrik.buhr@est.tech>
Wed, 4 May 2022 13:29:46 +0000 (15:29 +0200)
Updated so that info received from DMAAP can also be buffered into an array.

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-743
Change-Id: I550a5b0bca4311e4ac4167219c660d58ab91dcf1

17 files changed:
docs/overview.rst
src/main/java/org/oran/dmaapadapter/repository/Jobs.java
src/main/java/org/oran/dmaapadapter/repository/MultiMap.java
src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumers.java [deleted file]
src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java [moved from src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java with 73% similarity]
src/main/java/org/oran/dmaapadapter/tasks/JobDataConsumer.java [moved from src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java with 84% similarity]
src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java
src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java [moved from src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java with 52% similarity]
src/main/resources/typeSchema.json [moved from src/main/resources/typeSchemaKafka.json with 100% similarity]
src/main/resources/typeSchemaDmaap.json [deleted file]
src/main/resources/typeSchemaPmData.json [moved from src/main/resources/typeSchemaPmDataKafka.json with 100% similarity]
src/main/resources/typeSchemaPmDataDmaap.json [deleted file]
src/test/java/org/oran/dmaapadapter/ApplicationTest.java
src/test/java/org/oran/dmaapadapter/DmaapSimulatorController.java
src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java
src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java

index cca8242..212e6b6 100644 (file)
@@ -42,7 +42,7 @@ Each entry will be registered as a subscribe information type in ICS. The follow
 * kafkaInputTopic, a Kafka topic to listen to. Defaults to not listening to any topic.
 
 * useHttpProxy, indicates if an HTTP proxy shall be used for data delivery (if configured). Defaults to false.
-  This parameter is only relevant if a HTTPproxy is configured in the application.yaml file. 
+  This parameter is only relevant if an HTTP proxy is configured in the application.yaml file.
 
 * dataType, this can be set to "pmData" which gives a possibility to perform a special filtering of PM data.
 
@@ -78,11 +78,10 @@ Information Job Parameters
 When an information consumer creates an information job, it can provide type-specific parameters. The allowed parameters are defined by a Json Schema.
 The following schemas can be used by the component (located in dmaapadapter/src/main/resources):
 
-====================
-typeSchemaDmaap.json
-====================
-This schema will be registered when dmaapTopicUrl is defined for the type. You can provide two parameters when creating the job which are
-used for filtering of the data.
+===============
+typeSchema.json
+===============
+This schema will be registered for the type by default. The following properties are defined:
 
 * filterType, selects the type of filtering that will be done. This can be one of: "regexp", "json-path", "jslt".
 
@@ -91,6 +90,13 @@ used for filtering of the data.
   * jslt, which is an open source language for JSON processing. It can be used both for selecting matching json objects and for extracting or even transforming json data. This is very powerful.
 
 * filter, the value of the filter expression.
+* bufferTimeout can be used to buffer several received json objects into one json array. If bufferTimeout is used, the delivered data will be a Json array of the objects received. If not, each received object will be delivered in a separate call. This contains:
+
+  * maxSize, the maximum number of objects to collect before delivery to the consumer.
+  * maxTimeMiliseconds, the maximum time to delay delivery (to buffer).
+
+* maxConcurrency, defines the maximum number of parallel REST calls the consumer wishes to receive. The default is 1, which means sequential delivery. A higher value may increase throughput.
+
 
 Below follow examples of filters.
 
@@ -117,16 +123,28 @@ Below follows examples of a filters.
       "filter": "$.event.perf3gppFields.measDataCollection.measInfoList[0].measTypes.sMeasTypesList[0]"
     }
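
The regexp and jslt filter types take the same two job parameters (filterType and filter); as an illustrative sketch, not taken from this change, a minimal regexp filter could look like:

.. code-block:: javascript

    {
       "filterType":"regexp",
       "filter":".*perf3gppFields.*"
    }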
 
+Below follows an example of using bufferTimeout and maxConcurrency.
 
+.. code-block:: javascript
 
-==========================
-typeSchemaPmDataDmaap.json
-==========================
-This schema will be registered when dmaapTopicUrl is defined and the dataType is "pmData" for the type.
+    {
+       "bufferTimeout":{
+          "maxSize":123,
+          "maxTimeMiliseconds":456
+       },
+       "maxConcurrency":1
+    }
+
+
+
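When bufferTimeout is set, each delivery to the consumer is thus a single Json array of the buffered objects. For instance, the test updated later in this change expects the two DMAAP messages to arrive in one body of the form:

.. code-block:: javascript

    ["DmaapResponse1","DmaapResponse2"]
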
+=====================
+typeSchemaPmData.json
+=====================
+This schema will be registered when the configured dataType is "pmData".
 This will extend the filtering capabilities so that a special filter for PM data can be used. Here it is possible to
-define which meas types to get from which resources.
+define which meas-types (counters) to get from which resources.
 
-The filterType parameter is extended to have value "pmdata" that can be used for PM data filtering. 
+The filterType parameter is extended to allow the value "pmdata", which can be used for PM data filtering.
 
 * sourceNames, an array of source names for wanted PM reports.
 * measObjInstIds, an array of meas object instances for wanted PM reports. If the given filter value is contained in the meas object instance ID of a report, it will match (partial matching).
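
Drawing on the PM filter schema in this change (see the deleted typeSchemaPmDataDmaap.json below, whose filter properties carry over), a complete pmdata filter could look like the following sketch; apart from "ManagedElement=RNC-Gbg-1", which appears in the example in this document, all values are illustrative:

.. code-block:: javascript

    {
       "filterType":"pmdata",
       "filter":{
          "sourceNames":["O-DU-1122"],
          "measObjInstIds":["UtranCell=Gbg-997"],
          "measTypes":["succImmediateAssignProcs"],
          "measuredEntityDns":["ManagedElement=RNC-Gbg-1"]
       }
    }
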
@@ -157,47 +175,4 @@ Below follows an example on a PM filter.
            "ManagedElement=RNC-Gbg-1"
         ]
       }
-    }
-
-
-====================
-typeSchemaKafka.json
-====================
-This schema will be registered when kafkaInputTopic is defined for the type.
-
-* filterType, see above.
-* filter, see above.
-* bufferTimeout can be used to buffer several json objects received from Kafka when kafkaInputTopic is defined into one json array. This contains:
-
-  * maxSize, the maximum number of objects to collect before delivery to the consumer
-  * maxTimeMiliseconds, the maximum time to delay delivery (to buffer).
-
-* maxConcurrency, defines max how many paralell REST calls the consumer wishes to receive. 1, which is default, means sequential. A higher values may increase throughput. 
-
-If bufferTimeout is used, the delivered data will be a Json array of the objects received. If not, each received object will be delivered in a separate call.
-
-Below follows an example.
-
-.. code-block:: javascript
-
-    {
-       "bufferTimeout":{
-          "maxSize":123,
-          "maxTimeMiliseconds":456
-       },
-       "maxConcurrency":1
-    }
-
-
-==========================
-typeSchemaPmDataKafka.json
-==========================
-This schema will be registered when kafkaInputTopic is defined and the dataType is "pmData" for the type.
-
-This schema will allow all parameters above.
-
-* filterType (one of: "regexp", "json-path", "jslt" or "pmdata")
-* filter, see above.
-* bufferTimeout, see above.
-
-
+    }
\ No newline at end of file
index ec33774..be83296 100644 (file)
@@ -110,9 +110,11 @@ public class Jobs {
             this.allJobs.remove(job.getId());
             jobsByType.remove(job.getType().getId(), job.getId());
         }
-        synchronized (observers) {
-            this.observers.forEach(obs -> obs.onJobRemoved(job));
-        }
+        notifyJobRemoved(job);
+    }
+
+    private synchronized void notifyJobRemoved(Job job) {
+        this.observers.forEach(obs -> obs.onJobRemoved(job));
     }
 
     public synchronized int size() {
@@ -123,8 +125,13 @@ public class Jobs {
         return jobsByType.get(type.getId());
     }
 
-    public synchronized void clear() {
-        allJobs.clear();
-        jobsByType.clear();
+    public void clear() {
+
+        this.allJobs.forEach((id, job) -> notifyJobRemoved(job));
+
+        synchronized (this) {
+            allJobs.clear();
+            jobsByType.clear();
+        }
     }
 }
index f7cc14e..82e8a08 100644 (file)
@@ -20,6 +20,7 @@
 
 package org.oran.dmaapadapter.repository;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -71,6 +72,14 @@ public class MultiMap<T> {
         return this.map.keySet();
     }
 
+    public Collection<T> values() {
+        ArrayList<T> result = new ArrayList<>();
+        for (String key : keySet()) {
+            result.addAll(get(key));
+        }
+        return result;
+    }
+
     public void clear() {
         this.map.clear();
     }
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumers.java b/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumers.java
deleted file mode 100644 (file)
index 9447c3a..0000000
+++ /dev/null
@@ -1,43 +0,0 @@
-/*-
- * ========================LICENSE_START=================================
- * O-RAN-SC
- * %%
- * Copyright (C) 2021 Nordix Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================LICENSE_END===================================
- */
-
-package org.oran.dmaapadapter.tasks;
-
-import org.oran.dmaapadapter.configuration.ApplicationConfig;
-import org.oran.dmaapadapter.repository.InfoType;
-import org.oran.dmaapadapter.repository.InfoTypes;
-import org.oran.dmaapadapter.repository.Jobs;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-@Component
-public class DmaapTopicConsumers {
-
-    DmaapTopicConsumers(@Autowired ApplicationConfig appConfig, @Autowired InfoTypes types, @Autowired Jobs jobs) {
-        // Start a consumer for each type
-        for (InfoType type : types.getAll()) {
-            if (type.isDmaapTopicDefined()) {
-                DmaapTopicConsumer topicConsumer = new DmaapTopicConsumer(appConfig, type, jobs);
-                topicConsumer.start();
-            }
-        }
-    }
-
-}
@@ -31,49 +31,74 @@ import org.oran.dmaapadapter.repository.InfoType;
 import org.oran.dmaapadapter.repository.Jobs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.http.MediaType;
 
+import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.util.function.Tuples;
+import reactor.core.publisher.Sinks;
+import reactor.core.publisher.Sinks.Many;
 
 /**
  * The class fetches incoming requests from DMAAP and sends them further to the
  * consumers that have a job for this InformationType.
  */
-public class DmaapTopicConsumer {
-    private static final Duration TIME_BETWEEN_DMAAP_RETRIES = Duration.ofSeconds(10);
-    private static final Logger logger = LoggerFactory.getLogger(DmaapTopicConsumer.class);
+public class DmaapTopicListener {
+    private static final Duration TIME_BETWEEN_DMAAP_RETRIES = Duration.ofSeconds(3);
+    private static final Logger logger = LoggerFactory.getLogger(DmaapTopicListener.class);
 
     private final AsyncRestClient dmaapRestClient;
     protected final ApplicationConfig applicationConfig;
     protected final InfoType type;
     protected final Jobs jobs;
     private final com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
+    private Many<String> output;
+    private Disposable topicReceiverTask;
 
-    public DmaapTopicConsumer(ApplicationConfig applicationConfig, InfoType type, Jobs jobs) {
+    public DmaapTopicListener(ApplicationConfig applicationConfig, InfoType type, Jobs jobs) {
         AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
         this.dmaapRestClient = restclientFactory.createRestClientNoHttpProxy("");
         this.applicationConfig = applicationConfig;
         this.type = type;
         this.jobs = jobs;
+
+    }
+
+    public Many<String> getOutput() {
+        return this.output;
     }
 
     public void start() {
-        Flux.range(0, Integer.MAX_VALUE) //
+        stop();
+
+        final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 1024 * 10;
+        this.output = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE);
+
+        topicReceiverTask = Flux.range(0, Integer.MAX_VALUE) //
                 .flatMap(notUsed -> getFromMessageRouter(getDmaapUrl()), 1) //
-                .flatMap(this::pushDataToConsumers) //
+                .doOnNext(this::onReceivedData) //
                 .subscribe(//
                         null, //
                         throwable -> logger.error("DmaapMessageConsumer error: {}", throwable.getMessage()), //
                         this::onComplete); //
     }
 
+    public void stop() {
+        if (topicReceiverTask != null) {
+            topicReceiverTask.dispose();
+            topicReceiverTask = null;
+        }
+    }
+
     private void onComplete() {
         logger.warn("DmaapMessageConsumer completed {}", type.getId());
         start();
     }
 
+    private void onReceivedData(String input) {
+        logger.debug("Received from DMAAP topic: {} :{}", this.type.getDmaapTopicUrl(), input);
+        output.emitNext(input, Sinks.EmitFailureHandler.FAIL_FAST);
+    }
+
     private String getDmaapUrl() {
         return this.applicationConfig.getDmaapBaseUrl() + type.getDmaapTopicUrl();
     }
@@ -88,33 +113,14 @@ public class DmaapTopicConsumer {
         logger.trace("getFromMessageRouter {}", topicUrl);
         return dmaapRestClient.get(topicUrl) //
                 .filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away.
-                .flatMapMany(body -> toMessages(body)) //
+                .flatMapMany(this::splitArray) //
                 .doOnNext(message -> logger.debug("Message from DMAAP topic: {} : {}", topicUrl, message)) //
                 .onErrorResume(this::handleDmaapErrorResponse); //
     }
 
-    private Flux<String> toMessages(String body) {
+    private Flux<String> splitArray(String body) {
         Collection<String> messages = gson.fromJson(body, LinkedList.class);
         return Flux.fromIterable(messages);
     }
 
-    private Mono<String> handleConsumerErrorResponse(Throwable t) {
-        logger.warn("error from CONSUMER {}", t.getMessage());
-        return Mono.empty();
-    }
-
-    protected Flux<String> pushDataToConsumers(String input) {
-        logger.debug("Received data {}", input);
-        final int CONCURRENCY = 50;
-
-        // Distibute the body to all jobs for this type
-        return Flux.fromIterable(this.jobs.getJobsForType(this.type)) //
-                .map(job -> Tuples.of(job, job.filter(input))) //
-                .filter(t -> !t.getT2().isEmpty()) //
-                .doOnNext(touple -> logger.debug("Sending to consumer {}", touple.getT1().getCallbackUrl())) //
-                .flatMap(touple -> touple.getT1().getConsumerRestClient().post("", touple.getT2(),
-                        MediaType.APPLICATION_JSON), CONCURRENCY) //
-                .onErrorResume(this::handleConsumerErrorResponse);
-    }
-
 }
@@ -37,8 +37,8 @@ import reactor.core.publisher.Mono;
  * owner via REST calls.
  */
 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
-public class KafkaJobDataConsumer {
-    private static final Logger logger = LoggerFactory.getLogger(KafkaJobDataConsumer.class);
+public class JobDataConsumer {
+    private static final Logger logger = LoggerFactory.getLogger(JobDataConsumer.class);
     @Getter
     private final Job job;
     private Disposable subscription;
@@ -46,7 +46,7 @@ public class KafkaJobDataConsumer {
 
     private class ErrorStats {
         private int consumerFaultCounter = 0;
-        private boolean kafkaError = false; // eg. overflow
+        private boolean irrecoverableError = false; // e.g. overflow
 
         public void handleOkFromConsumer() {
             this.consumerFaultCounter = 0;
@@ -56,37 +56,37 @@ public class KafkaJobDataConsumer {
             if (t instanceof WebClientResponseException) {
                 ++this.consumerFaultCounter;
             } else {
-                kafkaError = true;
+                irrecoverableError = true;
             }
         }
 
         public boolean isItHopeless() {
             final int STOP_AFTER_ERRORS = 5;
-            return kafkaError || consumerFaultCounter > STOP_AFTER_ERRORS;
+            return irrecoverableError || consumerFaultCounter > STOP_AFTER_ERRORS;
         }
 
-        public void resetKafkaErrors() {
-            kafkaError = false;
+        public void resetIrrecoverableErrors() {
+            irrecoverableError = false;
         }
     }
 
-    public KafkaJobDataConsumer(Job job) {
+    public JobDataConsumer(Job job) {
         this.job = job;
     }
 
     public synchronized void start(Flux<String> input) {
         stop();
-        this.errorStats.resetKafkaErrors();
-        this.subscription = handleMessagesFromKafka(input, job) //
+        this.errorStats.resetIrrecoverableErrors();
+        this.subscription = handleReceivedMessages(input, job) //
                 .flatMap(this::postToClient, job.getParameters().getMaxConcurrency()) //
                 .onErrorResume(this::handleError) //
                 .subscribe(this::handleConsumerSentOk, //
                         this::handleExceptionInStream, //
-                        () -> logger.warn("KafkaMessageConsumer stopped jobId: {}", job.getId()));
+                        () -> logger.warn("JobDataConsumer stopped jobId: {}", job.getId()));
     }
 
     private void handleExceptionInStream(Throwable t) {
-        logger.warn("KafkaMessageConsumer exception: {}, jobId: {}", t.getMessage(), job.getId());
+        logger.warn("JobDataConsumer exception: {}, jobId: {}", t.getMessage(), job.getId());
         stop();
     }
 
@@ -107,7 +107,7 @@ public class KafkaJobDataConsumer {
         return this.subscription != null;
     }
 
-    private Flux<String> handleMessagesFromKafka(Flux<String> input, Job job) {
+    private Flux<String> handleReceivedMessages(Flux<String> input, Job job) {
         Flux<String> result = input.map(job::filter) //
                 .filter(t -> !t.isEmpty()); //
 
index f7c1c9d..f32c585 100644 (file)
@@ -72,7 +72,7 @@ public class KafkaTopicListener {
                         () -> logger.warn("KafkaTopicReceiver stopped"));
     }
 
-    private void stop() {
+    public void stop() {
         if (topicReceiverTask != null) {
             topicReceiverTask.dispose();
             topicReceiverTask = null;
index 6623bb8..24b53c9 100644 (file)
@@ -151,9 +151,9 @@ public class ProducerRegstrationTask {
     private Object jsonSchemaObject(InfoType type) throws IOException, ServiceException {
         String schemaFile;
         if (type.getDataType() == InfoType.DataType.PM_DATA) {
-            schemaFile = type.isKafkaTopicDefined() ? "/typeSchemaPmDataKafka.json" : "/typeSchemaPmDataDmaap.json";
+            schemaFile = "/typeSchemaPmData.json";
         } else {
-            schemaFile = type.isKafkaTopicDefined() ? "/typeSchemaKafka.json" : "/typeSchemaDmaap.json";
+            schemaFile = "/typeSchema.json";
         }
         return jsonObject(readSchemaFile(schemaFile));
     }
@@ -41,23 +41,28 @@ import org.springframework.stereotype.Component;
 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
 @Component
 @EnableScheduling
-public class KafkaTopicConsumers {
-    private static final Logger logger = LoggerFactory.getLogger(KafkaTopicConsumers.class);
+public class TopicListeners {
+    private static final Logger logger = LoggerFactory.getLogger(TopicListeners.class);
 
-    private final Map<String, KafkaTopicListener> topicListeners = new HashMap<>(); // Key is typeId
+    private final Map<String, KafkaTopicListener> kafkaTopicListeners = new HashMap<>(); // Key is typeId
+    private final Map<String, DmaapTopicListener> dmaapTopicListeners = new HashMap<>(); // Key is typeId
 
     @Getter
-    private final MultiMap<KafkaJobDataConsumer> consumers = new MultiMap<>(); // Key is typeId, jobId
+    private final MultiMap<JobDataConsumer> kafkaConsumers = new MultiMap<>(); // Key is typeId, jobId
+    private final MultiMap<JobDataConsumer> dmaapConsumers = new MultiMap<>(); // Key is typeId, jobId
 
     private static final int CONSUMER_SUPERVISION_INTERVAL_MS = 1000 * 60 * 3;
 
-    public KafkaTopicConsumers(@Autowired ApplicationConfig appConfig, @Autowired InfoTypes types,
-            @Autowired Jobs jobs) {
+    public TopicListeners(@Autowired ApplicationConfig appConfig, @Autowired InfoTypes types, @Autowired Jobs jobs) {
 
         for (InfoType type : types.getAll()) {
             if (type.isKafkaTopicDefined()) {
                 KafkaTopicListener topicConsumer = new KafkaTopicListener(appConfig, type);
-                topicListeners.put(type.getId(), topicConsumer);
+                kafkaTopicListeners.put(type.getId(), topicConsumer);
+            }
+            if (type.isDmaapTopicDefined()) {
+                DmaapTopicListener topicListener = new DmaapTopicListener(appConfig, type, jobs);
+                dmaapTopicListeners.put(type.getId(), topicListener);
             }
         }
 
@@ -75,46 +80,61 @@ public class KafkaTopicConsumers {
     }
 
     public synchronized void addJob(Job job) {
+        removeJob(job);
+        logger.debug("Job added {}", job.getId());
         if (job.getType().isKafkaTopicDefined()) {
-            removeJob(job);
-            logger.debug("Kafka job added {}", job.getId());
-            KafkaTopicListener topicConsumer = topicListeners.get(job.getType().getId());
-            if (consumers.get(job.getType().getId()).isEmpty()) {
-                topicConsumer.start();
+            KafkaTopicListener topicListener = kafkaTopicListeners.get(job.getType().getId());
+            if (kafkaConsumers.get(job.getType().getId()).isEmpty()) {
+                topicListener.start();
             }
-            KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(job);
-            subscription.start(topicConsumer.getOutput().asFlux());
-            consumers.put(job.getType().getId(), job.getId(), subscription);
+            JobDataConsumer subscription = new JobDataConsumer(job);
+            subscription.start(topicListener.getOutput().asFlux());
+            kafkaConsumers.put(job.getType().getId(), job.getId(), subscription);
+        }
+
+        if (job.getType().isDmaapTopicDefined()) {
+            DmaapTopicListener topicListener = dmaapTopicListeners.get(job.getType().getId());
+            if (dmaapConsumers.get(job.getType().getId()).isEmpty()) {
+                topicListener.start();
+            }
+            JobDataConsumer subscription = new JobDataConsumer(job);
+            subscription.start(topicListener.getOutput().asFlux());
+            dmaapConsumers.put(job.getType().getId(), job.getId(), subscription);
         }
     }
 
     public synchronized void removeJob(Job job) {
-        KafkaJobDataConsumer d = consumers.remove(job.getType().getId(), job.getId());
-        if (d != null) {
+        JobDataConsumer consumer = kafkaConsumers.remove(job.getType().getId(), job.getId());
+        if (consumer != null) {
             logger.debug("Kafka job removed {}", job.getId());
-            d.stop();
+            consumer.stop();
+        }
+        consumer = this.dmaapConsumers.remove(job.getType().getId(), job.getId());
+        if (consumer != null) {
+            logger.debug("DMAAP job removed {}", job.getId());
+            consumer.stop();
         }
     }
 
     @Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS)
-    public synchronized void restartNonRunningTopics() {
-        for (String typeId : this.consumers.keySet()) {
-            for (KafkaJobDataConsumer consumer : this.consumers.get(typeId)) {
+    public synchronized void restartNonRunningKafkaTopics() {
+        for (String typeId : this.kafkaConsumers.keySet()) {
+            for (JobDataConsumer consumer : this.kafkaConsumers.get(typeId)) {
                 if (!consumer.isRunning()) {
-                    restartTopic(consumer);
+                    restartKafkaTopic(consumer);
                 }
             }
         }
     }
 
-    private void restartTopic(KafkaJobDataConsumer consumer) {
+    private void restartKafkaTopic(JobDataConsumer consumer) {
         InfoType type = consumer.getJob().getType();
-        KafkaTopicListener topic = this.topicListeners.get(type.getId());
+        KafkaTopicListener topic = this.kafkaTopicListeners.get(type.getId());
         topic.start();
         restartConsumersOfType(topic, type);
     }
 
     private void restartConsumersOfType(KafkaTopicListener topic, InfoType type) {
-        this.consumers.get(type.getId()).forEach(consumer -> consumer.start(topic.getOutput().asFlux()));
+        this.kafkaConsumers.get(type.getId()).forEach(consumer -> consumer.start(topic.getOutput().asFlux()));
     }
 }
diff --git a/src/main/resources/typeSchemaDmaap.json b/src/main/resources/typeSchemaDmaap.json
deleted file mode 100644 (file)
index 146b9eb..0000000
+++ /dev/null
@@ -1,18 +0,0 @@
-{
-  "$schema": "http://json-schema.org/draft-04/schema#",
-  "type": "object",
-  "properties": {
-    "filter": {
-      "type": "string"
-    },
-    "filterType": {
-      "type": "string",
-      "enum": [
-        "jslt",
-        "regexp",
-        "json-path"
-      ]
-    }
-  },
-  "additionalProperties": false
-}
\ No newline at end of file
diff --git a/src/main/resources/typeSchemaPmDataDmaap.json b/src/main/resources/typeSchemaPmDataDmaap.json
deleted file mode 100644 (file)
index 616cc02..0000000
+++ /dev/null
@@ -1,61 +0,0 @@
-{
-   "$schema": "http://json-schema.org/draft-04/schema#",
-   "type": "object",
-   "additionalProperties": false,
-   "properties": {
-      "filter": {
-         "anyOf": [
-            {
-               "type": "string"
-            },
-            {
-               "type": "object",
-               "additionalProperties": false,
-               "properties": {
-                  "sourceNames": {
-                     "type": "array",
-                     "items": [
-                        {
-                           "type": "string"
-                        }
-                     ]
-                  },
-                  "measObjInstIds": {
-                     "type": "array",
-                     "items": [
-                        {
-                           "type": "string"
-                        }
-                     ]
-                  },
-                  "measTypes": {
-                     "type": "array",
-                     "items": [
-                        {
-                           "type": "string"
-                        }
-                     ]
-                  },
-                  "measuredEntityDns": {
-                     "type": "array",
-                     "items": [
-                        {
-                           "type": "string"
-                        }
-                     ]
-                  }
-               }
-            }
-         ]
-      },
-      "filterType": {
-         "type": "string",
-         "enum": [
-            "jslt",
-            "regexp",
-            "pmdata",
-            "json-path"
-         ]
-      }
-   }
-}
\ No newline at end of file
index 9b6ea58..8b91d0f 100644 (file)
@@ -53,9 +53,9 @@ import org.oran.dmaapadapter.repository.InfoTypes;
 import org.oran.dmaapadapter.repository.Job;
 import org.oran.dmaapadapter.repository.Jobs;
 import org.oran.dmaapadapter.repository.filters.PmReportFilter;
-import org.oran.dmaapadapter.tasks.KafkaJobDataConsumer;
-import org.oran.dmaapadapter.tasks.KafkaTopicConsumers;
+import org.oran.dmaapadapter.tasks.JobDataConsumer;
 import org.oran.dmaapadapter.tasks.ProducerRegstrationTask;
+import org.oran.dmaapadapter.tasks.TopicListeners;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
@@ -101,7 +101,7 @@ class ApplicationTest {
     private IcsSimulatorController icsSimulatorController;
 
     @Autowired
-    KafkaTopicConsumers kafkaTopicConsumers;
+    TopicListeners topicListeners;
 
     @Autowired
     ProducerRegstrationTask producerRegistrationTask;
@@ -203,7 +203,8 @@ class ApplicationTest {
     }
 
     private String quote(String str) {
-        return "\"" + str + "\"";
+        final String q = "\"";
+        return q + str.replace(q, "\\\"") + q;
     }
 
     private Object jsonObjectFilter(String filter, String filterType) {
@@ -284,7 +285,7 @@ class ApplicationTest {
         this.icsSimulatorController.addJob(kafkaJobInfo, JOB_ID, restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
 
-        KafkaJobDataConsumer kafkaConsumer = this.kafkaTopicConsumers.getConsumers().get(TYPE_ID, JOB_ID);
+        JobDataConsumer kafkaConsumer = this.topicListeners.getKafkaConsumers().get(TYPE_ID, JOB_ID);
 
         // Handle received data from Kafka, check that it has been posted to the
         // consumer
@@ -299,20 +300,21 @@ class ApplicationTest {
 
         // Test regular restart of stopped
         kafkaConsumer.stop();
-        this.kafkaTopicConsumers.restartNonRunningTopics();
+        this.topicListeners.restartNonRunningKafkaTopics();
         await().untilAsserted(() -> assertThat(kafkaConsumer.isRunning()).isTrue());
     }
 
     @Test
     void testReceiveAndPostDataFromDmaap() throws Exception {
-        final String JOB_ID = "ID";
+        final String JOB_ID = "testReceiveAndPostDataFromDmaap";
 
         // Register producer, Register types
         waitForRegistration();
 
         // Create a job
-        this.icsSimulatorController.addJob(consumerJobInfo("DmaapInformationType", JOB_ID, jsonObjectRegexp()), JOB_ID,
-                restClient());
+        Job.Parameters param = new Job.Parameters(null, null, new Job.BufferTimeout(123, 456), 1);
+        ConsumerJobInfo jobInfo = consumerJobInfo("DmaapInformationType", JOB_ID, toJson(gson.toJson(param)));
+        this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
 
         // Return two messages from DMAAP and verify that these are sent to the owner of
@@ -320,8 +322,9 @@ class ApplicationTest {
         DmaapSimulatorController.addResponse("DmaapResponse1");
         DmaapSimulatorController.addResponse("DmaapResponse2");
         ConsumerController.TestResults consumer = this.consumerController.testResults;
-        await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(2));
-        assertThat(consumer.receivedBodies.get(0)).isEqualTo("DmaapResponse1");
+        await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
+        assertThat(consumer.receivedBodies.get(0)).contains("[\"DmaapResponse1");
+        assertThat(consumer.receivedBodies.get(0)).contains("DmaapResponse2\"]");
 
         String jobUrl = baseUrl() + ProducerCallbacksController.JOB_URL;
         String jobs = restClient().get(jobUrl).block();
@@ -403,7 +406,7 @@ class ApplicationTest {
         // Register producer, Register types
         waitForRegistration();
 
-        // Create a job with atestJsonPathFiltering JsonPath
+        // Create a job with JsonPath filtering
         ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationType", JOB_ID, this.jsonObjectJsonPath());
 
         this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
index 7d0c313..d2da047 100644 (file)
@@ -32,7 +32,6 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 
-import org.oran.dmaapadapter.controllers.ErrorResponse;
 import org.oran.dmaapadapter.controllers.VoidResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,12 +55,16 @@ public class DmaapSimulatorController {
     private static List<String> dmaapPmResponses = Collections.synchronizedList(new LinkedList<String>());
 
     public static void addPmResponse(String response) {
-        response = response.replace("\"", "\\\"");
-        dmaapPmResponses.add("[\"" + response + "\"]");
+        dmaapPmResponses.add("[" + quote(response) + "]");
     }
 
     public static void addResponse(String response) {
-        dmaapResponses.add("[\"" + response + "\"]");
+        dmaapResponses.add("[" + quote(response) + "]");
+    }
+
+    private static String quote(String str) {
+        final String q = "\"";
+        return q + str.replace(q, "\\\"") + q;
     }
 
     @GetMapping(path = DMAAP_TOPIC_URL, produces = MediaType.APPLICATION_JSON_VALUE)
@@ -71,9 +74,9 @@ public class DmaapSimulatorController {
             @ApiResponse(responseCode = "200", description = "OK", //
                     content = @Content(schema = @Schema(implementation = VoidResponse.class))) //
     })
-    public ResponseEntity<Object> getFromTopic() {
+    public ResponseEntity<Object> getFromTopic() throws InterruptedException {
         if (dmaapResponses.isEmpty()) {
-            return ErrorResponse.create("", HttpStatus.NOT_FOUND);
+            return nothing();
         } else {
             String resp = dmaapResponses.remove(0);
             logger.info("DMAAP simulator returned: {}", resp);
@@ -89,13 +92,19 @@ public class DmaapSimulatorController {
             @ApiResponse(responseCode = "200", description = "OK", //
                     content = @Content(schema = @Schema(implementation = VoidResponse.class))) //
     })
-    public ResponseEntity<Object> getFromPmTopic() {
+    public ResponseEntity<Object> getFromPmTopic() throws InterruptedException {
         if (dmaapPmResponses.isEmpty()) {
-            return ErrorResponse.create("", HttpStatus.NOT_FOUND);
+            return nothing();
         } else {
             String resp = dmaapPmResponses.remove(0);
             return new ResponseEntity<>(resp, HttpStatus.OK);
         }
     }
 
+    @SuppressWarnings("java:S2925") // sleep
+    private ResponseEntity<Object> nothing() throws InterruptedException {
+        Thread.sleep(1000); // caller will retry immediately, make it take a rest
+        return new ResponseEntity<>("[]", HttpStatus.OK);
+    }
+
 }
index 54a4940..5153443 100644 (file)
@@ -203,7 +203,8 @@ class IntegrationWithIcs {
     }
 
     private String quote(String str) {
-        return "\"" + str + "\"";
+        final String q = "\"";
+        return q + str.replace(q, "\\\"") + q;
     }
 
     private String reQuote(String str) {
index 3d24759..1e288e5 100644 (file)
@@ -49,8 +49,8 @@ import org.oran.dmaapadapter.repository.InfoType;
 import org.oran.dmaapadapter.repository.InfoTypes;
 import org.oran.dmaapadapter.repository.Job;
 import org.oran.dmaapadapter.repository.Jobs;
-import org.oran.dmaapadapter.tasks.KafkaJobDataConsumer;
-import org.oran.dmaapadapter.tasks.KafkaTopicConsumers;
+import org.oran.dmaapadapter.tasks.JobDataConsumer;
+import org.oran.dmaapadapter.tasks.TopicListeners;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -97,7 +97,7 @@ class IntegrationWithKafka {
     private IcsSimulatorController icsSimulatorController;
 
     @Autowired
-    private KafkaTopicConsumers kafkaTopicConsumers;
+    private TopicListeners topicListeners;
 
     private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
 
@@ -273,7 +273,7 @@ class IntegrationWithKafka {
         this.icsSimulatorController.deleteJob(JOB_ID, restClient());
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
-        await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers().keySet()).isEmpty());
+        await().untilAsserted(() -> assertThat(this.topicListeners.getKafkaConsumers().keySet()).isEmpty());
     }
 
     @Test
@@ -303,7 +303,7 @@ class IntegrationWithKafka {
         this.icsSimulatorController.deleteJob(JOB_ID2, restClient());
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
-        await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers().keySet()).isEmpty());
+        await().untilAsserted(() -> assertThat(this.topicListeners.getKafkaConsumers().keySet()).isEmpty());
     }
 
     @Test
@@ -325,12 +325,12 @@ class IntegrationWithKafka {
         var dataToSend = Flux.range(1, 1000000).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc.
         sendDataToStream(dataToSend); // this should overflow
 
-        KafkaJobDataConsumer consumer = kafkaTopicConsumers.getConsumers().get(TYPE_ID).iterator().next();
+        JobDataConsumer consumer = topicListeners.getKafkaConsumers().get(TYPE_ID).iterator().next();
         await().untilAsserted(() -> assertThat(consumer.isRunning()).isFalse());
         this.consumerController.testResults.reset();
 
         this.icsSimulatorController.deleteJob(JOB_ID2, restClient()); // Delete one job
-        kafkaTopicConsumers.restartNonRunningTopics();
+        topicListeners.restartNonRunningKafkaTopics();
         sleep(1000); // Restarting the input seems to take some asynch time
 
         dataToSend = Flux.just(senderRecord("Howdy\""));
@@ -343,7 +343,7 @@ class IntegrationWithKafka {
         this.icsSimulatorController.deleteJob(JOB_ID2, restClient());
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
-        await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers().keySet()).isEmpty());
+        await().untilAsserted(() -> assertThat(this.topicListeners.getKafkaConsumers().keySet()).isEmpty());
     }
 
 }