var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_", i)); // Message_1, Message_2 etc.
sendDataToStream(dataToSend);
- verifiedReceivedByConsumer("Message_1", "[Message_1, Message_2, Message_3]");
+ verifiedReceivedByConsumer("Message_1", "[\"Message_1\", \"Message_2\", \"Message_3\"]");
+
+ // Just for testing quoting
+ this.consumerController.testResults.reset();
+ dataToSend = Flux.just(senderRecord("Message\"_", 1));
+ sendDataToStream(dataToSend);
+ verifiedReceivedByConsumer("[\"Message\\\"_1\"]");
// Delete the jobs
this.ecsSimulatorController.deleteJob(JOB_ID1, restClient());
this.ecsSimulatorController.deleteJob(JOB_ID2, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
- await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getActiveSubscriptions()).isEmpty());
+ await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers()).isEmpty());
}
@Test
void kafkaIOverflow() throws InterruptedException {
- // This does not work. After an overflow, the kafka stream does not seem to work
- //
final String JOB_ID1 = "ID1";
final String JOB_ID2 = "ID2";
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
var dataToSend = Flux.range(1, 1000000).map(i -> senderRecord("Message_", i)); // Message_1, Message_2 etc.
- sendDataToStream(dataToSend); // this will overflow
+ sendDataToStream(dataToSend); // this should overflow
- KafkaJobDataConsumer consumer = kafkaTopicConsumers.getActiveSubscriptions().values().iterator().next();
+ KafkaJobDataConsumer consumer = kafkaTopicConsumers.getConsumers().values().iterator().next();
await().untilAsserted(() -> assertThat(consumer.isRunning()).isFalse());
this.consumerController.testResults.reset();
kafkaTopicConsumers.restartNonRunningTasks();
+ this.ecsSimulatorController.deleteJob(JOB_ID2, restClient()); // Delete one job
+ Thread.sleep(1000); // Restarting the input seems to take some asynch time
- dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message__", i)); // Message_1
+ dataToSend = Flux.range(1, 1).map(i -> senderRecord("Howdy_", i));
sendDataToStream(dataToSend);
- verifiedReceivedByConsumer("Message__1", "Message__1");
+ verifiedReceivedByConsumer("Howdy_1");
}
}