NONRTRIC - Implement DMaaP mediator producer service in Java 18/7118/2
authorPatrikBuhr <patrik.buhr@est.tech>
Wed, 24 Nov 2021 13:22:54 +0000 (14:22 +0100)
committerPatrikBuhr <patrik.buhr@est.tech>
Wed, 24 Nov 2021 14:10:17 +0000 (15:10 +0100)
Added unittest simultaing data and execptions from Kafka

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-597
Change-Id: I7d13a4aa22ad77d76aa451bca47ac493c42dca0e

dmaap-adaptor-java/config/application.yaml
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/MultiMap.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java
dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java
dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IcsSimulatorController.java [moved from dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/EcsSimulatorController.java with 99% similarity]
dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java [moved from dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithEcs.java with 98% similarity]
dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java

index bd260ea..c3476ac 100644 (file)
@@ -46,7 +46,7 @@ app:
     # The HTTP proxy (if configured) will only be used for accessing NearRT RIC:s
     http.proxy-host:
     http.proxy-port: 0
-  ecs-base-url: https://localhost:8434
+  ics-base-url: https://localhost:8434
   # Location of the component configuration file. The file will only be used if the Consul database is not used;
   # configuration from the Consul will override the file.
   configuration-filepath: /opt/app/dmaap-adaptor-service/data/application_configuration.json
