X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=dmaap-adaptor-java%2Fsrc%2Ftest%2Fjava%2Forg%2Foran%2Fdmaapadapter%2FIntegrationWithKafka.java;h=c38af8a9deb1496de94b2bddaa3cca9069d1d2c1;hb=5feecd881172a3b22041d35443c1f946e7d5f63e;hp=9cd4fdd111e4afc5896faa8b5ac35a194b625200;hpb=a28a4ad261601976c345425692116e5d7250b810;p=nonrtric.git diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java index 9cd4fdd1..c38af8a9 100644 --- a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java +++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java @@ -94,7 +94,7 @@ class IntegrationWithKafka { private ConsumerController consumerController; @Autowired - private EcsSimulatorController ecsSimulatorController; + private IcsSimulatorController icsSimulatorController; @Autowired private KafkaTopicConsumers kafkaTopicConsumers; @@ -108,7 +108,7 @@ class IntegrationWithKafka { static class TestApplicationConfig extends ApplicationConfig { @Override - public String getEcsBaseUrl() { + public String getIcsBaseUrl() { return thisProcessUrl(); } @@ -151,7 +151,7 @@ class IntegrationWithKafka { @AfterEach void reset() { this.consumerController.testResults.reset(); - this.ecsSimulatorController.testResults.reset(); + this.icsSimulatorController.testResults.reset(); this.jobs.clear(); } @@ -252,13 +252,13 @@ class IntegrationWithKafka { final String JOB_ID2 = "ID2"; // Register producer, Register types - await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull()); - assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); + await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); + assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); // Create two jobs. One buffering and one with a filter - this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 20), JOB_ID1, + this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 20), JOB_ID1, restClient()); - this.ecsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0, 1), JOB_ID2, restClient()); + this.icsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0, 1), JOB_ID2, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2)); @@ -268,8 +268,8 @@ class IntegrationWithKafka { verifiedReceivedByConsumer("Message_1", "[\"Message_1\", \"Message_2\", \"Message_3\"]"); // Delete the jobs - this.ecsSimulatorController.deleteJob(JOB_ID1, restClient()); - this.ecsSimulatorController.deleteJob(JOB_ID2, restClient()); + this.icsSimulatorController.deleteJob(JOB_ID1, restClient()); + this.icsSimulatorController.deleteJob(JOB_ID2, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers().keySet()).isEmpty()); @@ -281,13 +281,13 @@ class IntegrationWithKafka { final String JOB_ID2 = "ID2"; // Register producer, Register types - await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull()); - assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); + await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); + assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); // Create two jobs. - this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 1), JOB_ID1, + this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 1), JOB_ID1, restClient()); - this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID2, restClient()); + this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID2, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2)); @@ -298,8 +298,8 @@ class IntegrationWithKafka { await().untilAsserted(() -> assertThat(consumer.isRunning()).isFalse()); this.consumerController.testResults.reset(); - this.ecsSimulatorController.deleteJob(JOB_ID2, restClient()); // Delete one job - kafkaTopicConsumers.restartNonRunningTasks(); + this.icsSimulatorController.deleteJob(JOB_ID2, restClient()); // Delete one job + kafkaTopicConsumers.restartNonRunningTopics(); Thread.sleep(1000); // Restarting the input seems to take some asynch time dataToSend = Flux.just(senderRecord("Howdy\"")); @@ -308,8 +308,8 @@ class IntegrationWithKafka { verifiedReceivedByConsumer("[\"Howdy\\\"\"]"); // Delete the jobs - this.ecsSimulatorController.deleteJob(JOB_ID1, restClient()); - this.ecsSimulatorController.deleteJob(JOB_ID2, restClient()); + this.icsSimulatorController.deleteJob(JOB_ID1, restClient()); + this.icsSimulatorController.deleteJob(JOB_ID2, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers().keySet()).isEmpty());