+ @Test
+ void simpleCase() throws InterruptedException {
+ final String JOB_ID = "ID";
+
+ // Register producer, Register types
+ await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+
+ this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID, restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+ Thread.sleep(4000);
+ var dataToSend = Flux.just(senderRecord("Message"));
+ sendDataToStream(dataToSend);
+
+ verifiedReceivedByConsumer("Message");
+
+ this.icsSimulatorController.deleteJob(JOB_ID, restClient());
+
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+ await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers().keySet()).isEmpty());
+ }
+