index b40c606..3ea64e7 100644 (file)
@@ -79,8 +79,8 @@ public class ApplicationConfig {
     private int localServerHttpPort;
 
     @Getter
-    @Value("${app.ecs-base-url}")
-    private String ecsBaseUrl;
+    @Value("${app.ics-base-url}")
+    private String icsBaseUrl;
 
     @Getter
     @Value("${app.dmaap-adapter-base-url}")
index e2538af..f7cc14e 100644 (file)
@@ -51,6 +51,14 @@ public class MultiMap<T> {
         return null;
     }
 
+    public T get(String key1, String key2) {
+        Map<String, T> innerMap = this.map.get(key1);
+        if (innerMap == null) {
+            return null;
+        }
+        return innerMap.get(key2);
+    }
+
     public Collection<T> get(String key) {
         Map<String, T> innerMap = this.map.get(key);
         if (innerMap == null) {
index f677502..2a16f47 100644 (file)
@@ -31,7 +31,6 @@ import org.springframework.web.reactive.function.client.WebClientResponseExcepti
 import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.core.publisher.Sinks.Many;
 
 /**
  * The class streams data from a multi cast sink and sends the data to the Job
@@ -75,7 +74,7 @@ public class KafkaJobDataConsumer {
         this.job = job;
     }
 
-    public synchronized void start(Many<String> input) {
+    public synchronized void start(Flux<String> input) {
         stop();
         this.errorStats.resetKafkaErrors();
         this.subscription = getMessagesFromKafka(input, job) //
@@ -99,8 +98,8 @@ public class KafkaJobDataConsumer {
 
     public synchronized void stop() {
         if (this.subscription != null) {
-            subscription.dispose();
-            subscription = null;
+            this.subscription.dispose();
+            this.subscription = null;
         }
     }
 
@@ -108,9 +107,8 @@ public class KafkaJobDataConsumer {
         return this.subscription != null;
     }
 
-    private Flux<String> getMessagesFromKafka(Many<String> input, Job job) {
-        Flux<String> result = input.asFlux() //
-                .filter(job::isFilterMatch);
+    private Flux<String> getMessagesFromKafka(Flux<String> input, Job job) {
+        Flux<String> result = input.filter(job::isFilterMatch);
 
         if (job.isBuffered()) {
             result = result.map(this::quote) //
index 29ad8c7..4809017 100644 (file)
@@ -71,7 +71,6 @@ public class KafkaTopicConsumers {
             public void onJobRemoved(Job job) {
                 removeJob(job);
             }
-
         });
     }
 
@@ -84,7 +83,7 @@ public class KafkaTopicConsumers {
                 topicConsumer.start();
             }
             KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(job);
-            subscription.start(topicConsumer.getOutput());
+            subscription.start(topicConsumer.getOutput().asFlux());
             consumers.put(job.getType().getId(), job.getId(), subscription);
         }
     }
@@ -98,14 +97,12 @@ public class KafkaTopicConsumers {
     }
 
     @Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS)
-    public synchronized void restartNonRunningTasks() {
-        this.consumers.keySet().forEach(typeId -> {
-            this.consumers.get(typeId).forEach(consumer -> {
-                if (!consumer.isRunning()) {
-                    restartTopic(consumer);
-                }
-            });
-        });
+    public synchronized void restartNonRunningTopics() {
+        for (String typeId : this.consumers.keySet()) {
+            for (KafkaJobDataConsumer consumer : this.consumers.get(typeId)) {
+                restartTopic(consumer);
+            }
+        }
     }
 
     private void restartTopic(KafkaJobDataConsumer consumer) {
@@ -116,8 +113,6 @@ public class KafkaTopicConsumers {
     }
 
     private void restartConsumersOfType(KafkaTopicListener topic, InfoType type) {
-        this.consumers.get(type.getId()).forEach((consumer) -> {
-            consumer.start(topic.getOutput());
-        });
+        this.consumers.get(type.getId()).forEach(consumer -> consumer.start(topic.getOutput().asFlux()));
     }
 }
index 306cc6b..c9284b5 100644 (file)
@@ -50,7 +50,8 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 /**
- * Registers the types and this producer in ECS. This is done when needed.
+ * Registers the types and this producer in Innformation Coordinator Service.
+ * This is done when needed.
  */
 @Component
 @EnableScheduling
@@ -65,7 +66,7 @@ public class ProducerRegstrationTask {
 
     private static final String PRODUCER_ID = "DmaapGenericInfoProducer";
     @Getter
-    private boolean isRegisteredInEcs = false;
+    private boolean isRegisteredInIcs = false;
     private static final int REGISTRATION_SUPERVISION_INTERVAL_MS = 1000 * 5;
 
     public ProducerRegstrationTask(@Autowired ApplicationConfig applicationConfig, @Autowired InfoTypes types) {
@@ -78,7 +79,7 @@ public class ProducerRegstrationTask {
     @Scheduled(fixedRate = REGISTRATION_SUPERVISION_INTERVAL_MS)
     public void supervisionTask() {
         checkRegistration() //
-                .filter(isRegistrationOk -> !isRegistrationOk || !this.isRegisteredInEcs) //
+                .filter(isRegistrationOk -> !isRegistrationOk || !this.isRegisteredInIcs) //
                 .flatMap(isRegisterred -> registerTypesAndProducer()) //
                 .subscribe( //
                         null, //
@@ -87,7 +88,7 @@ public class ProducerRegstrationTask {
     }
 
     private void handleRegistrationCompleted() {
-        isRegisteredInEcs = true;
+        isRegisteredInIcs = true;
     }
 
     private void handleRegistrationFailure(Throwable t) {
@@ -96,7 +97,7 @@ public class ProducerRegstrationTask {
 
     // Returns TRUE if registration is correct
     private Mono<Boolean> checkRegistration() {
-        final String url = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
+        final String url = applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
         return restClient.get(url) //
                 .flatMap(this::isRegisterredInfoCorrect) //
                 .onErrorResume(t -> Mono.just(Boolean.FALSE));
@@ -105,7 +106,7 @@ public class ProducerRegstrationTask {
     private Mono<Boolean> isRegisterredInfoCorrect(String registerredInfoStr) {
         ProducerRegistrationInfo registerredInfo = gson.fromJson(registerredInfoStr, ProducerRegistrationInfo.class);
         if (isEqual(producerRegistrationInfo(), registerredInfo)) {
-            logger.trace("Already registered in ECS");
+            logger.trace("Already registered in ICS");
             return Mono.just(Boolean.TRUE);
         } else {
             return Mono.just(Boolean.FALSE);
@@ -113,13 +114,13 @@ public class ProducerRegstrationTask {
     }
 
     private String registerTypeUrl(InfoType type) {
-        return applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId();
+        return applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId();
     }
 
     private Mono<String> registerTypesAndProducer() {
         final int CONCURRENCY = 20;
         final String producerUrl =
-                applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
+                applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
 
         return Flux.fromIterable(this.types.getAll()) //
                 .doOnNext(type -> logger.info("Registering type {}", type.getId())) //
index e78313c..0ea0056 100644 (file)
@@ -50,6 +50,8 @@ import org.oran.dmaapadapter.r1.ProducerJobInfo;
 import org.oran.dmaapadapter.repository.InfoTypes;
 import org.oran.dmaapadapter.repository.Job;
 import org.oran.dmaapadapter.repository.Jobs;
+import org.oran.dmaapadapter.tasks.KafkaJobDataConsumer;
+import org.oran.dmaapadapter.tasks.KafkaTopicConsumers;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
@@ -65,6 +67,7 @@ import org.springframework.test.context.TestPropertySource;
 import org.springframework.test.context.junit.jupiter.SpringExtension;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
@@ -90,7 +93,10 @@ class ApplicationTest {
     private ConsumerController consumerController;
 
     @Autowired
-    private EcsSimulatorController ecsSimulatorController;
+    private IcsSimulatorController icsSimulatorController;
+
+    @Autowired
+    KafkaTopicConsumers kafkaTopicConsumers;
 
     private com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
 
@@ -99,7 +105,7 @@ class ApplicationTest {
 
     static class TestApplicationConfig extends ApplicationConfig {
         @Override
-        public String getEcsBaseUrl() {
+        public String getIcsBaseUrl() {
             return thisProcessUrl();
         }
 
@@ -147,7 +153,7 @@ class ApplicationTest {
     @AfterEach
     void reset() {
         this.consumerController.testResults.reset();
-        this.ecsSimulatorController.testResults.reset();
+        this.icsSimulatorController.testResults.reset();
         this.jobs.clear();
     }
 
@@ -237,15 +243,54 @@ class ApplicationTest {
     }
 
     @Test
-    void testWholeChain() throws Exception {
+    void testReceiveAndPostDataFromKafka() {
+        final String JOB_ID = "ID";
+        final String TYPE_ID = "KafkaInformationType";
+        await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+        assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+
+        // Create a job
+        Job.Parameters param = new Job.Parameters("", new Job.BufferTimeout(123, 456), 1);
+        String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL;
+        ConsumerJobInfo kafkaJobInfo =
+                new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", targetUri, "");
+
+        this.icsSimulatorController.addJob(kafkaJobInfo, JOB_ID, restClient());
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+        KafkaJobDataConsumer kafkaConsumer = this.kafkaTopicConsumers.getConsumers().get(TYPE_ID, JOB_ID);
+
+        // Handle received data from Kafka, check that it has been posted to the
+        // consumer
+        kafkaConsumer.start(Flux.just("data"));
+
+        ConsumerController.TestResults consumer = this.consumerController.testResults;
+        await().untilAsserted(() -> assertThat(consumer.receivedBodies.size()).isEqualTo(1));
+        assertThat(consumer.receivedBodies.get(0)).isEqualTo("[\"data\"]");
+
+        // Test send an exception
+        kafkaConsumer.start(Flux.error(new NullPointerException()));
+
+        // Test regular restart of stopped
+        kafkaConsumer.stop();
+        this.kafkaTopicConsumers.restartNonRunningTopics();
+        await().untilAsserted(() -> assertThat(kafkaConsumer.isRunning()).isTrue());
+
+        // Delete the job
+        this.icsSimulatorController.deleteJob(JOB_ID, restClient());
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+    }
+
+    @Test
+    void testReceiveAndPostDataFromDmaap() throws Exception {
         final String JOB_ID = "ID";
 
         // Register producer, Register types
-        await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
-        assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+        await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+        assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
         // Create a job
-        this.ecsSimulatorController.addJob(consumerJobInfo(), JOB_ID, restClient());
+        this.icsSimulatorController.addJob(consumerJobInfo(), JOB_ID, restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
 
         // Return two messages from DMAAP and verify that these are sent to the owner of
@@ -261,45 +306,25 @@ class ApplicationTest {
         assertThat(jobs).contains(JOB_ID);
 
         // Delete the job
-        this.ecsSimulatorController.deleteJob(JOB_ID, restClient());
+        this.icsSimulatorController.deleteJob(JOB_ID, restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
     }
 
     @Test
     void testReRegister() throws Exception {
         // Wait foir register types and producer
-        await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
-        assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+        await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+        assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
         // Clear the registration, should trigger a re-register
-        ecsSimulatorController.testResults.reset();
-        await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
-        assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+        icsSimulatorController.testResults.reset();
+        await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+        assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
         // Just clear the registerred types, should trigger a re-register
-        ecsSimulatorController.testResults.types.clear();
+        icsSimulatorController.testResults.types.clear();
         await().untilAsserted(
-                () -> assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(2));
-    }
-
-    @Test
-    void testCreateKafkaJob() {
-        await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
-        assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
-
-        final String TYPE_ID = "KafkaInformationType";
-
-        Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 456), 1);
-        String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL;
-        ConsumerJobInfo jobInfo = new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", targetUri, "");
-
-        // Create a job
-        this.ecsSimulatorController.addJob(jobInfo, "JOB_ID", restClient());
-        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
-
-        // Delete the job
-        this.ecsSimulatorController.deleteJob("JOB_ID", restClient());
-        await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+                () -> assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(2));
     }
 
     private void testErrorCode(Mono<?> request, HttpStatus expStatus, String responseContains) {
@@ -47,7 +47,7 @@ import org.springframework.web.bind.annotation.RestController;
 
 @RestController("IcsSimulatorController")
 @Tag(name = "Information Coordinator Service Simulator (exists only in test)")
-public class EcsSimulatorController {
+public class IcsSimulatorController {
 
     private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
     private final static Gson gson = new GsonBuilder().create();
@@ -86,7 +86,7 @@ class IntegrationWithEcs {
     static class TestApplicationConfig extends ApplicationConfig {
 
         @Override
-        public String getEcsBaseUrl() {
+        public String getIcsBaseUrl() {
             return "https://localhost:8434";
         }
 
@@ -161,7 +161,7 @@ class IntegrationWithEcs {
     }
 
     private String ecsBaseUrl() {
-        return applicationConfig.getEcsBaseUrl();
+        return applicationConfig.getIcsBaseUrl();
     }
 
     private String jobUrl(String jobId) {
@@ -214,7 +214,7 @@ class IntegrationWithEcs {
 
     @Test
     void testCreateKafkaJob() {
-        await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInEcs()).isTrue());
+        await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue());
         final String TYPE_ID = "KafkaInformationType";
 
         Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 456), 1);
@@ -233,7 +233,7 @@ class IntegrationWithEcs {
 
     @Test
     void testWholeChain() throws Exception {
-        await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInEcs()).isTrue());
+        await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue());
 
         createInformationJobInEcs(DMAAP_TYPE_ID, DMAAP_JOB_ID, ".*DmaapResponse.*");
 
index 9cd4fdd..c38af8a 100644 (file)
@@ -94,7 +94,7 @@ class IntegrationWithKafka {
     private ConsumerController consumerController;
 
     @Autowired
-    private EcsSimulatorController ecsSimulatorController;
+    private IcsSimulatorController icsSimulatorController;
 
     @Autowired
     private KafkaTopicConsumers kafkaTopicConsumers;
@@ -108,7 +108,7 @@ class IntegrationWithKafka {
 
     static class TestApplicationConfig extends ApplicationConfig {
         @Override
-        public String getEcsBaseUrl() {
+        public String getIcsBaseUrl() {
             return thisProcessUrl();
         }
 
@@ -151,7 +151,7 @@ class IntegrationWithKafka {
     @AfterEach
     void reset() {
         this.consumerController.testResults.reset();
-        this.ecsSimulatorController.testResults.reset();
+        this.icsSimulatorController.testResults.reset();
         this.jobs.clear();
     }
 
@@ -252,13 +252,13 @@ class IntegrationWithKafka {
         final String JOB_ID2 = "ID2";
 
         // Register producer, Register types
-        await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
-        assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+        await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+        assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
         // Create two jobs. One buffering and one with a filter
-        this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 20), JOB_ID1,
+        this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 20), JOB_ID1,
                 restClient());
-        this.ecsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0, 1), JOB_ID2, restClient());
+        this.icsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0, 1), JOB_ID2, restClient());
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
 
@@ -268,8 +268,8 @@ class IntegrationWithKafka {
         verifiedReceivedByConsumer("Message_1", "[\"Message_1\", \"Message_2\", \"Message_3\"]");
 
         // Delete the jobs
-        this.ecsSimulatorController.deleteJob(JOB_ID1, restClient());
-        this.ecsSimulatorController.deleteJob(JOB_ID2, restClient());
+        this.icsSimulatorController.deleteJob(JOB_ID1, restClient());
+        this.icsSimulatorController.deleteJob(JOB_ID2, restClient());
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
         await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers().keySet()).isEmpty());
@@ -281,13 +281,13 @@ class IntegrationWithKafka {
         final String JOB_ID2 = "ID2";
 
         // Register producer, Register types
-        await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
-        assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+        await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+        assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
         // Create two jobs.
-        this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 1), JOB_ID1,
+        this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 1), JOB_ID1,
                 restClient());
-        this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID2, restClient());
+        this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID2, restClient());
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
 
@@ -298,8 +298,8 @@ class IntegrationWithKafka {
         await().untilAsserted(() -> assertThat(consumer.isRunning()).isFalse());
         this.consumerController.testResults.reset();
 
-        this.ecsSimulatorController.deleteJob(JOB_ID2, restClient()); // Delete one job
-        kafkaTopicConsumers.restartNonRunningTasks();
+        this.icsSimulatorController.deleteJob(JOB_ID2, restClient()); // Delete one job
+        kafkaTopicConsumers.restartNonRunningTopics();
         Thread.sleep(1000); // Restarting the input seems to take some asynch time
 
         dataToSend = Flux.just(senderRecord("Howdy\""));
@@ -308,8 +308,8 @@ class IntegrationWithKafka {
         verifiedReceivedByConsumer("[\"Howdy\\\"\"]");
 
         // Delete the jobs
-        this.ecsSimulatorController.deleteJob(JOB_ID1, restClient());
-        this.ecsSimulatorController.deleteJob(JOB_ID2, restClient());
+        this.icsSimulatorController.deleteJob(JOB_ID1, restClient());
+        this.icsSimulatorController.deleteJob(JOB_ID2, restClient());
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
         await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers().keySet()).isEmpty());