+ private SenderRecord<Integer, String, Integer> senderRecord(String data) {
+ final InfoType infoType = this.types.get(TYPE_ID);
+ int key = 1;
+ int correlationMetadata = 2;
+ return SenderRecord.create(new ProducerRecord<>(infoType.getKafkaInputTopic(), key, data), correlationMetadata);
+ }
+
+ private void sendDataToStream(Flux<SenderRecord<Integer, String, Integer>> dataToSend) {
+ final KafkaSender<Integer, String> sender = KafkaSender.create(senderOptions());
+
+ sender.send(dataToSend) //
+ .doOnError(e -> logger.error("Send failed", e)) //
+ .blockLast();
+
+ sender.close();
+
+ }
+
+ private void verifiedReceivedByConsumer(String... strings) {
+ ConsumerController.TestResults consumer = this.consumerController.testResults;
+ await().untilAsserted(() -> assertThat(consumer.receivedBodies.size()).isEqualTo(strings.length));
+ for (String s : strings) {
+ assertTrue(consumer.hasReceived(s));
+ }
+ }
+
+ @Test
+ void simpleCase() throws InterruptedException {
+ final String JOB_ID = "ID";
+
+ // Register producer, Register types
+ await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+
+ this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID, restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+ Thread.sleep(4000);
+ var dataToSend = Flux.just(senderRecord("Message"));
+ sendDataToStream(dataToSend);
+
+ verifiedReceivedByConsumer("Message");
+
+ this.icsSimulatorController.deleteJob(JOB_ID, restClient());
+
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+ await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers().keySet()).isEmpty());
+ }
+
+ @Test
+ void kafkaIntegrationTest() throws Exception {
+ final String JOB_ID1 = "ID1";
+ final String JOB_ID2 = "ID2";
+
+ // Register producer, Register types
+ await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+
+ // Create two jobs. One buffering and one with a filter
+ this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 10, 20), JOB_ID1,
+ restClient());
+ this.icsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0, 1), JOB_ID2, restClient());
+
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
+
+ Thread.sleep(2000);
+ var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc.
+ sendDataToStream(dataToSend);
+
+ verifiedReceivedByConsumer("Message_1", "[\"Message_1\", \"Message_2\", \"Message_3\"]");
+
+ // Delete the jobs
+ this.icsSimulatorController.deleteJob(JOB_ID1, restClient());
+ this.icsSimulatorController.deleteJob(JOB_ID2, restClient());
+
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+ await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers().keySet()).isEmpty());