+ await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getActiveSubscriptions()).isEmpty());
+ }
+
+ @Test
+ void kafkaIOverflow() throws InterruptedException {
+ // This does not work. After an overflow, the kafka stream does not seem to work
+ //
+ final String JOB_ID1 = "ID1";
+ final String JOB_ID2 = "ID2";
+
+ // Register producer, Register types
+ await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1);
+
+ // Create two jobs.
+ this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID1, restClient());
+ this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID2, restClient());
+
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
+
+ var dataToSend = Flux.range(1, 1000000).map(i -> senderRecord("Message_", i)); // Message_1, Message_2 etc.
+ sendDataToStream(dataToSend); // this will overflow
+
+ KafkaJobDataConsumer consumer = kafkaTopicConsumers.getActiveSubscriptions().values().iterator().next();
+ await().untilAsserted(() -> assertThat(consumer.isRunning()).isFalse());
+ this.consumerController.testResults.reset();
+
+ kafkaTopicConsumers.restartNonRunningTasks();
+
+ dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message__", i)); // Message_1
+ sendDataToStream(dataToSend);
+
+ verifiedReceivedByConsumer("Message__1", "Message__1");