NONRTRIC - dmaap adapter characteristic improvement 32/8732/1
author PatrikBuhr <patrik.buhr@est.tech>
Mon, 4 Jul 2022 13:28:05 +0000 (15:28 +0200)
committer PatrikBuhr <patrik.buhr@est.tech>
Tue, 5 Jul 2022 09:36:18 +0000 (11:36 +0200)
Fixed issues with backpressure.

The Sinks-based topic listeners emitted into a bounded buffer with
FAIL_FAST, so a slow consumer could overflow the buffer and terminate the
listener, which a scheduled supervision task then had to restart. Each
listener now exposes a lazily created hot Flux (publish().autoConnect())
instead, so demand is propagated upstream to the data source and the
stream stays alive as jobs are added and removed.

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: I5d9a1cb7c741110010e3dd116a5c115061fb59dd
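
For context, a minimal sketch of the before/after patterns this commit swaps
(class and method names here are illustrative, not the adapter's API):

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;

    public class BackpressureSketch {

        // Before: a bounded multicast sink. emitNext(..., FAIL_FAST) throws once
        // the buffer fills up under a slow consumer, killing the receiver task,
        // hence the scheduled supervision/restart logic removed by this commit.
        static Sinks.Many<String> before() {
            return Sinks.many().multicast().onBackpressureBuffer(1024 * 10);
        }

        // After: the receiving Flux itself is published and auto-connected.
        // Demand flows upstream, so a slow subscriber slows the source instead
        // of overflowing a buffer, and the upstream survives subscriber churn.
        static Flux<String> after(Flux<String> source) {
            return source.publish().autoConnect();
        }
    }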

pom.xml
src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java
src/main/java/org/oran/dmaapadapter/tasks/KafkaDataConsumer.java
src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java
src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java
src/test/java/org/oran/dmaapadapter/ApplicationTest.java
src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java
src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java

diff --git a/pom.xml b/pom.xml
index 2b8d65b..1bfe947 100644
--- a/pom.xml
+++ b/pom.xml
@@ -65,6 +65,7 @@
             <groupId>org.springdoc</groupId>
             <artifactId>springdoc-openapi-ui</artifactId>
             <version>1.6.3</version>
+            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.springframework.boot</groupId>
         <system>JIRA</system>
         <url>https://jira.o-ran-sc.org/</url>
     </issueManagement>
-</project>
+</project>
\ No newline at end of file
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java
index 3aa97fe..69226ca 100644
--- a/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java
+++ b/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java
@@ -30,11 +30,8 @@ import org.oran.dmaapadapter.repository.InfoType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.core.publisher.Sinks;
-import reactor.core.publisher.Sinks.Many;
 
 /**
  * The class fetches incoming requests from DMAAP and sends them further to the
@@ -48,8 +45,7 @@ public class DmaapTopicListener implements TopicListener {
     private final ApplicationConfig applicationConfig;
     private final InfoType type;
     private final com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
-    private Many<Output> output;
-    private Disposable topicReceiverTask;
+    private Flux<Output> output;
 
     public DmaapTopicListener(ApplicationConfig applicationConfig, InfoType type, SecurityContext securityContext) {
         AsyncRestClientFactory restclientFactory =
@@ -60,42 +56,22 @@ public class DmaapTopicListener implements TopicListener {
     }
 
     @Override
-    public Many<Output> getOutput() {
+    public Flux<Output> getOutput() {
+        if (this.output == null) {
+            this.output = createOutput();
+        }
         return this.output;
     }
 
-    @Override
-    public void start() {
-        stop();
-
-        final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 1024 * 10;
-        this.output = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE);
-
-        topicReceiverTask = Flux.range(0, Integer.MAX_VALUE) //
+    private Flux<Output> createOutput() {
+        return Flux.range(0, Integer.MAX_VALUE) //
                 .flatMap(notUsed -> getFromMessageRouter(getDmaapUrl()), 1) //
-                .doOnNext(this::onReceivedData) //
-                .subscribe(//
-                        null, //
-                        throwable -> logger.error("DmaapMessageConsumer error: {}", throwable.getMessage()), //
-                        this::onComplete); //
-    }
-
-    @Override
-    public void stop() {
-        if (topicReceiverTask != null) {
-            topicReceiverTask.dispose();
-            topicReceiverTask = null;
-        }
-    }
-
-    private void onComplete() {
-        logger.warn("DmaapMessageConsumer completed {}", type.getId());
-        start();
-    }
-
-    private void onReceivedData(String input) {
-        logger.debug("Received from DMAAP topic: {} :{}", this.type.getDmaapTopicUrl(), input);
-        output.emitNext(new Output("", input), Sinks.EmitFailureHandler.FAIL_FAST);
+                .doOnNext(input -> logger.debug("Received from DMaaP: {} :{}", this.type.getDmaapTopicUrl(), input)) //
+                .doOnError(t -> logger.error("DmaapTopicListener error: {}", t.getMessage())) //
+                .doFinally(sig -> logger.error("DmaapTopicListener stopped, reason: {}", sig)) //
+                .publish() //
+                .autoConnect() //
+                .map(input -> new Output("", input)); //
     }
 
     private String getDmaapUrl() {
@@ -113,7 +89,7 @@ public class DmaapTopicListener implements TopicListener {
         return dmaapRestClient.get(topicUrl) //
                 .filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away.
                 .flatMapMany(this::splitJsonArray) //
-                .doOnNext(message -> logger.debug("Message from DMAAP topic: {} : {}", topicUrl, message)) //
+                .doOnNext(message -> logger.debug("Message from DMaaP topic: {} : {}", topicUrl, message)) //
                 .onErrorResume(this::handleDmaapErrorResponse); //
     }
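
A self-contained toy (Flux.interval stands in for the infinite DMaaP polling
loop) showing why deleting the last job no longer kills the listener:
autoConnect() connects on the first subscriber and does not cancel the
upstream when subscribers go away, so a later job simply re-attaches.

    import java.time.Duration;

    import reactor.core.Disposable;
    import reactor.core.publisher.Flux;

    public class HotStreamDemo {
        public static void main(String[] args) throws InterruptedException {
            Flux<Long> output = Flux.interval(Duration.ofMillis(100)) // toy data source
                    .publish() //
                    .autoConnect(); // connect once, never disconnect

            Disposable job1 = output.subscribe(i -> System.out.println("job1: " + i));
            Thread.sleep(300);
            job1.dispose(); // "delete the job": the upstream keeps running

            // A new job re-attaches to the same live stream; no restart needed.
            output.subscribe(i -> System.out.println("job2: " + i));
            Thread.sleep(300);
        }
    }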
 
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaDataConsumer.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaDataConsumer.java
index 2b0b7a4..406c6f3 100644
--- a/src/main/java/org/oran/dmaapadapter/tasks/KafkaDataConsumer.java
+++ b/src/main/java/org/oran/dmaapadapter/tasks/KafkaDataConsumer.java
@@ -87,7 +87,6 @@ public class KafkaDataConsumer extends DataConsumer {
 
         Map<String, Object> props = new HashMap<>();
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-        props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producerx");
         props.put(ProducerConfig.ACKS_CONFIG, "all");
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
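
Presumably the hardcoded client id was dropped because every producer instance
shared it; with no CLIENT_ID_CONFIG the Kafka client generates a unique id per
producer. A minimal sender with the remaining settings, as a sketch (the
bootstrap address is a placeholder):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;

    import reactor.kafka.sender.KafkaSender;
    import reactor.kafka.sender.SenderOptions;

    public class MinimalSender {
        static KafkaSender<String, String> create() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for full acknowledgement
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            // No CLIENT_ID_CONFIG: the client generates a unique id per instance.
            return KafkaSender.create(SenderOptions.create(props));
        }
    }
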
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
index 4a7f269..8d36fdd 100644
--- a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
+++ b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
@@ -25,16 +25,13 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.repository.InfoType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import reactor.core.Disposable;
-import reactor.core.publisher.Sinks;
-import reactor.core.publisher.Sinks.Many;
+import reactor.core.publisher.Flux;
 import reactor.kafka.receiver.KafkaReceiver;
 import reactor.kafka.receiver.ReceiverOptions;
 
@@ -47,8 +44,7 @@ public class KafkaTopicListener implements TopicListener {
     private static final Logger logger = LoggerFactory.getLogger(KafkaTopicListener.class);
     private final ApplicationConfig applicationConfig;
     private final InfoType type;
-    private Many<Output> output;
-    private Disposable topicReceiverTask;
+    private Flux<Output> output;
 
     public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type) {
         this.applicationConfig = applicationConfig;
@@ -56,39 +52,24 @@ public class KafkaTopicListener implements TopicListener {
     }
 
     @Override
-    public Many<Output> getOutput() {
+    public Flux<Output> getOutput() {
+        if (this.output == null) {
+            this.output = createOutput();
+        }
         return this.output;
     }
 
-    @Override
-    public void start() {
-        stop();
-        final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 1024 * 10;
-        this.output = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE);
+    private Flux<Output> createOutput() {
         logger.debug("Listening to kafka topic: {} type :{}", this.type.getKafkaInputTopic(), type.getId());
-        topicReceiverTask = KafkaReceiver.create(kafkaInputProperties()) //
+        return KafkaReceiver.create(kafkaInputProperties()) //
                 .receive() //
-                .doOnNext(this::onReceivedData) //
-                .subscribe(null, //
-                        this::onReceivedError, //
-                        () -> logger.warn("KafkaTopicReceiver stopped"));
-    }
-
-    @Override
-    public void stop() {
-        if (topicReceiverTask != null) {
-            topicReceiverTask.dispose();
-            topicReceiverTask = null;
-        }
-    }
-
-    private void onReceivedData(ConsumerRecord<String, String> input) {
-        logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(), input.value());
-        output.emitNext(new Output(input.key(), input.value()), Sinks.EmitFailureHandler.FAIL_FAST);
-    }
-
-    private void onReceivedError(Throwable t) {
-        logger.error("KafkaTopicReceiver error: {}", t.getMessage());
+                .doOnNext(input -> logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(),
+                        input.value())) //
+                .doOnError(t -> logger.error("KafkaTopicReceiver error: {}", t.getMessage())) //
+                .doFinally(sig -> logger.error("KafkaTopicReceiver stopped, reason: {}", sig)) //
+                .publish() //
+                .autoConnect() //
+                .map(input -> new Output(input.key(), input.value())); //
     }
 
     private ReceiverOptions<String, String> kafkaInputProperties() {
@@ -100,6 +81,7 @@ public class KafkaTopicListener implements TopicListener {
         consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "osc-dmaap-adapter");
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
 
         return ReceiverOptions.<String, String>create(consumerProps)
                 .subscription(Collections.singleton(this.type.getKafkaInputTopic()));
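
Condensed to a standalone form (broker address and topic are placeholders),
the receiver behind getOutput() is roughly the following; with
ENABLE_AUTO_COMMIT_CONFIG set, the Kafka client commits offsets periodically
on its own, so no per-record acknowledge is involved.

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import reactor.core.publisher.Flux;
    import reactor.kafka.receiver.KafkaReceiver;
    import reactor.kafka.receiver.ReceiverOptions;
    import reactor.kafka.receiver.ReceiverRecord;

    public class MinimalReceiver {
        static Flux<ReceiverRecord<String, String>> receive(String topic) {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "osc-dmaap-adapter");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            // Let the Kafka client commit offsets periodically on its own.
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

            ReceiverOptions<String, String> options = ReceiverOptions.<String, String>create(props)
                    .subscription(Collections.singleton(topic));
            return KafkaReceiver.<String, String>create(options).receive();
        }
    }
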
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java
index e32cfa5..54254a3 100644
--- a/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java
+++ b/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java
@@ -21,7 +21,7 @@
 package org.oran.dmaapadapter.tasks;
 
 import lombok.ToString;
-import reactor.core.publisher.Sinks.Many;
+import reactor.core.publisher.Flux;
 
 public interface TopicListener {
 
@@ -36,9 +36,5 @@ public interface TopicListener {
         }
     }
 
-    public void start();
-
-    public void stop();
-
-    public Many<Output> getOutput();
+    public Flux<Output> getOutput();
 }
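
Consumer side, as a rough sketch (OutputSubscriber is hypothetical; the real
DataConsumer also filters and buffers): with the interface reduced to one
method, attaching and detaching a job is plain Flux subscription.

    import reactor.core.Disposable;
    import reactor.core.publisher.Flux;

    // Hypothetical consumer of TopicListener.getOutput().
    class OutputSubscriber {
        private Disposable subscription;

        void start(Flux<TopicListener.Output> output) {
            stop(); // re-subscribing replaces any previous subscription
            this.subscription = output.subscribe( //
                    o -> System.out.println("deliver " + o), //
                    t -> System.err.println("consumer failed: " + t.getMessage()));
        }

        void stop() {
            if (subscription != null) {
                subscription.dispose();
                subscription = null;
            }
        }
    }
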
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java
index df70b9f..6c0f48f 100644
--- a/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java
+++ b/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java
@@ -37,7 +37,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.annotation.EnableScheduling;
-import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
@@ -54,8 +53,6 @@ public class TopicListeners {
 
     private final ApplicationConfig appConfig;
 
-    private static final int CONSUMER_SUPERVISION_INTERVAL_MS = 1000 * 60 * 3;
-
     public TopicListeners(@Autowired ApplicationConfig appConfig, @Autowired InfoTypes types, @Autowired Jobs jobs,
             @Autowired SecurityContext securityContext) {
         this.appConfig = appConfig;
@@ -103,11 +100,8 @@ public class TopicListeners {
 
     private void addConsumer(Job job, MultiMap<DataConsumer> consumers, Map<String, TopicListener> topicListeners) {
         TopicListener topicListener = topicListeners.get(job.getType().getId());
-        if (consumers.get(job.getType().getId()).isEmpty()) {
-            topicListener.start();
-        }
         DataConsumer consumer = createConsumer(job);
-        consumer.start(topicListener.getOutput().asFlux());
+        consumer.start(topicListener.getOutput());
         consumers.put(job.getType().getId(), job.getId(), consumer);
     }
 
@@ -123,25 +117,4 @@ public class TopicListeners {
         }
     }
 
-    @Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS)
-    public synchronized void restartNonRunningKafkaTopics() {
-        for (DataConsumer consumer : this.dataConsumers.values()) {
-            if (!consumer.isRunning()) {
-                restartTopicAndConsumers(this.kafkaTopicListeners, this.dataConsumers, consumer);
-            }
-        }
-
-    }
-
-    private static void restartTopicAndConsumers(Map<String, TopicListener> topicListeners,
-            MultiMap<DataConsumer> consumers, DataConsumer consumer) {
-        InfoType type = consumer.getJob().getType();
-        TopicListener topic = topicListeners.get(type.getId());
-        topic.start();
-        restartConsumersOfType(consumers, topic, type);
-    }
-
-    private static void restartConsumersOfType(MultiMap<DataConsumer> consumers, TopicListener topic, InfoType type) {
-        consumers.get(type.getId()).forEach(consumer -> consumer.start(topic.getOutput().asFlux()));
-    }
 }
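
What addConsumer now relies on, as a toy (autoConnect(2) only makes the finite
source deterministic here; the adapter uses the no-argument autoConnect()):
every job consumer subscribing to the listener's single shared Flux receives
every element, which is why the per-type start()/stop() calls and the
supervision timer could be dropped.

    import reactor.core.publisher.Flux;

    public class SharedOutputDemo {
        public static void main(String[] args) {
            Flux<String> output = Flux.just("m1", "m2", "m3") //
                    .publish() //
                    .autoConnect(2); // emit once both "jobs" have subscribed

            output.subscribe(m -> System.out.println("jobA got " + m));
            output.subscribe(m -> System.out.println("jobB got " + m));
            // One upstream, multicast: both jobs print m1, m2 and m3.
        }
    }
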
diff --git a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java
index c4b5ece..a3febaf 100644
--- a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java
+++ b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java
@@ -157,15 +157,23 @@ class ApplicationTest {
     }
 
     @BeforeEach
-    void setPort() {
+    void init() {
         this.applicationConfig.setLocalServerHttpPort(this.localServerHttpPort);
+        assertThat(this.jobs.size()).isZero();
+        assertThat(this.consumerController.testResults.receivedBodies).isEmpty();
+        assertThat(this.consumerController.testResults.receivedHeaders).isEmpty();
     }
 
     @AfterEach
     void reset() {
+        for (Job job : this.jobs.getAll()) {
+            this.icsSimulatorController.deleteJob(job.getId(), restClient());
+        }
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+
         this.consumerController.testResults.reset();
         this.icsSimulatorController.testResults.reset();
-        this.jobs.clear();
+
     }
 
     private AsyncRestClient restClient(boolean useTrustValidation) {
@@ -298,14 +306,6 @@ class ApplicationTest {
         await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
         assertThat(consumer.receivedBodies.get(0)).isEqualTo("[\"data\"]");
         assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "application/json");
-
-        // Test send an exception
-        kafkaConsumer.start(Flux.error(new NullPointerException()));
-
-        // Test regular restart of stopped
-        kafkaConsumer.stop();
-        this.topicListeners.restartNonRunningKafkaTopics();
-        await().untilAsserted(() -> assertThat(kafkaConsumer.isRunning()).isTrue());
     }
 
     @Test
@@ -323,19 +323,15 @@ class ApplicationTest {
 
         // Return two messages from DMAAP and verify that these are sent to the owner of
         // the job (consumer)
-        DmaapSimulatorController.addResponse("[\"DmaapResponse1\", \"DmaapResponse2\"]");
+        DmaapSimulatorController.addResponse("[\"DmaapResponse123\", \"DmaapResponse223\"]");
         ConsumerController.TestResults consumer = this.consumerController.testResults;
         await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
-        assertThat(consumer.receivedBodies.get(0)).isEqualTo("[\"DmaapResponse1\", \"DmaapResponse2\"]");
+        assertThat(consumer.receivedBodies.get(0)).isEqualTo("[\"DmaapResponse123\", \"DmaapResponse223\"]");
         assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "application/json");
 
         String jobUrl = baseUrl() + ProducerCallbacksController.JOB_URL;
         String jobs = restClient().get(jobUrl).block();
         assertThat(jobs).contains(JOB_ID);
-
-        // Delete the job
-        this.icsSimulatorController.deleteJob(JOB_ID, restClient());
-        await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
     }
 
     @Test
@@ -353,16 +349,22 @@ class ApplicationTest {
 
         // Return two messages from DMAAP and verify that these are sent to the owner of
         // the job (consumer)
-        DmaapSimulatorController.addResponse("[\"DmaapResponse1\", \"DmaapResponse2\"]");
+        DmaapSimulatorController.addResponse("[\"DmaapResponse11\", \"DmaapResponse22\"]");
         ConsumerController.TestResults consumer = this.consumerController.testResults;
         await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(2));
-        assertThat(consumer.receivedBodies.get(0)).isEqualTo("DmaapResponse1");
-        assertThat(consumer.receivedBodies.get(1)).isEqualTo("DmaapResponse2");
+        assertThat(consumer.receivedBodies.get(0)).isEqualTo("DmaapResponse11");
+        assertThat(consumer.receivedBodies.get(1)).isEqualTo("DmaapResponse22");
         assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "text/plain;charset=UTF-8");
 
         // Delete the job
         this.icsSimulatorController.deleteJob(JOB_ID, restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+
+        // Test that deleting the last job did not terminate the DmaapTopicListener
+        this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+        DmaapSimulatorController.addResponse("[\"DmaapResponse77\", \"DmaapResponse88\"]");
+        await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(4));
     }
 
     static class PmReportArray extends ArrayList<PmReport> {
@@ -462,18 +464,21 @@ class ApplicationTest {
         DmaapSimulatorController.addResponse("[\"Hello\"]");
 
         ConsumerController.TestResults consumer = this.consumerController.testResults;
-        await().untilAsserted(() -> assertThat(consumer.receivedHeaders).hasSize(1));
+        await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
         String received = consumer.receivedBodies.get(0);
         assertThat(received).isEqualTo("Hello");
-        // This is the only time it is verified that mime type is plaintext when isJson
-        // is false and buffering is not used
-        assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "text/plain;charset=UTF-8");
 
         // Check that the auth token was received by the consumer
         assertThat(consumer.receivedHeaders).hasSize(1);
         Map<String, String> headers = consumer.receivedHeaders.get(0);
         assertThat(headers).containsEntry("authorization", "Bearer " + AUTH_TOKEN);
+
+        // This is the only time it is verified that mime type is plaintext when isJson
+        // is false and buffering is not used
+        assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "text/plain;charset=UTF-8");
+
         Files.delete(authFile);
+        this.securityContext.setAuthTokenFilePath(null);
     }
 
     @Test
diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java
index 287c20b..9c8f816 100644
--- a/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java
+++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java
@@ -33,7 +33,6 @@ import java.nio.file.Path;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
 import org.oran.dmaapadapter.clients.AsyncRestClient;
 import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
 import org.oran.dmaapadapter.clients.SecurityContext;
@@ -55,10 +54,8 @@ import org.springframework.boot.web.servlet.server.ServletWebServerFactory;
 import org.springframework.context.annotation.Bean;
 import org.springframework.http.HttpStatus;
 import org.springframework.test.context.TestPropertySource;
-import org.springframework.test.context.junit.jupiter.SpringExtension;
 
 @SuppressWarnings("java:S3577") // Rename class
-@ExtendWith(SpringExtension.class)
 @SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
 @TestPropertySource(properties = { //
         "server.ssl.key-store=./config/keystore.jks", //
diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
index 330eb6b..a345670 100644
--- a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
+++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
@@ -29,12 +29,14 @@ import com.google.gson.JsonParser;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.oran.dmaapadapter.clients.AsyncRestClient;
 import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
@@ -48,7 +50,6 @@ import org.oran.dmaapadapter.repository.InfoType;
 import org.oran.dmaapadapter.repository.InfoTypes;
 import org.oran.dmaapadapter.repository.Job;
 import org.oran.dmaapadapter.repository.Jobs;
-import org.oran.dmaapadapter.tasks.DataConsumer;
 import org.oran.dmaapadapter.tasks.KafkaTopicListener;
 import org.oran.dmaapadapter.tasks.TopicListener;
 import org.oran.dmaapadapter.tasks.TopicListeners;
@@ -64,7 +65,6 @@ import org.springframework.boot.web.servlet.server.ServletWebServerFactory;
 import org.springframework.context.annotation.Bean;
 import org.springframework.test.context.TestPropertySource;
 
-import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.kafka.sender.KafkaSender;
 import reactor.kafka.sender.SenderOptions;
@@ -151,11 +151,57 @@ class IntegrationWithKafka {
         }
     }
 
+    private static class KafkaReceiver {
+        public final static String OUTPUT_TOPIC = "outputTopic";
+        private TopicListener.Output receivedKafkaOutput = new TopicListener.Output("", "");
+        private final Logger logger = LoggerFactory.getLogger(IntegrationWithKafka.class);
+
+        public KafkaReceiver(ApplicationConfig applicationConfig) {
+
+            // Create a listener to the output topic. The KafkaTopicListener happens to be
+            // suitable for that.
+            InfoType type = new InfoType("id", null, false, OUTPUT_TOPIC, "dataType", false);
+            KafkaTopicListener topicListener = new KafkaTopicListener(applicationConfig, type);
+
+            topicListener.getOutput() //
+                    .doOnNext(this::set) //
+                    .doFinally(sig -> logger.info("Finally " + sig)) //
+                    .subscribe();
+        }
+
+        private void set(TopicListener.Output receivedKafkaOutput) {
+            this.receivedKafkaOutput = receivedKafkaOutput;
+            logger.debug("*** received {}, {}", OUTPUT_TOPIC, receivedKafkaOutput);
+        }
+
+        synchronized String lastKey() {
+            return this.receivedKafkaOutput.key;
+        }
+
+        synchronized String lastValue() {
+            return this.receivedKafkaOutput.value;
+        }
+    }
+
+    private static KafkaReceiver kafkaReceiver;
+
+    @BeforeEach
+    void init() {
+        if (kafkaReceiver == null) {
+            kafkaReceiver = new KafkaReceiver(this.applicationConfig);
+        }
+    }
+
     @AfterEach
     void reset() {
+        for (Job job : this.jobs.getAll()) {
+            this.icsSimulatorController.deleteJob(job.getId(), restClient());
+        }
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+        await().untilAsserted(() -> assertThat(this.topicListeners.getDataConsumers().keySet()).isEmpty());
+
         this.consumerController.testResults.reset();
         this.icsSimulatorController.testResults.reset();
-        this.jobs.clear();
     }
 
     private AsyncRestClient restClient(boolean useTrustValidation) {
@@ -265,9 +311,19 @@ class IntegrationWithKafka {
         }
     }
 
+    private void verifiedReceivedByConsumerLast(String s) {
+        ConsumerController.TestResults consumer = this.consumerController.testResults;
+
+        await().untilAsserted(() -> assertThat(last(consumer.receivedBodies)).isEqualTo(s));
+    }
+
+    private String last(List<String> l) {
+        return l.isEmpty() ? "" : l.get(l.size() - 1);
+    }
+
     @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
-    private static void sleep(long millis) throws InterruptedException {
-        Thread.sleep(millis);
+    private static void waitForKafkaListener() throws InterruptedException {
+        Thread.sleep(4000);
     }
 
     @Test
@@ -280,20 +336,35 @@ class IntegrationWithKafka {
 
         this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID, restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+        waitForKafkaListener();
 
-        sleep(4000);
         var dataToSend = Flux.just(senderRecord("Message", ""));
         sendDataToStream(dataToSend);
 
         verifiedReceivedByConsumer("Message");
+    }
 
-        this.icsSimulatorController.deleteJob(JOB_ID, restClient());
+    @Test
+    void kafkaIntegrationTest() throws Exception {
+        final String JOB_ID1 = "ID1";
+        final String JOB_ID2 = "ID2";
 
-        await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
-        await().untilAsserted(() -> assertThat(this.topicListeners.getDataConsumers().keySet()).isEmpty());
-    }
+        // Register producer, Register types
+        await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+        assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+
+        // Create two jobs. One buffering and one with a filter
+        this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 10, 20), JOB_ID1,
+                restClient());
+        this.icsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0, 1), JOB_ID2, restClient());
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
+        waitForKafkaListener();
+
+        var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc.
+        sendDataToStream(dataToSend);
 
-    TopicListener.Output receivedKafkaOutput = new TopicListener.Output("", "");
+        verifiedReceivedByConsumer("Message_1", "[\"Message_1\", \"Message_2\", \"Message_3\"]");
+    }
 
     @Test
     void sendToKafkaConsumer() throws ServiceException, InterruptedException {
@@ -303,70 +374,50 @@ class IntegrationWithKafka {
         await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
         assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
-        final String OUTPUT_TOPIC = "outputTopic";
-
-        this.icsSimulatorController.addJob(consumerJobInfoKafka(OUTPUT_TOPIC), JOB_ID, restClient());
+        this.icsSimulatorController.addJob(consumerJobInfoKafka(KafkaReceiver.OUTPUT_TOPIC), JOB_ID, restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
-
-        // Create a listener to the output topic. The KafkaTopicListener happens to be
-        // suitable for that,
-        InfoType type = new InfoType("id", null, false, OUTPUT_TOPIC, "dataType", false);
-        KafkaTopicListener receiver = new KafkaTopicListener(this.applicationConfig, type);
-        receiver.start();
-
-        Disposable disponsable = receiver.getOutput().asFlux() //
-                .doOnNext(output -> {
-                    receivedKafkaOutput = output;
-                    logger.info("*** recived {}, {}", OUTPUT_TOPIC, output);
-                }) //
-                .doFinally(sig -> logger.info("Finally " + sig)) //
-                .subscribe();
+        waitForKafkaListener();
 
         String sendString = "testData " + Instant.now();
         String sendKey = "key " + Instant.now();
         var dataToSend = Flux.just(senderRecord(sendString, sendKey));
-        sleep(4000);
         sendDataToStream(dataToSend);
 
-        await().untilAsserted(() -> assertThat(this.receivedKafkaOutput.value).isEqualTo(sendString));
-        assertThat(this.receivedKafkaOutput.key).isEqualTo(sendKey);
-
-        disponsable.dispose();
-        receiver.stop();
+        await().untilAsserted(() -> assertThat(kafkaReceiver.lastValue()).isEqualTo(sendString));
+        assertThat(kafkaReceiver.lastKey()).isEqualTo(sendKey);
     }
 
+    @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
     @Test
-    void kafkaIntegrationTest() throws Exception {
-        final String JOB_ID1 = "ID1";
-        final String JOB_ID2 = "ID2";
+    void kafkaCharacteristics() throws Exception {
+        final String JOB_ID = "kafkaCharacteristics";
 
         // Register producer, Register types
         await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
         assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
-        // Create two jobs. One buffering and one with a filter
-        this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 10, 20), JOB_ID1,
-                restClient());
-        this.icsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0, 1), JOB_ID2, restClient());
+        this.icsSimulatorController.addJob(consumerJobInfoKafka(KafkaReceiver.OUTPUT_TOPIC), JOB_ID, restClient());
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+        waitForKafkaListener();
 
-        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
+        final int NO_OF_OBJECTS = 100000;
 
-        sleep(2000);
-        var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc.
-        sendDataToStream(dataToSend);
+        Instant startTime = Instant.now();
 
-        verifiedReceivedByConsumer("Message_1", "[\"Message_1\", \"Message_2\", \"Message_3\"]");
+        var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> senderRecord("Message_" + i)); // Message_1, etc.
+        sendDataToStream(dataToSend);
 
-        // Delete the jobs
-        this.icsSimulatorController.deleteJob(JOB_ID1, restClient());
-        this.icsSimulatorController.deleteJob(JOB_ID2, restClient());
+        while (!kafkaReceiver.lastValue().equals("Message_" + NO_OF_OBJECTS)) {
+            logger.info("sleeping {}", kafkaReceiver.lastValue());
+            Thread.sleep(1000 * 1);
+        }
 
-        await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
-        await().untilAsserted(() -> assertThat(this.topicListeners.getDataConsumers().keySet()).isEmpty());
+        final long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond();
+        logger.info("*** Duration :" + durationSeconds + ", objects/second: " + NO_OF_OBJECTS / durationSeconds);
     }
 
     @Test
-    void kafkaIOverflow() throws Exception {
+    void kafkaDeleteJobShouldNotStopListener() throws Exception {
         final String JOB_ID1 = "ID1";
         final String JOB_ID2 = "ID2";
 
@@ -381,28 +432,20 @@ class IntegrationWithKafka {
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
 
-        var dataToSend = Flux.range(1, 1000000).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc.
-        sendDataToStream(dataToSend); // this should overflow
-
-        DataConsumer consumer = topicListeners.getDataConsumers().get(TYPE_ID).iterator().next();
-        await().untilAsserted(() -> assertThat(consumer.isRunning()).isFalse());
-        this.consumerController.testResults.reset();
-
-        this.icsSimulatorController.deleteJob(JOB_ID2, restClient()); // Delete one job
-        topicListeners.restartNonRunningKafkaTopics();
-        sleep(1000); // Restarting the input seems to take some asynch time
+        var dataToSend = Flux.range(1, 100).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc.
+        sendDataToStream(dataToSend); // this should not overflow
 
-        dataToSend = Flux.just(senderRecord("Howdy\""));
-        sendDataToStream(dataToSend);
-
-        verifiedReceivedByConsumer("[\"Howdy\\\"\"]");
-
-        // Delete the jobs
+        // Delete jobs, recreate one
         this.icsSimulatorController.deleteJob(JOB_ID1, restClient());
         this.icsSimulatorController.deleteJob(JOB_ID2, restClient());
-
         await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
-        await().untilAsserted(() -> assertThat(this.topicListeners.getDataConsumers().keySet()).isEmpty());
+        this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID2, restClient());
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+        dataToSend = Flux.just(senderRecord("Howdy"));
+        sendDataToStream(dataToSend);
+
+        verifiedReceivedByConsumerLast("Howdy");
     }
 
 }