+ void testReceiveAndPostDataFromKafka() {
+ final String JOB_ID = "ID";
+ final String TYPE_ID = "KafkaInformationType";
+ await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+
+ // Create a job
+ Job.Parameters param = new Job.Parameters("", new Job.BufferTimeout(123, 456), 1);
+ String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL;
+ ConsumerJobInfo kafkaJobInfo =
+ new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", targetUri, "");
+
+ this.icsSimulatorController.addJob(kafkaJobInfo, JOB_ID, restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+ KafkaJobDataConsumer kafkaConsumer = this.kafkaTopicConsumers.getConsumers().get(TYPE_ID, JOB_ID);
+
+ // Handle received data from Kafka, check that it has been posted to the
+ // consumer
+ kafkaConsumer.start(Flux.just("data"));
+
+ ConsumerController.TestResults consumer = this.consumerController.testResults;
+ await().untilAsserted(() -> assertThat(consumer.receivedBodies.size()).isEqualTo(1));
+ assertThat(consumer.receivedBodies.get(0)).isEqualTo("[\"data\"]");
+
+ // Test send an exception
+ kafkaConsumer.start(Flux.error(new NullPointerException()));
+
+ // Test regular restart of stopped
+ kafkaConsumer.stop();
+ this.kafkaTopicConsumers.restartNonRunningTopics();
+ await().untilAsserted(() -> assertThat(kafkaConsumer.isRunning()).isTrue());
+
+ // Delete the job
+ this.icsSimulatorController.deleteJob(JOB_ID, restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+ }
+
+ @Test
+ void testReceiveAndPostDataFromDmaap() throws Exception {