Fixed issues with backpressure.
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: I5d9a1cb7c741110010e3dd116a5c115061fb59dd
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-ui</artifactId>
<version>1.6.3</version>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<system>JIRA</system>
<url>https://jira.o-ran-sc.org/</url>
</issueManagement>
-</project>
+</project>
\ No newline at end of file
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import reactor.core.publisher.Sinks;
-import reactor.core.publisher.Sinks.Many;
/**
* The class fetches incoming requests from DMAAP and sends them further to the
private final ApplicationConfig applicationConfig;
private final InfoType type;
private final com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
- private Many<Output> output;
- private Disposable topicReceiverTask;
+ private Flux<Output> output;
public DmaapTopicListener(ApplicationConfig applicationConfig, InfoType type, SecurityContext securityContext) {
AsyncRestClientFactory restclientFactory =
}
@Override
- public Many<Output> getOutput() {
+ public Flux<Output> getOutput() {
+ if (this.output == null) {
+ this.output = createOutput();
+ }
return this.output;
}
- @Override
- public void start() {
- stop();
-
- final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 1024 * 10;
- this.output = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE);
-
- topicReceiverTask = Flux.range(0, Integer.MAX_VALUE) //
+ private Flux<Output> createOutput() {
+ return Flux.range(0, Integer.MAX_VALUE) //
.flatMap(notUsed -> getFromMessageRouter(getDmaapUrl()), 1) //
- .doOnNext(this::onReceivedData) //
- .subscribe(//
- null, //
- throwable -> logger.error("DmaapMessageConsumer error: {}", throwable.getMessage()), //
- this::onComplete); //
- }
-
- @Override
- public void stop() {
- if (topicReceiverTask != null) {
- topicReceiverTask.dispose();
- topicReceiverTask = null;
- }
- }
-
- private void onComplete() {
- logger.warn("DmaapMessageConsumer completed {}", type.getId());
- start();
- }
-
- private void onReceivedData(String input) {
- logger.debug("Received from DMAAP topic: {} :{}", this.type.getDmaapTopicUrl(), input);
- output.emitNext(new Output("", input), Sinks.EmitFailureHandler.FAIL_FAST);
+ .doOnNext(input -> logger.debug("Received from DMaap: {} :{}", this.type.getDmaapTopicUrl(), input)) //
+ .doOnError(t -> logger.error("DmaapTopicListener error: {}", t.getMessage())) //
+ .doFinally(sig -> logger.error("DmaapTopicListener stopped, reason: {}", sig)) //
+ .publish() //
+ .autoConnect() //
+ .map(input -> new Output("", input)); //
}
private String getDmaapUrl() {
return dmaapRestClient.get(topicUrl) //
.filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away.
.flatMapMany(this::splitJsonArray) //
- .doOnNext(message -> logger.debug("Message from DMAAP topic: {} : {}", topicUrl, message)) //
+ .doOnNext(message -> logger.debug("Message from DMaaP topic: {} : {}", topicUrl, message)) //
.onErrorResume(this::handleDmaapErrorResponse); //
}
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producerx");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.repository.InfoType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import reactor.core.Disposable;
-import reactor.core.publisher.Sinks;
-import reactor.core.publisher.Sinks.Many;
+import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
private static final Logger logger = LoggerFactory.getLogger(KafkaTopicListener.class);
private final ApplicationConfig applicationConfig;
private final InfoType type;
- private Many<Output> output;
- private Disposable topicReceiverTask;
+ private Flux<Output> output;
public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type) {
this.applicationConfig = applicationConfig;
}
@Override
- public Many<Output> getOutput() {
+ public Flux<Output> getOutput() {
+ if (this.output == null) {
+ this.output = createOutput();
+ }
return this.output;
}
- @Override
- public void start() {
- stop();
- final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 1024 * 10;
- this.output = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE);
+ private Flux<Output> createOutput() {
logger.debug("Listening to kafka topic: {} type :{}", this.type.getKafkaInputTopic(), type.getId());
- topicReceiverTask = KafkaReceiver.create(kafkaInputProperties()) //
+ return KafkaReceiver.create(kafkaInputProperties()) //
.receive() //
- .doOnNext(this::onReceivedData) //
- .subscribe(null, //
- this::onReceivedError, //
- () -> logger.warn("KafkaTopicReceiver stopped"));
- }
-
- @Override
- public void stop() {
- if (topicReceiverTask != null) {
- topicReceiverTask.dispose();
- topicReceiverTask = null;
- }
- }
-
- private void onReceivedData(ConsumerRecord<String, String> input) {
- logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(), input.value());
- output.emitNext(new Output(input.key(), input.value()), Sinks.EmitFailureHandler.FAIL_FAST);
- }
-
- private void onReceivedError(Throwable t) {
- logger.error("KafkaTopicReceiver error: {}", t.getMessage());
+ .doOnNext(input -> logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(),
+ input.value())) //
+ .doOnError(t -> logger.error("KafkaTopicReceiver error: {}", t.getMessage())) //
+ .doFinally(sig -> logger.error("KafkaTopicReceiver stopped, reason: {}", sig)) //
+ .publish() //
+ .autoConnect() //
+ .map(input -> new Output(input.key(), input.value())); //
}
private ReceiverOptions<String, String> kafkaInputProperties() {
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "osc-dmaap-adapter");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
return ReceiverOptions.<String, String>create(consumerProps)
.subscription(Collections.singleton(this.type.getKafkaInputTopic()));
package org.oran.dmaapadapter.tasks;
import lombok.ToString;
-import reactor.core.publisher.Sinks.Many;
+import reactor.core.publisher.Flux;
public interface TopicListener {
}
}
- public void start();
-
- public void stop();
-
- public Many<Output> getOutput();
+ public Flux<Output> getOutput();
}
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
-import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
private final ApplicationConfig appConfig;
- private static final int CONSUMER_SUPERVISION_INTERVAL_MS = 1000 * 60 * 3;
-
public TopicListeners(@Autowired ApplicationConfig appConfig, @Autowired InfoTypes types, @Autowired Jobs jobs,
@Autowired SecurityContext securityContext) {
this.appConfig = appConfig;
private void addConsumer(Job job, MultiMap<DataConsumer> consumers, Map<String, TopicListener> topicListeners) {
TopicListener topicListener = topicListeners.get(job.getType().getId());
- if (consumers.get(job.getType().getId()).isEmpty()) {
- topicListener.start();
- }
DataConsumer consumer = createConsumer(job);
- consumer.start(topicListener.getOutput().asFlux());
+ consumer.start(topicListener.getOutput());
consumers.put(job.getType().getId(), job.getId(), consumer);
}
}
}
- @Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS)
- public synchronized void restartNonRunningKafkaTopics() {
- for (DataConsumer consumer : this.dataConsumers.values()) {
- if (!consumer.isRunning()) {
- restartTopicAndConsumers(this.kafkaTopicListeners, this.dataConsumers, consumer);
- }
- }
-
- }
-
- private static void restartTopicAndConsumers(Map<String, TopicListener> topicListeners,
- MultiMap<DataConsumer> consumers, DataConsumer consumer) {
- InfoType type = consumer.getJob().getType();
- TopicListener topic = topicListeners.get(type.getId());
- topic.start();
- restartConsumersOfType(consumers, topic, type);
- }
-
- private static void restartConsumersOfType(MultiMap<DataConsumer> consumers, TopicListener topic, InfoType type) {
- consumers.get(type.getId()).forEach(consumer -> consumer.start(topic.getOutput().asFlux()));
- }
}
}
@BeforeEach
- void setPort() {
+ void init() {
this.applicationConfig.setLocalServerHttpPort(this.localServerHttpPort);
+ assertThat(this.jobs.size()).isZero();
+ assertThat(this.consumerController.testResults.receivedBodies).isEmpty();
+ assertThat(this.consumerController.testResults.receivedHeaders).isEmpty();
}
@AfterEach
void reset() {
+ for (Job job : this.jobs.getAll()) {
+ this.icsSimulatorController.deleteJob(job.getId(), restClient());
+ }
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+
this.consumerController.testResults.reset();
this.icsSimulatorController.testResults.reset();
- this.jobs.clear();
+
}
private AsyncRestClient restClient(boolean useTrustValidation) {
await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
assertThat(consumer.receivedBodies.get(0)).isEqualTo("[\"data\"]");
assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "application/json");
-
- // Test send an exception
- kafkaConsumer.start(Flux.error(new NullPointerException()));
-
- // Test regular restart of stopped
- kafkaConsumer.stop();
- this.topicListeners.restartNonRunningKafkaTopics();
- await().untilAsserted(() -> assertThat(kafkaConsumer.isRunning()).isTrue());
}
@Test
// Return two messages from DMAAP and verify that these are sent to the owner of
// the job (consumer)
- DmaapSimulatorController.addResponse("[\"DmaapResponse1\", \"DmaapResponse2\"]");
+ DmaapSimulatorController.addResponse("[\"DmaapResponse123\", \"DmaapResponse223\"]");
ConsumerController.TestResults consumer = this.consumerController.testResults;
await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
- assertThat(consumer.receivedBodies.get(0)).isEqualTo("[\"DmaapResponse1\", \"DmaapResponse2\"]");
+ assertThat(consumer.receivedBodies.get(0)).isEqualTo("[\"DmaapResponse123\", \"DmaapResponse223\"]");
assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "application/json");
String jobUrl = baseUrl() + ProducerCallbacksController.JOB_URL;
String jobs = restClient().get(jobUrl).block();
assertThat(jobs).contains(JOB_ID);
-
- // Delete the job
- this.icsSimulatorController.deleteJob(JOB_ID, restClient());
- await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
}
@Test
// Return two messages from DMAAP and verify that these are sent to the owner of
// the job (consumer)
- DmaapSimulatorController.addResponse("[\"DmaapResponse1\", \"DmaapResponse2\"]");
+ DmaapSimulatorController.addResponse("[\"DmaapResponse11\", \"DmaapResponse22\"]");
ConsumerController.TestResults consumer = this.consumerController.testResults;
await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(2));
- assertThat(consumer.receivedBodies.get(0)).isEqualTo("DmaapResponse1");
- assertThat(consumer.receivedBodies.get(1)).isEqualTo("DmaapResponse2");
+ assertThat(consumer.receivedBodies.get(0)).isEqualTo("DmaapResponse11");
+ assertThat(consumer.receivedBodies.get(1)).isEqualTo("DmaapResponse22");
assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "text/plain;charset=UTF-8");
// Delete the job
this.icsSimulatorController.deleteJob(JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+
+ // Test that deleting the the last job did not terminate the DmaapTopicListener
+ this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+ DmaapSimulatorController.addResponse("[\"DmaapResponse77\", \"DmaapResponse88\"]");
+ await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(4));
}
static class PmReportArray extends ArrayList<PmReport> {
DmaapSimulatorController.addResponse("[\"Hello\"]");
ConsumerController.TestResults consumer = this.consumerController.testResults;
- await().untilAsserted(() -> assertThat(consumer.receivedHeaders).hasSize(1));
+ await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
String received = consumer.receivedBodies.get(0);
assertThat(received).isEqualTo("Hello");
- // This is the only time it is verified that mime type is plaintext when isJson
- // is false and buffering is not used
- assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "text/plain;charset=UTF-8");
// Check that the auth token was received by the consumer
assertThat(consumer.receivedHeaders).hasSize(1);
Map<String, String> headers = consumer.receivedHeaders.get(0);
assertThat(headers).containsEntry("authorization", "Bearer " + AUTH_TOKEN);
+
+ // This is the only time it is verified that mime type is plaintext when isJson
+ // is false and buffering is not used
+ assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "text/plain;charset=UTF-8");
+
Files.delete(authFile);
+ this.securityContext.setAuthTokenFilePath(null);
}
@Test
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
import org.oran.dmaapadapter.clients.AsyncRestClient;
import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
import org.oran.dmaapadapter.clients.SecurityContext;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.test.context.TestPropertySource;
-import org.springframework.test.context.junit.jupiter.SpringExtension;
@SuppressWarnings("java:S3577") // Rename class
-@ExtendWith(SpringExtension.class)
@SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
@TestPropertySource(properties = { //
"server.ssl.key-store=./config/keystore.jks", //
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.oran.dmaapadapter.clients.AsyncRestClient;
import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
import org.oran.dmaapadapter.repository.InfoTypes;
import org.oran.dmaapadapter.repository.Job;
import org.oran.dmaapadapter.repository.Jobs;
-import org.oran.dmaapadapter.tasks.DataConsumer;
import org.oran.dmaapadapter.tasks.KafkaTopicListener;
import org.oran.dmaapadapter.tasks.TopicListener;
import org.oran.dmaapadapter.tasks.TopicListeners;
import org.springframework.context.annotation.Bean;
import org.springframework.test.context.TestPropertySource;
-import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
}
}
+ private static class KafkaReceiver {
+ public final static String OUTPUT_TOPIC = "outputTopic";
+ private TopicListener.Output receivedKafkaOutput = new TopicListener.Output("", "");
+ private final Logger logger = LoggerFactory.getLogger(IntegrationWithKafka.class);
+
+ public KafkaReceiver(ApplicationConfig applicationConfig) {
+
+ // Create a listener to the output topic. The KafkaTopicListener happens to be
+ // suitable for that,
+ InfoType type = new InfoType("id", null, false, OUTPUT_TOPIC, "dataType", false);
+ KafkaTopicListener topicListener = new KafkaTopicListener(applicationConfig, type);
+
+ topicListener.getOutput() //
+ .doOnNext(this::set) //
+ .doFinally(sig -> logger.info("Finally " + sig)) //
+ .subscribe();
+ }
+
+ private void set(TopicListener.Output receivedKafkaOutput) {
+ this.receivedKafkaOutput = receivedKafkaOutput;
+ logger.debug("*** received {}, {}", OUTPUT_TOPIC, receivedKafkaOutput);
+ }
+
+ synchronized String lastKey() {
+ return this.receivedKafkaOutput.key;
+ }
+
+ synchronized String lastValue() {
+ return this.receivedKafkaOutput.value;
+ }
+ }
+
+ private static KafkaReceiver kafkaReceiver;
+
+ @BeforeEach
+ void init() {
+ if (kafkaReceiver == null) {
+ kafkaReceiver = new KafkaReceiver(this.applicationConfig);
+ }
+ }
+
@AfterEach
void reset() {
+ for (Job job : this.jobs.getAll()) {
+ this.icsSimulatorController.deleteJob(job.getId(), restClient());
+ }
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+ await().untilAsserted(() -> assertThat(this.topicListeners.getDataConsumers().keySet()).isEmpty());
+
this.consumerController.testResults.reset();
this.icsSimulatorController.testResults.reset();
- this.jobs.clear();
}
private AsyncRestClient restClient(boolean useTrustValidation) {
}
}
+ private void verifiedReceivedByConsumerLast(String s) {
+ ConsumerController.TestResults consumer = this.consumerController.testResults;
+
+ await().untilAsserted(() -> assertThat(last(consumer.receivedBodies)).isEqualTo(s));
+ }
+
+ private String last(List<String> l) {
+ return l.isEmpty() ? "" : l.get(l.size() - 1);
+ }
+
@SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
- private static void sleep(long millis) throws InterruptedException {
- Thread.sleep(millis);
+ private static void waitForKafkaListener() throws InterruptedException {
+ Thread.sleep(4000);
}
@Test
this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+ waitForKafkaListener();
- sleep(4000);
var dataToSend = Flux.just(senderRecord("Message", ""));
sendDataToStream(dataToSend);
verifiedReceivedByConsumer("Message");
+ }
- this.icsSimulatorController.deleteJob(JOB_ID, restClient());
+ @Test
+ void kafkaIntegrationTest() throws Exception {
+ final String JOB_ID1 = "ID1";
+ final String JOB_ID2 = "ID2";
- await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
- await().untilAsserted(() -> assertThat(this.topicListeners.getDataConsumers().keySet()).isEmpty());
- }
+ // Register producer, Register types
+ await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+
+ // Create two jobs. One buffering and one with a filter
+ this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 10, 20), JOB_ID1,
+ restClient());
+ this.icsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0, 1), JOB_ID2, restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
+ waitForKafkaListener();
+
+ var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc.
+ sendDataToStream(dataToSend);
- TopicListener.Output receivedKafkaOutput = new TopicListener.Output("", "");
+ verifiedReceivedByConsumer("Message_1", "[\"Message_1\", \"Message_2\", \"Message_3\"]");
+ }
@Test
void sendToKafkaConsumer() throws ServiceException, InterruptedException {
await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
- final String OUTPUT_TOPIC = "outputTopic";
-
- this.icsSimulatorController.addJob(consumerJobInfoKafka(OUTPUT_TOPIC), JOB_ID, restClient());
+ this.icsSimulatorController.addJob(consumerJobInfoKafka(KafkaReceiver.OUTPUT_TOPIC), JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
-
- // Create a listener to the output topic. The KafkaTopicListener happens to be
- // suitable for that,
- InfoType type = new InfoType("id", null, false, OUTPUT_TOPIC, "dataType", false);
- KafkaTopicListener receiver = new KafkaTopicListener(this.applicationConfig, type);
- receiver.start();
-
- Disposable disponsable = receiver.getOutput().asFlux() //
- .doOnNext(output -> {
- receivedKafkaOutput = output;
- logger.info("*** recived {}, {}", OUTPUT_TOPIC, output);
- }) //
- .doFinally(sig -> logger.info("Finally " + sig)) //
- .subscribe();
+ waitForKafkaListener();
String sendString = "testData " + Instant.now();
String sendKey = "key " + Instant.now();
var dataToSend = Flux.just(senderRecord(sendString, sendKey));
- sleep(4000);
sendDataToStream(dataToSend);
- await().untilAsserted(() -> assertThat(this.receivedKafkaOutput.value).isEqualTo(sendString));
- assertThat(this.receivedKafkaOutput.key).isEqualTo(sendKey);
-
- disponsable.dispose();
- receiver.stop();
+ await().untilAsserted(() -> assertThat(kafkaReceiver.lastValue()).isEqualTo(sendString));
+ assertThat(kafkaReceiver.lastKey()).isEqualTo(sendKey);
}
+ @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
@Test
- void kafkaIntegrationTest() throws Exception {
- final String JOB_ID1 = "ID1";
- final String JOB_ID2 = "ID2";
+ void kafkaCharacteristics() throws Exception {
+ final String JOB_ID = "kafkaCharacteristics";
// Register producer, Register types
await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
- // Create two jobs. One buffering and one with a filter
- this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 10, 20), JOB_ID1,
- restClient());
- this.icsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0, 1), JOB_ID2, restClient());
+ this.icsSimulatorController.addJob(consumerJobInfoKafka(KafkaReceiver.OUTPUT_TOPIC), JOB_ID, restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+ waitForKafkaListener();
- await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
+ final int NO_OF_OBJECTS = 100000;
- sleep(2000);
- var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc.
- sendDataToStream(dataToSend);
+ Instant startTime = Instant.now();
- verifiedReceivedByConsumer("Message_1", "[\"Message_1\", \"Message_2\", \"Message_3\"]");
+ var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> senderRecord("Message_" + i)); // Message_1, etc.
+ sendDataToStream(dataToSend);
- // Delete the jobs
- this.icsSimulatorController.deleteJob(JOB_ID1, restClient());
- this.icsSimulatorController.deleteJob(JOB_ID2, restClient());
+ while (!kafkaReceiver.lastValue().equals("Message_" + NO_OF_OBJECTS)) {
+ logger.info("sleeping {}", kafkaReceiver.lastValue());
+ Thread.sleep(1000 * 1);
+ }
- await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
- await().untilAsserted(() -> assertThat(this.topicListeners.getDataConsumers().keySet()).isEmpty());
+ final long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond();
+ logger.info("*** Duration :" + durationSeconds + ", objects/second: " + NO_OF_OBJECTS / durationSeconds);
}
@Test
- void kafkaIOverflow() throws Exception {
+ void kafkaDeleteJobShouldNotStopListener() throws Exception {
final String JOB_ID1 = "ID1";
final String JOB_ID2 = "ID2";
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
- var dataToSend = Flux.range(1, 1000000).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc.
- sendDataToStream(dataToSend); // this should overflow
-
- DataConsumer consumer = topicListeners.getDataConsumers().get(TYPE_ID).iterator().next();
- await().untilAsserted(() -> assertThat(consumer.isRunning()).isFalse());
- this.consumerController.testResults.reset();
-
- this.icsSimulatorController.deleteJob(JOB_ID2, restClient()); // Delete one job
- topicListeners.restartNonRunningKafkaTopics();
- sleep(1000); // Restarting the input seems to take some asynch time
+ var dataToSend = Flux.range(1, 100).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc.
+ sendDataToStream(dataToSend); // this should not overflow
- dataToSend = Flux.just(senderRecord("Howdy\""));
- sendDataToStream(dataToSend);
-
- verifiedReceivedByConsumer("[\"Howdy\\\"\"]");
-
- // Delete the jobs
+ // Delete jobs, recreate one
this.icsSimulatorController.deleteJob(JOB_ID1, restClient());
this.icsSimulatorController.deleteJob(JOB_ID2, restClient());
-
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
- await().untilAsserted(() -> assertThat(this.topicListeners.getDataConsumers().keySet()).isEmpty());
+ this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID2, restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+ dataToSend = Flux.just(senderRecord("Howdy"));
+ sendDataToStream(dataToSend);
+
+ verifiedReceivedByConsumerLast("Howdy");
}
}