+ @Test
+ void testCreateKafkaJob() {
+ await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInEcs()).isTrue());
+ final String TYPE_ID = "KafkaInformationType";
+
+ Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 456), 1);
+
+ ConsumerJobInfo jobInfo =
+ new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", consumerUri(), "");
+ String body = gson.toJson(jobInfo);
+
+ restClient().putForEntity(jobUrl("KAFKA_JOB_ID"), body).block();
+
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+ deleteInformationJobInEcs("KAFKA_JOB_ID");
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+ }
+