- ConsumerController.TestResults consumer = this.consumerController.testResults;
- await().untilAsserted(() -> assertThat(consumer.receivedBodies.size()).isEqualTo(2));
- assertThat(consumer.receivedBodies.get(0)).isEqualTo("Message_1");
- assertThat(consumer.receivedBodies.get(1)).isEqualTo("[Message_1, Message_2, Message_3]");
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+ await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers().keySet()).isEmpty());
+ }
+
+ @Test
+ void kafkaIOverflow() throws InterruptedException {
+ final String JOB_ID1 = "ID1";
+ final String JOB_ID2 = "ID2";
+
+ // Register producer, Register types
+ await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+
+ // Create two jobs.
+ this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 1), JOB_ID1,
+ restClient());
+ this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID2, restClient());
+
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
+
+ var dataToSend = Flux.range(1, 1000000).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc.
+ sendDataToStream(dataToSend); // this should overflow
+
+ KafkaJobDataConsumer consumer = kafkaTopicConsumers.getConsumers().get(TYPE_ID).iterator().next();
+ await().untilAsserted(() -> assertThat(consumer.isRunning()).isFalse());
+ this.consumerController.testResults.reset();
+
+ this.ecsSimulatorController.deleteJob(JOB_ID2, restClient()); // Delete one job
+ kafkaTopicConsumers.restartNonRunningTasks();
+ Thread.sleep(1000); // Restarting the input seems to take some asynch time
+
+ dataToSend = Flux.just(senderRecord("Howdy\""));
+ sendDataToStream(dataToSend);
+
+ verifiedReceivedByConsumer("[\"Howdy\\\"\"]");