From 86f81813c94a44337c199124e7bbf6280e2c6aa6 Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Wed, 24 Nov 2021 14:22:54 +0100 Subject: [PATCH] NONRTRIC - Implement DMaaP mediator producer service in Java Added unittest simultaing data and execptions from Kafka Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-597 Change-Id: I7d13a4aa22ad77d76aa451bca47ac493c42dca0e --- dmaap-adaptor-java/config/application.yaml | 2 +- .../configuration/ApplicationConfig.java | 4 +- .../org/oran/dmaapadapter/repository/MultiMap.java | 8 ++ .../dmaapadapter/tasks/KafkaJobDataConsumer.java | 12 ++- .../dmaapadapter/tasks/KafkaTopicConsumers.java | 21 ++--- .../tasks/ProducerRegstrationTask.java | 17 ++-- .../org/oran/dmaapadapter/ApplicationTest.java | 95 ++++++++++++++-------- ...Controller.java => IcsSimulatorController.java} | 2 +- ...grationWithEcs.java => IntegrationWithIcs.java} | 8 +- .../oran/dmaapadapter/IntegrationWithKafka.java | 34 ++++---- 10 files changed, 115 insertions(+), 88 deletions(-) rename dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/{EcsSimulatorController.java => IcsSimulatorController.java} (99%) rename dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/{IntegrationWithEcs.java => IntegrationWithIcs.java} (98%) diff --git a/dmaap-adaptor-java/config/application.yaml b/dmaap-adaptor-java/config/application.yaml index bd260eab..c3476ac6 100644 --- a/dmaap-adaptor-java/config/application.yaml +++ b/dmaap-adaptor-java/config/application.yaml @@ -46,7 +46,7 @@ app: # The HTTP proxy (if configured) will only be used for accessing NearRT RIC:s http.proxy-host: http.proxy-port: 0 - ecs-base-url: https://localhost:8434 + ics-base-url: https://localhost:8434 # Location of the component configuration file. The file will only be used if the Consul database is not used; # configuration from the Consul will override the file. configuration-filepath: /opt/app/dmaap-adaptor-service/data/application_configuration.json diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java index b40c606a..3ea64e71 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java @@ -79,8 +79,8 @@ public class ApplicationConfig { private int localServerHttpPort; @Getter - @Value("${app.ecs-base-url}") - private String ecsBaseUrl; + @Value("${app.ics-base-url}") + private String icsBaseUrl; @Getter @Value("${app.dmaap-adapter-base-url}") diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/MultiMap.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/MultiMap.java index e2538af1..f7cc14e9 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/MultiMap.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/MultiMap.java @@ -51,6 +51,14 @@ public class MultiMap { return null; } + public T get(String key1, String key2) { + Map innerMap = this.map.get(key1); + if (innerMap == null) { + return null; + } + return innerMap.get(key2); + } + public Collection get(String key) { Map innerMap = this.map.get(key); if (innerMap == null) { diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java index f677502c..2a16f475 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java @@ -31,7 +31,6 @@ import org.springframework.web.reactive.function.client.WebClientResponseExcepti import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.Sinks.Many; /** * The class streams data from a multi cast sink and sends the data to the Job @@ -75,7 +74,7 @@ public class KafkaJobDataConsumer { this.job = job; } - public synchronized void start(Many input) { + public synchronized void start(Flux input) { stop(); this.errorStats.resetKafkaErrors(); this.subscription = getMessagesFromKafka(input, job) // @@ -99,8 +98,8 @@ public class KafkaJobDataConsumer { public synchronized void stop() { if (this.subscription != null) { - subscription.dispose(); - subscription = null; + this.subscription.dispose(); + this.subscription = null; } } @@ -108,9 +107,8 @@ public class KafkaJobDataConsumer { return this.subscription != null; } - private Flux getMessagesFromKafka(Many input, Job job) { - Flux result = input.asFlux() // - .filter(job::isFilterMatch); + private Flux getMessagesFromKafka(Flux input, Job job) { + Flux result = input.filter(job::isFilterMatch); if (job.isBuffered()) { result = result.map(this::quote) // diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java index 29ad8c75..48090170 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java @@ -71,7 +71,6 @@ public class KafkaTopicConsumers { public void onJobRemoved(Job job) { removeJob(job); } - }); } @@ -84,7 +83,7 @@ public class KafkaTopicConsumers { topicConsumer.start(); } KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(job); - subscription.start(topicConsumer.getOutput()); + subscription.start(topicConsumer.getOutput().asFlux()); consumers.put(job.getType().getId(), job.getId(), subscription); } } @@ -98,14 +97,12 @@ public class KafkaTopicConsumers { } @Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS) - public synchronized void restartNonRunningTasks() { - this.consumers.keySet().forEach(typeId -> { - this.consumers.get(typeId).forEach(consumer -> { - if (!consumer.isRunning()) { - restartTopic(consumer); - } - }); - }); + public synchronized void restartNonRunningTopics() { + for (String typeId : this.consumers.keySet()) { + for (KafkaJobDataConsumer consumer : this.consumers.get(typeId)) { + restartTopic(consumer); + } + } } private void restartTopic(KafkaJobDataConsumer consumer) { @@ -116,8 +113,6 @@ public class KafkaTopicConsumers { } private void restartConsumersOfType(KafkaTopicListener topic, InfoType type) { - this.consumers.get(type.getId()).forEach((consumer) -> { - consumer.start(topic.getOutput()); - }); + this.consumers.get(type.getId()).forEach(consumer -> consumer.start(topic.getOutput().asFlux())); } } diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java index 306cc6b4..c9284b5b 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java @@ -50,7 +50,8 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** - * Registers the types and this producer in ECS. This is done when needed. + * Registers the types and this producer in Innformation Coordinator Service. + * This is done when needed. */ @Component @EnableScheduling @@ -65,7 +66,7 @@ public class ProducerRegstrationTask { private static final String PRODUCER_ID = "DmaapGenericInfoProducer"; @Getter - private boolean isRegisteredInEcs = false; + private boolean isRegisteredInIcs = false; private static final int REGISTRATION_SUPERVISION_INTERVAL_MS = 1000 * 5; public ProducerRegstrationTask(@Autowired ApplicationConfig applicationConfig, @Autowired InfoTypes types) { @@ -78,7 +79,7 @@ public class ProducerRegstrationTask { @Scheduled(fixedRate = REGISTRATION_SUPERVISION_INTERVAL_MS) public void supervisionTask() { checkRegistration() // - .filter(isRegistrationOk -> !isRegistrationOk || !this.isRegisteredInEcs) // + .filter(isRegistrationOk -> !isRegistrationOk || !this.isRegisteredInIcs) // .flatMap(isRegisterred -> registerTypesAndProducer()) // .subscribe( // null, // @@ -87,7 +88,7 @@ public class ProducerRegstrationTask { } private void handleRegistrationCompleted() { - isRegisteredInEcs = true; + isRegisteredInIcs = true; } private void handleRegistrationFailure(Throwable t) { @@ -96,7 +97,7 @@ public class ProducerRegstrationTask { // Returns TRUE if registration is correct private Mono checkRegistration() { - final String url = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID; + final String url = applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID; return restClient.get(url) // .flatMap(this::isRegisterredInfoCorrect) // .onErrorResume(t -> Mono.just(Boolean.FALSE)); @@ -105,7 +106,7 @@ public class ProducerRegstrationTask { private Mono isRegisterredInfoCorrect(String registerredInfoStr) { ProducerRegistrationInfo registerredInfo = gson.fromJson(registerredInfoStr, ProducerRegistrationInfo.class); if (isEqual(producerRegistrationInfo(), registerredInfo)) { - logger.trace("Already registered in ECS"); + logger.trace("Already registered in ICS"); return Mono.just(Boolean.TRUE); } else { return Mono.just(Boolean.FALSE); @@ -113,13 +114,13 @@ public class ProducerRegstrationTask { } private String registerTypeUrl(InfoType type) { - return applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId(); + return applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId(); } private Mono registerTypesAndProducer() { final int CONCURRENCY = 20; final String producerUrl = - applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID; + applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID; return Flux.fromIterable(this.types.getAll()) // .doOnNext(type -> logger.info("Registering type {}", type.getId())) // diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java index e78313c0..0ea00564 100644 --- a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java +++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java @@ -50,6 +50,8 @@ import org.oran.dmaapadapter.r1.ProducerJobInfo; import org.oran.dmaapadapter.repository.InfoTypes; import org.oran.dmaapadapter.repository.Job; import org.oran.dmaapadapter.repository.Jobs; +import org.oran.dmaapadapter.tasks.KafkaJobDataConsumer; +import org.oran.dmaapadapter.tasks.KafkaTopicConsumers; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; @@ -65,6 +67,7 @@ import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit.jupiter.SpringExtension; import org.springframework.web.reactive.function.client.WebClientResponseException; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -90,7 +93,10 @@ class ApplicationTest { private ConsumerController consumerController; @Autowired - private EcsSimulatorController ecsSimulatorController; + private IcsSimulatorController icsSimulatorController; + + @Autowired + KafkaTopicConsumers kafkaTopicConsumers; private com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create(); @@ -99,7 +105,7 @@ class ApplicationTest { static class TestApplicationConfig extends ApplicationConfig { @Override - public String getEcsBaseUrl() { + public String getIcsBaseUrl() { return thisProcessUrl(); } @@ -147,7 +153,7 @@ class ApplicationTest { @AfterEach void reset() { this.consumerController.testResults.reset(); - this.ecsSimulatorController.testResults.reset(); + this.icsSimulatorController.testResults.reset(); this.jobs.clear(); } @@ -237,15 +243,54 @@ class ApplicationTest { } @Test - void testWholeChain() throws Exception { + void testReceiveAndPostDataFromKafka() { + final String JOB_ID = "ID"; + final String TYPE_ID = "KafkaInformationType"; + await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); + assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); + + // Create a job + Job.Parameters param = new Job.Parameters("", new Job.BufferTimeout(123, 456), 1); + String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL; + ConsumerJobInfo kafkaJobInfo = + new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", targetUri, ""); + + this.icsSimulatorController.addJob(kafkaJobInfo, JOB_ID, restClient()); + await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); + + KafkaJobDataConsumer kafkaConsumer = this.kafkaTopicConsumers.getConsumers().get(TYPE_ID, JOB_ID); + + // Handle received data from Kafka, check that it has been posted to the + // consumer + kafkaConsumer.start(Flux.just("data")); + + ConsumerController.TestResults consumer = this.consumerController.testResults; + await().untilAsserted(() -> assertThat(consumer.receivedBodies.size()).isEqualTo(1)); + assertThat(consumer.receivedBodies.get(0)).isEqualTo("[\"data\"]"); + + // Test send an exception + kafkaConsumer.start(Flux.error(new NullPointerException())); + + // Test regular restart of stopped + kafkaConsumer.stop(); + this.kafkaTopicConsumers.restartNonRunningTopics(); + await().untilAsserted(() -> assertThat(kafkaConsumer.isRunning()).isTrue()); + + // Delete the job + this.icsSimulatorController.deleteJob(JOB_ID, restClient()); + await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); + } + + @Test + void testReceiveAndPostDataFromDmaap() throws Exception { final String JOB_ID = "ID"; // Register producer, Register types - await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull()); - assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); + await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); + assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); // Create a job - this.ecsSimulatorController.addJob(consumerJobInfo(), JOB_ID, restClient()); + this.icsSimulatorController.addJob(consumerJobInfo(), JOB_ID, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); // Return two messages from DMAAP and verify that these are sent to the owner of @@ -261,45 +306,25 @@ class ApplicationTest { assertThat(jobs).contains(JOB_ID); // Delete the job - this.ecsSimulatorController.deleteJob(JOB_ID, restClient()); + this.icsSimulatorController.deleteJob(JOB_ID, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); } @Test void testReRegister() throws Exception { // Wait foir register types and producer - await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull()); - assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); + await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); + assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); // Clear the registration, should trigger a re-register - ecsSimulatorController.testResults.reset(); - await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull()); - assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); + icsSimulatorController.testResults.reset(); + await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); + assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); // Just clear the registerred types, should trigger a re-register - ecsSimulatorController.testResults.types.clear(); + icsSimulatorController.testResults.types.clear(); await().untilAsserted( - () -> assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(2)); - } - - @Test - void testCreateKafkaJob() { - await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull()); - assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); - - final String TYPE_ID = "KafkaInformationType"; - - Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 456), 1); - String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL; - ConsumerJobInfo jobInfo = new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", targetUri, ""); - - // Create a job - this.ecsSimulatorController.addJob(jobInfo, "JOB_ID", restClient()); - await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); - - // Delete the job - this.ecsSimulatorController.deleteJob("JOB_ID", restClient()); - await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); + () -> assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(2)); } private void testErrorCode(Mono request, HttpStatus expStatus, String responseContains) { diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/EcsSimulatorController.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IcsSimulatorController.java similarity index 99% rename from dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/EcsSimulatorController.java rename to dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IcsSimulatorController.java index 1cf8903a..bff2634f 100644 --- a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/EcsSimulatorController.java +++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IcsSimulatorController.java @@ -47,7 +47,7 @@ import org.springframework.web.bind.annotation.RestController; @RestController("IcsSimulatorController") @Tag(name = "Information Coordinator Service Simulator (exists only in test)") -public class EcsSimulatorController { +public class IcsSimulatorController { private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private final static Gson gson = new GsonBuilder().create(); diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithEcs.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java similarity index 98% rename from dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithEcs.java rename to dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java index c8fcb832..12696edc 100644 --- a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithEcs.java +++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java @@ -86,7 +86,7 @@ class IntegrationWithEcs { static class TestApplicationConfig extends ApplicationConfig { @Override - public String getEcsBaseUrl() { + public String getIcsBaseUrl() { return "https://localhost:8434"; } @@ -161,7 +161,7 @@ class IntegrationWithEcs { } private String ecsBaseUrl() { - return applicationConfig.getEcsBaseUrl(); + return applicationConfig.getIcsBaseUrl(); } private String jobUrl(String jobId) { @@ -214,7 +214,7 @@ class IntegrationWithEcs { @Test void testCreateKafkaJob() { - await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInEcs()).isTrue()); + await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue()); final String TYPE_ID = "KafkaInformationType"; Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 456), 1); @@ -233,7 +233,7 @@ class IntegrationWithEcs { @Test void testWholeChain() throws Exception { - await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInEcs()).isTrue()); + await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue()); createInformationJobInEcs(DMAAP_TYPE_ID, DMAAP_JOB_ID, ".*DmaapResponse.*"); diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java index 9cd4fdd1..c38af8a9 100644 --- a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java +++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java @@ -94,7 +94,7 @@ class IntegrationWithKafka { private ConsumerController consumerController; @Autowired - private EcsSimulatorController ecsSimulatorController; + private IcsSimulatorController icsSimulatorController; @Autowired private KafkaTopicConsumers kafkaTopicConsumers; @@ -108,7 +108,7 @@ class IntegrationWithKafka { static class TestApplicationConfig extends ApplicationConfig { @Override - public String getEcsBaseUrl() { + public String getIcsBaseUrl() { return thisProcessUrl(); } @@ -151,7 +151,7 @@ class IntegrationWithKafka { @AfterEach void reset() { this.consumerController.testResults.reset(); - this.ecsSimulatorController.testResults.reset(); + this.icsSimulatorController.testResults.reset(); this.jobs.clear(); } @@ -252,13 +252,13 @@ class IntegrationWithKafka { final String JOB_ID2 = "ID2"; // Register producer, Register types - await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull()); - assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); + await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); + assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); // Create two jobs. One buffering and one with a filter - this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 20), JOB_ID1, + this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 20), JOB_ID1, restClient()); - this.ecsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0, 1), JOB_ID2, restClient()); + this.icsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0, 1), JOB_ID2, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2)); @@ -268,8 +268,8 @@ class IntegrationWithKafka { verifiedReceivedByConsumer("Message_1", "[\"Message_1\", \"Message_2\", \"Message_3\"]"); // Delete the jobs - this.ecsSimulatorController.deleteJob(JOB_ID1, restClient()); - this.ecsSimulatorController.deleteJob(JOB_ID2, restClient()); + this.icsSimulatorController.deleteJob(JOB_ID1, restClient()); + this.icsSimulatorController.deleteJob(JOB_ID2, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers().keySet()).isEmpty()); @@ -281,13 +281,13 @@ class IntegrationWithKafka { final String JOB_ID2 = "ID2"; // Register producer, Register types - await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull()); - assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); + await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); + assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); // Create two jobs. - this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 1), JOB_ID1, + this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 1), JOB_ID1, restClient()); - this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID2, restClient()); + this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID2, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2)); @@ -298,8 +298,8 @@ class IntegrationWithKafka { await().untilAsserted(() -> assertThat(consumer.isRunning()).isFalse()); this.consumerController.testResults.reset(); - this.ecsSimulatorController.deleteJob(JOB_ID2, restClient()); // Delete one job - kafkaTopicConsumers.restartNonRunningTasks(); + this.icsSimulatorController.deleteJob(JOB_ID2, restClient()); // Delete one job + kafkaTopicConsumers.restartNonRunningTopics(); Thread.sleep(1000); // Restarting the input seems to take some asynch time dataToSend = Flux.just(senderRecord("Howdy\"")); @@ -308,8 +308,8 @@ class IntegrationWithKafka { verifiedReceivedByConsumer("[\"Howdy\\\"\"]"); // Delete the jobs - this.ecsSimulatorController.deleteJob(JOB_ID1, restClient()); - this.ecsSimulatorController.deleteJob(JOB_ID2, restClient()); + this.icsSimulatorController.deleteJob(JOB_ID1, restClient()); + this.icsSimulatorController.deleteJob(JOB_ID2, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers().keySet()).isEmpty()); -- 2.16.6