From: PatrikBuhr Date: Mon, 4 Jul 2022 13:28:05 +0000 (+0200) Subject: NONRTRIC - dmaap adapter characteristic improvement X-Git-Tag: 1.2.0~31 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F32%2F8732%2F1;p=nonrtric%2Fplt%2Fdmaapadapter.git NONRTRIC - dmaap adapter characteristic improvement Fixed issues with backpressure. Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-773 Change-Id: I5d9a1cb7c741110010e3dd116a5c115061fb59dd --- diff --git a/pom.xml b/pom.xml index 2b8d65b..1bfe947 100644 --- a/pom.xml +++ b/pom.xml @@ -65,6 +65,7 @@ org.springdoc springdoc-openapi-ui 1.6.3 + test org.springframework.boot @@ -372,4 +373,4 @@ JIRA https://jira.o-ran-sc.org/ - + \ No newline at end of file diff --git a/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java index 3aa97fe..69226ca 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java @@ -30,11 +30,8 @@ import org.oran.dmaapadapter.repository.InfoType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.Sinks; -import reactor.core.publisher.Sinks.Many; /** * The class fetches incoming requests from DMAAP and sends them further to the @@ -48,8 +45,7 @@ public class DmaapTopicListener implements TopicListener { private final ApplicationConfig applicationConfig; private final InfoType type; private final com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create(); - private Many output; - private Disposable topicReceiverTask; + private Flux output; public DmaapTopicListener(ApplicationConfig applicationConfig, InfoType type, SecurityContext securityContext) { AsyncRestClientFactory restclientFactory = @@ -60,42 +56,22 @@ public class DmaapTopicListener implements TopicListener { } @Override - public Many getOutput() { + public Flux getOutput() { + if (this.output == null) { + this.output = createOutput(); + } return this.output; } - @Override - public void start() { - stop(); - - final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 1024 * 10; - this.output = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE); - - topicReceiverTask = Flux.range(0, Integer.MAX_VALUE) // + private Flux createOutput() { + return Flux.range(0, Integer.MAX_VALUE) // .flatMap(notUsed -> getFromMessageRouter(getDmaapUrl()), 1) // - .doOnNext(this::onReceivedData) // - .subscribe(// - null, // - throwable -> logger.error("DmaapMessageConsumer error: {}", throwable.getMessage()), // - this::onComplete); // - } - - @Override - public void stop() { - if (topicReceiverTask != null) { - topicReceiverTask.dispose(); - topicReceiverTask = null; - } - } - - private void onComplete() { - logger.warn("DmaapMessageConsumer completed {}", type.getId()); - start(); - } - - private void onReceivedData(String input) { - logger.debug("Received from DMAAP topic: {} :{}", this.type.getDmaapTopicUrl(), input); - output.emitNext(new Output("", input), Sinks.EmitFailureHandler.FAIL_FAST); + .doOnNext(input -> logger.debug("Received from DMaap: {} :{}", this.type.getDmaapTopicUrl(), input)) // + .doOnError(t -> logger.error("DmaapTopicListener error: {}", t.getMessage())) // + .doFinally(sig -> logger.error("DmaapTopicListener stopped, reason: {}", sig)) // + .publish() // + .autoConnect() // + .map(input -> new Output("", input)); // } private String getDmaapUrl() { @@ -113,7 +89,7 @@ public class DmaapTopicListener implements TopicListener { return dmaapRestClient.get(topicUrl) // .filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away. .flatMapMany(this::splitJsonArray) // - .doOnNext(message -> logger.debug("Message from DMAAP topic: {} : {}", topicUrl, message)) // + .doOnNext(message -> logger.debug("Message from DMaaP topic: {} : {}", topicUrl, message)) // .onErrorResume(this::handleDmaapErrorResponse); // } diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaDataConsumer.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaDataConsumer.java index 2b0b7a4..406c6f3 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/KafkaDataConsumer.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/KafkaDataConsumer.java @@ -87,7 +87,6 @@ public class KafkaDataConsumer extends DataConsumer { Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producerx"); props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java index 4a7f269..8d36fdd 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java @@ -25,16 +25,13 @@ import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.repository.InfoType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import reactor.core.Disposable; -import reactor.core.publisher.Sinks; -import reactor.core.publisher.Sinks.Many; +import reactor.core.publisher.Flux; import reactor.kafka.receiver.KafkaReceiver; import reactor.kafka.receiver.ReceiverOptions; @@ -47,8 +44,7 @@ public class KafkaTopicListener implements TopicListener { private static final Logger logger = LoggerFactory.getLogger(KafkaTopicListener.class); private final ApplicationConfig applicationConfig; private final InfoType type; - private Many output; - private Disposable topicReceiverTask; + private Flux output; public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type) { this.applicationConfig = applicationConfig; @@ -56,39 +52,24 @@ public class KafkaTopicListener implements TopicListener { } @Override - public Many getOutput() { + public Flux getOutput() { + if (this.output == null) { + this.output = createOutput(); + } return this.output; } - @Override - public void start() { - stop(); - final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 1024 * 10; - this.output = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE); + private Flux createOutput() { logger.debug("Listening to kafka topic: {} type :{}", this.type.getKafkaInputTopic(), type.getId()); - topicReceiverTask = KafkaReceiver.create(kafkaInputProperties()) // + return KafkaReceiver.create(kafkaInputProperties()) // .receive() // - .doOnNext(this::onReceivedData) // - .subscribe(null, // - this::onReceivedError, // - () -> logger.warn("KafkaTopicReceiver stopped")); - } - - @Override - public void stop() { - if (topicReceiverTask != null) { - topicReceiverTask.dispose(); - topicReceiverTask = null; - } - } - - private void onReceivedData(ConsumerRecord input) { - logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(), input.value()); - output.emitNext(new Output(input.key(), input.value()), Sinks.EmitFailureHandler.FAIL_FAST); - } - - private void onReceivedError(Throwable t) { - logger.error("KafkaTopicReceiver error: {}", t.getMessage()); + .doOnNext(input -> logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(), + input.value())) // + .doOnError(t -> logger.error("KafkaTopicReceiver error: {}", t.getMessage())) // + .doFinally(sig -> logger.error("KafkaTopicReceiver stopped, reason: {}", sig)) // + .publish() // + .autoConnect() // + .map(input -> new Output(input.key(), input.value())); // } private ReceiverOptions kafkaInputProperties() { @@ -100,6 +81,7 @@ public class KafkaTopicListener implements TopicListener { consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "osc-dmaap-adapter"); consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); return ReceiverOptions.create(consumerProps) .subscription(Collections.singleton(this.type.getKafkaInputTopic())); diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java index e32cfa5..54254a3 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java @@ -21,7 +21,7 @@ package org.oran.dmaapadapter.tasks; import lombok.ToString; -import reactor.core.publisher.Sinks.Many; +import reactor.core.publisher.Flux; public interface TopicListener { @@ -36,9 +36,5 @@ public interface TopicListener { } } - public void start(); - - public void stop(); - - public Many getOutput(); + public Flux getOutput(); } diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java index df70b9f..6c0f48f 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java @@ -37,7 +37,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.EnableScheduling; -import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally @@ -54,8 +53,6 @@ public class TopicListeners { private final ApplicationConfig appConfig; - private static final int CONSUMER_SUPERVISION_INTERVAL_MS = 1000 * 60 * 3; - public TopicListeners(@Autowired ApplicationConfig appConfig, @Autowired InfoTypes types, @Autowired Jobs jobs, @Autowired SecurityContext securityContext) { this.appConfig = appConfig; @@ -103,11 +100,8 @@ public class TopicListeners { private void addConsumer(Job job, MultiMap consumers, Map topicListeners) { TopicListener topicListener = topicListeners.get(job.getType().getId()); - if (consumers.get(job.getType().getId()).isEmpty()) { - topicListener.start(); - } DataConsumer consumer = createConsumer(job); - consumer.start(topicListener.getOutput().asFlux()); + consumer.start(topicListener.getOutput()); consumers.put(job.getType().getId(), job.getId(), consumer); } @@ -123,25 +117,4 @@ public class TopicListeners { } } - @Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS) - public synchronized void restartNonRunningKafkaTopics() { - for (DataConsumer consumer : this.dataConsumers.values()) { - if (!consumer.isRunning()) { - restartTopicAndConsumers(this.kafkaTopicListeners, this.dataConsumers, consumer); - } - } - - } - - private static void restartTopicAndConsumers(Map topicListeners, - MultiMap consumers, DataConsumer consumer) { - InfoType type = consumer.getJob().getType(); - TopicListener topic = topicListeners.get(type.getId()); - topic.start(); - restartConsumersOfType(consumers, topic, type); - } - - private static void restartConsumersOfType(MultiMap consumers, TopicListener topic, InfoType type) { - consumers.get(type.getId()).forEach(consumer -> consumer.start(topic.getOutput().asFlux())); - } } diff --git a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java index c4b5ece..a3febaf 100644 --- a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java +++ b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java @@ -157,15 +157,23 @@ class ApplicationTest { } @BeforeEach - void setPort() { + void init() { this.applicationConfig.setLocalServerHttpPort(this.localServerHttpPort); + assertThat(this.jobs.size()).isZero(); + assertThat(this.consumerController.testResults.receivedBodies).isEmpty(); + assertThat(this.consumerController.testResults.receivedHeaders).isEmpty(); } @AfterEach void reset() { + for (Job job : this.jobs.getAll()) { + this.icsSimulatorController.deleteJob(job.getId(), restClient()); + } + await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); + this.consumerController.testResults.reset(); this.icsSimulatorController.testResults.reset(); - this.jobs.clear(); + } private AsyncRestClient restClient(boolean useTrustValidation) { @@ -298,14 +306,6 @@ class ApplicationTest { await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1)); assertThat(consumer.receivedBodies.get(0)).isEqualTo("[\"data\"]"); assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "application/json"); - - // Test send an exception - kafkaConsumer.start(Flux.error(new NullPointerException())); - - // Test regular restart of stopped - kafkaConsumer.stop(); - this.topicListeners.restartNonRunningKafkaTopics(); - await().untilAsserted(() -> assertThat(kafkaConsumer.isRunning()).isTrue()); } @Test @@ -323,19 +323,15 @@ class ApplicationTest { // Return two messages from DMAAP and verify that these are sent to the owner of // the job (consumer) - DmaapSimulatorController.addResponse("[\"DmaapResponse1\", \"DmaapResponse2\"]"); + DmaapSimulatorController.addResponse("[\"DmaapResponse123\", \"DmaapResponse223\"]"); ConsumerController.TestResults consumer = this.consumerController.testResults; await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1)); - assertThat(consumer.receivedBodies.get(0)).isEqualTo("[\"DmaapResponse1\", \"DmaapResponse2\"]"); + assertThat(consumer.receivedBodies.get(0)).isEqualTo("[\"DmaapResponse123\", \"DmaapResponse223\"]"); assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "application/json"); String jobUrl = baseUrl() + ProducerCallbacksController.JOB_URL; String jobs = restClient().get(jobUrl).block(); assertThat(jobs).contains(JOB_ID); - - // Delete the job - this.icsSimulatorController.deleteJob(JOB_ID, restClient()); - await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); } @Test @@ -353,16 +349,22 @@ class ApplicationTest { // Return two messages from DMAAP and verify that these are sent to the owner of // the job (consumer) - DmaapSimulatorController.addResponse("[\"DmaapResponse1\", \"DmaapResponse2\"]"); + DmaapSimulatorController.addResponse("[\"DmaapResponse11\", \"DmaapResponse22\"]"); ConsumerController.TestResults consumer = this.consumerController.testResults; await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(2)); - assertThat(consumer.receivedBodies.get(0)).isEqualTo("DmaapResponse1"); - assertThat(consumer.receivedBodies.get(1)).isEqualTo("DmaapResponse2"); + assertThat(consumer.receivedBodies.get(0)).isEqualTo("DmaapResponse11"); + assertThat(consumer.receivedBodies.get(1)).isEqualTo("DmaapResponse22"); assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "text/plain;charset=UTF-8"); // Delete the job this.icsSimulatorController.deleteJob(JOB_ID, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); + + // Test that deleting the the last job did not terminate the DmaapTopicListener + this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient()); + await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); + DmaapSimulatorController.addResponse("[\"DmaapResponse77\", \"DmaapResponse88\"]"); + await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(4)); } static class PmReportArray extends ArrayList { @@ -462,18 +464,21 @@ class ApplicationTest { DmaapSimulatorController.addResponse("[\"Hello\"]"); ConsumerController.TestResults consumer = this.consumerController.testResults; - await().untilAsserted(() -> assertThat(consumer.receivedHeaders).hasSize(1)); + await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1)); String received = consumer.receivedBodies.get(0); assertThat(received).isEqualTo("Hello"); - // This is the only time it is verified that mime type is plaintext when isJson - // is false and buffering is not used - assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "text/plain;charset=UTF-8"); // Check that the auth token was received by the consumer assertThat(consumer.receivedHeaders).hasSize(1); Map headers = consumer.receivedHeaders.get(0); assertThat(headers).containsEntry("authorization", "Bearer " + AUTH_TOKEN); + + // This is the only time it is verified that mime type is plaintext when isJson + // is false and buffering is not used + assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "text/plain;charset=UTF-8"); + Files.delete(authFile); + this.securityContext.setAuthTokenFilePath(null); } @Test diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java index 287c20b..9c8f816 100644 --- a/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java +++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java @@ -33,7 +33,6 @@ import java.nio.file.Path; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; import org.oran.dmaapadapter.clients.AsyncRestClient; import org.oran.dmaapadapter.clients.AsyncRestClientFactory; import org.oran.dmaapadapter.clients.SecurityContext; @@ -55,10 +54,8 @@ import org.springframework.boot.web.servlet.server.ServletWebServerFactory; import org.springframework.context.annotation.Bean; import org.springframework.http.HttpStatus; import org.springframework.test.context.TestPropertySource; -import org.springframework.test.context.junit.jupiter.SpringExtension; @SuppressWarnings("java:S3577") // Rename class -@ExtendWith(SpringExtension.class) @SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT) @TestPropertySource(properties = { // "server.ssl.key-store=./config/keystore.jks", // diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java index 330eb6b..a345670 100644 --- a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java +++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java @@ -29,12 +29,14 @@ import com.google.gson.JsonParser; import java.time.Duration; import java.time.Instant; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.oran.dmaapadapter.clients.AsyncRestClient; import org.oran.dmaapadapter.clients.AsyncRestClientFactory; @@ -48,7 +50,6 @@ import org.oran.dmaapadapter.repository.InfoType; import org.oran.dmaapadapter.repository.InfoTypes; import org.oran.dmaapadapter.repository.Job; import org.oran.dmaapadapter.repository.Jobs; -import org.oran.dmaapadapter.tasks.DataConsumer; import org.oran.dmaapadapter.tasks.KafkaTopicListener; import org.oran.dmaapadapter.tasks.TopicListener; import org.oran.dmaapadapter.tasks.TopicListeners; @@ -64,7 +65,6 @@ import org.springframework.boot.web.servlet.server.ServletWebServerFactory; import org.springframework.context.annotation.Bean; import org.springframework.test.context.TestPropertySource; -import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.kafka.sender.KafkaSender; import reactor.kafka.sender.SenderOptions; @@ -151,11 +151,57 @@ class IntegrationWithKafka { } } + private static class KafkaReceiver { + public final static String OUTPUT_TOPIC = "outputTopic"; + private TopicListener.Output receivedKafkaOutput = new TopicListener.Output("", ""); + private final Logger logger = LoggerFactory.getLogger(IntegrationWithKafka.class); + + public KafkaReceiver(ApplicationConfig applicationConfig) { + + // Create a listener to the output topic. The KafkaTopicListener happens to be + // suitable for that, + InfoType type = new InfoType("id", null, false, OUTPUT_TOPIC, "dataType", false); + KafkaTopicListener topicListener = new KafkaTopicListener(applicationConfig, type); + + topicListener.getOutput() // + .doOnNext(this::set) // + .doFinally(sig -> logger.info("Finally " + sig)) // + .subscribe(); + } + + private void set(TopicListener.Output receivedKafkaOutput) { + this.receivedKafkaOutput = receivedKafkaOutput; + logger.debug("*** received {}, {}", OUTPUT_TOPIC, receivedKafkaOutput); + } + + synchronized String lastKey() { + return this.receivedKafkaOutput.key; + } + + synchronized String lastValue() { + return this.receivedKafkaOutput.value; + } + } + + private static KafkaReceiver kafkaReceiver; + + @BeforeEach + void init() { + if (kafkaReceiver == null) { + kafkaReceiver = new KafkaReceiver(this.applicationConfig); + } + } + @AfterEach void reset() { + for (Job job : this.jobs.getAll()) { + this.icsSimulatorController.deleteJob(job.getId(), restClient()); + } + await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); + await().untilAsserted(() -> assertThat(this.topicListeners.getDataConsumers().keySet()).isEmpty()); + this.consumerController.testResults.reset(); this.icsSimulatorController.testResults.reset(); - this.jobs.clear(); } private AsyncRestClient restClient(boolean useTrustValidation) { @@ -265,9 +311,19 @@ class IntegrationWithKafka { } } + private void verifiedReceivedByConsumerLast(String s) { + ConsumerController.TestResults consumer = this.consumerController.testResults; + + await().untilAsserted(() -> assertThat(last(consumer.receivedBodies)).isEqualTo(s)); + } + + private String last(List l) { + return l.isEmpty() ? "" : l.get(l.size() - 1); + } + @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests. - private static void sleep(long millis) throws InterruptedException { - Thread.sleep(millis); + private static void waitForKafkaListener() throws InterruptedException { + Thread.sleep(4000); } @Test @@ -280,20 +336,35 @@ class IntegrationWithKafka { this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); + waitForKafkaListener(); - sleep(4000); var dataToSend = Flux.just(senderRecord("Message", "")); sendDataToStream(dataToSend); verifiedReceivedByConsumer("Message"); + } - this.icsSimulatorController.deleteJob(JOB_ID, restClient()); + @Test + void kafkaIntegrationTest() throws Exception { + final String JOB_ID1 = "ID1"; + final String JOB_ID2 = "ID2"; - await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); - await().untilAsserted(() -> assertThat(this.topicListeners.getDataConsumers().keySet()).isEmpty()); - } + // Register producer, Register types + await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); + assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); + + // Create two jobs. One buffering and one with a filter + this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 10, 20), JOB_ID1, + restClient()); + this.icsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0, 1), JOB_ID2, restClient()); + await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2)); + waitForKafkaListener(); + + var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc. + sendDataToStream(dataToSend); - TopicListener.Output receivedKafkaOutput = new TopicListener.Output("", ""); + verifiedReceivedByConsumer("Message_1", "[\"Message_1\", \"Message_2\", \"Message_3\"]"); + } @Test void sendToKafkaConsumer() throws ServiceException, InterruptedException { @@ -303,70 +374,50 @@ class IntegrationWithKafka { await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); - final String OUTPUT_TOPIC = "outputTopic"; - - this.icsSimulatorController.addJob(consumerJobInfoKafka(OUTPUT_TOPIC), JOB_ID, restClient()); + this.icsSimulatorController.addJob(consumerJobInfoKafka(KafkaReceiver.OUTPUT_TOPIC), JOB_ID, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); - - // Create a listener to the output topic. The KafkaTopicListener happens to be - // suitable for that, - InfoType type = new InfoType("id", null, false, OUTPUT_TOPIC, "dataType", false); - KafkaTopicListener receiver = new KafkaTopicListener(this.applicationConfig, type); - receiver.start(); - - Disposable disponsable = receiver.getOutput().asFlux() // - .doOnNext(output -> { - receivedKafkaOutput = output; - logger.info("*** recived {}, {}", OUTPUT_TOPIC, output); - }) // - .doFinally(sig -> logger.info("Finally " + sig)) // - .subscribe(); + waitForKafkaListener(); String sendString = "testData " + Instant.now(); String sendKey = "key " + Instant.now(); var dataToSend = Flux.just(senderRecord(sendString, sendKey)); - sleep(4000); sendDataToStream(dataToSend); - await().untilAsserted(() -> assertThat(this.receivedKafkaOutput.value).isEqualTo(sendString)); - assertThat(this.receivedKafkaOutput.key).isEqualTo(sendKey); - - disponsable.dispose(); - receiver.stop(); + await().untilAsserted(() -> assertThat(kafkaReceiver.lastValue()).isEqualTo(sendString)); + assertThat(kafkaReceiver.lastKey()).isEqualTo(sendKey); } + @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests. @Test - void kafkaIntegrationTest() throws Exception { - final String JOB_ID1 = "ID1"; - final String JOB_ID2 = "ID2"; + void kafkaCharacteristics() throws Exception { + final String JOB_ID = "kafkaCharacteristics"; // Register producer, Register types await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); - // Create two jobs. One buffering and one with a filter - this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 10, 20), JOB_ID1, - restClient()); - this.icsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0, 1), JOB_ID2, restClient()); + this.icsSimulatorController.addJob(consumerJobInfoKafka(KafkaReceiver.OUTPUT_TOPIC), JOB_ID, restClient()); + await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); + waitForKafkaListener(); - await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2)); + final int NO_OF_OBJECTS = 100000; - sleep(2000); - var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc. - sendDataToStream(dataToSend); + Instant startTime = Instant.now(); - verifiedReceivedByConsumer("Message_1", "[\"Message_1\", \"Message_2\", \"Message_3\"]"); + var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> senderRecord("Message_" + i)); // Message_1, etc. + sendDataToStream(dataToSend); - // Delete the jobs - this.icsSimulatorController.deleteJob(JOB_ID1, restClient()); - this.icsSimulatorController.deleteJob(JOB_ID2, restClient()); + while (!kafkaReceiver.lastValue().equals("Message_" + NO_OF_OBJECTS)) { + logger.info("sleeping {}", kafkaReceiver.lastValue()); + Thread.sleep(1000 * 1); + } - await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); - await().untilAsserted(() -> assertThat(this.topicListeners.getDataConsumers().keySet()).isEmpty()); + final long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond(); + logger.info("*** Duration :" + durationSeconds + ", objects/second: " + NO_OF_OBJECTS / durationSeconds); } @Test - void kafkaIOverflow() throws Exception { + void kafkaDeleteJobShouldNotStopListener() throws Exception { final String JOB_ID1 = "ID1"; final String JOB_ID2 = "ID2"; @@ -381,28 +432,20 @@ class IntegrationWithKafka { await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2)); - var dataToSend = Flux.range(1, 1000000).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc. - sendDataToStream(dataToSend); // this should overflow - - DataConsumer consumer = topicListeners.getDataConsumers().get(TYPE_ID).iterator().next(); - await().untilAsserted(() -> assertThat(consumer.isRunning()).isFalse()); - this.consumerController.testResults.reset(); - - this.icsSimulatorController.deleteJob(JOB_ID2, restClient()); // Delete one job - topicListeners.restartNonRunningKafkaTopics(); - sleep(1000); // Restarting the input seems to take some asynch time + var dataToSend = Flux.range(1, 100).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc. + sendDataToStream(dataToSend); // this should not overflow - dataToSend = Flux.just(senderRecord("Howdy\"")); - sendDataToStream(dataToSend); - - verifiedReceivedByConsumer("[\"Howdy\\\"\"]"); - - // Delete the jobs + // Delete jobs, recreate one this.icsSimulatorController.deleteJob(JOB_ID1, restClient()); this.icsSimulatorController.deleteJob(JOB_ID2, restClient()); - await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); - await().untilAsserted(() -> assertThat(this.topicListeners.getDataConsumers().keySet()).isEmpty()); + this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID2, restClient()); + await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); + + dataToSend = Flux.just(senderRecord("Howdy")); + sendDataToStream(dataToSend); + + verifiedReceivedByConsumerLast("Howdy"); } }