X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=dmaap-adaptor-java%2Fsrc%2Ftest%2Fjava%2Forg%2Foran%2Fdmaapadapter%2FApplicationTest.java;h=0ea005642b5bd6b4c092908b07cca9ed4cf42ab7;hb=7a52668a38ccf07bf0f900cc08063067531630c1;hp=e78313c04d34f25f218599623419da0193e94a06;hpb=c9c6839d27aaad72ff7b3140992e0ade4b0f07bb;p=nonrtric.git diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java index e78313c0..0ea00564 100644 --- a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java +++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java @@ -50,6 +50,8 @@ import org.oran.dmaapadapter.r1.ProducerJobInfo; import org.oran.dmaapadapter.repository.InfoTypes; import org.oran.dmaapadapter.repository.Job; import org.oran.dmaapadapter.repository.Jobs; +import org.oran.dmaapadapter.tasks.KafkaJobDataConsumer; +import org.oran.dmaapadapter.tasks.KafkaTopicConsumers; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; @@ -65,6 +67,7 @@ import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit.jupiter.SpringExtension; import org.springframework.web.reactive.function.client.WebClientResponseException; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -90,7 +93,10 @@ class ApplicationTest { private ConsumerController consumerController; @Autowired - private EcsSimulatorController ecsSimulatorController; + private IcsSimulatorController icsSimulatorController; + + @Autowired + KafkaTopicConsumers kafkaTopicConsumers; private com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create(); @@ -99,7 +105,7 @@ class ApplicationTest { static class TestApplicationConfig extends ApplicationConfig { @Override - public String getEcsBaseUrl() { + public String getIcsBaseUrl() { return thisProcessUrl(); } @@ -147,7 +153,7 @@ class ApplicationTest { @AfterEach void reset() { this.consumerController.testResults.reset(); - this.ecsSimulatorController.testResults.reset(); + this.icsSimulatorController.testResults.reset(); this.jobs.clear(); } @@ -237,15 +243,54 @@ class ApplicationTest { } @Test - void testWholeChain() throws Exception { + void testReceiveAndPostDataFromKafka() { + final String JOB_ID = "ID"; + final String TYPE_ID = "KafkaInformationType"; + await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); + assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); + + // Create a job + Job.Parameters param = new Job.Parameters("", new Job.BufferTimeout(123, 456), 1); + String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL; + ConsumerJobInfo kafkaJobInfo = + new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", targetUri, ""); + + this.icsSimulatorController.addJob(kafkaJobInfo, JOB_ID, restClient()); + await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); + + KafkaJobDataConsumer kafkaConsumer = this.kafkaTopicConsumers.getConsumers().get(TYPE_ID, JOB_ID); + + // Handle received data from Kafka, check that it has been posted to the + // consumer + kafkaConsumer.start(Flux.just("data")); + + ConsumerController.TestResults consumer = this.consumerController.testResults; + await().untilAsserted(() -> assertThat(consumer.receivedBodies.size()).isEqualTo(1)); + assertThat(consumer.receivedBodies.get(0)).isEqualTo("[\"data\"]"); + + // Test send an exception + kafkaConsumer.start(Flux.error(new NullPointerException())); + + // Test regular restart of stopped + kafkaConsumer.stop(); + this.kafkaTopicConsumers.restartNonRunningTopics(); + await().untilAsserted(() -> assertThat(kafkaConsumer.isRunning()).isTrue()); + + // Delete the job + this.icsSimulatorController.deleteJob(JOB_ID, restClient()); + await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); + } + + @Test + void testReceiveAndPostDataFromDmaap() throws Exception { final String JOB_ID = "ID"; // Register producer, Register types - await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull()); - assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); + await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); + assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); // Create a job - this.ecsSimulatorController.addJob(consumerJobInfo(), JOB_ID, restClient()); + this.icsSimulatorController.addJob(consumerJobInfo(), JOB_ID, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); // Return two messages from DMAAP and verify that these are sent to the owner of @@ -261,45 +306,25 @@ class ApplicationTest { assertThat(jobs).contains(JOB_ID); // Delete the job - this.ecsSimulatorController.deleteJob(JOB_ID, restClient()); + this.icsSimulatorController.deleteJob(JOB_ID, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); } @Test void testReRegister() throws Exception { // Wait foir register types and producer - await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull()); - assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); + await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); + assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); // Clear the registration, should trigger a re-register - ecsSimulatorController.testResults.reset(); - await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull()); - assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); + icsSimulatorController.testResults.reset(); + await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); + assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); // Just clear the registerred types, should trigger a re-register - ecsSimulatorController.testResults.types.clear(); + icsSimulatorController.testResults.types.clear(); await().untilAsserted( - () -> assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(2)); - } - - @Test - void testCreateKafkaJob() { - await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull()); - assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); - - final String TYPE_ID = "KafkaInformationType"; - - Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 456), 1); - String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL; - ConsumerJobInfo jobInfo = new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", targetUri, ""); - - // Create a job - this.ecsSimulatorController.addJob(jobInfo, "JOB_ID", restClient()); - await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); - - // Delete the job - this.ecsSimulatorController.deleteJob("JOB_ID", restClient()); - await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); + () -> assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(2)); } private void testErrorCode(Mono request, HttpStatus expStatus, String responseContains) {