NONRTRIC - Implement DMaaP mediator producer service in Java
[nonrtric.git] / dmaap-adaptor-java / src / test / java / org / oran / dmaapadapter / ApplicationTest.java
index e78313c..0ea0056 100644 (file)
@@ -50,6 +50,8 @@ import org.oran.dmaapadapter.r1.ProducerJobInfo;
 import org.oran.dmaapadapter.repository.InfoTypes;
 import org.oran.dmaapadapter.repository.Job;
 import org.oran.dmaapadapter.repository.Jobs;
+import org.oran.dmaapadapter.tasks.KafkaJobDataConsumer;
+import org.oran.dmaapadapter.tasks.KafkaTopicConsumers;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
@@ -65,6 +67,7 @@ import org.springframework.test.context.TestPropertySource;
 import org.springframework.test.context.junit.jupiter.SpringExtension;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
@@ -90,7 +93,10 @@ class ApplicationTest {
     private ConsumerController consumerController;
 
     @Autowired
-    private EcsSimulatorController ecsSimulatorController;
+    private IcsSimulatorController icsSimulatorController;
+
+    @Autowired
+    KafkaTopicConsumers kafkaTopicConsumers;
 
     private com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
 
@@ -99,7 +105,7 @@ class ApplicationTest {
 
     static class TestApplicationConfig extends ApplicationConfig {
         @Override
-        public String getEcsBaseUrl() {
+        public String getIcsBaseUrl() {
             return thisProcessUrl();
         }
 
@@ -147,7 +153,7 @@ class ApplicationTest {
     @AfterEach
     void reset() {
         this.consumerController.testResults.reset();
-        this.ecsSimulatorController.testResults.reset();
+        this.icsSimulatorController.testResults.reset();
         this.jobs.clear();
     }
 
@@ -237,15 +243,54 @@ class ApplicationTest {
     }
 
     @Test
-    void testWholeChain() throws Exception {
+    void testReceiveAndPostDataFromKafka() {
+        final String JOB_ID = "ID";
+        final String TYPE_ID = "KafkaInformationType";
+        await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+        assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+
+        // Create a job
+        Job.Parameters param = new Job.Parameters("", new Job.BufferTimeout(123, 456), 1);
+        String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL;
+        ConsumerJobInfo kafkaJobInfo =
+                new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", targetUri, "");
+
+        this.icsSimulatorController.addJob(kafkaJobInfo, JOB_ID, restClient());
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+        KafkaJobDataConsumer kafkaConsumer = this.kafkaTopicConsumers.getConsumers().get(TYPE_ID, JOB_ID);
+
+        // Handle received data from Kafka, check that it has been posted to the
+        // consumer
+        kafkaConsumer.start(Flux.just("data"));
+
+        ConsumerController.TestResults consumer = this.consumerController.testResults;
+        await().untilAsserted(() -> assertThat(consumer.receivedBodies.size()).isEqualTo(1));
+        assertThat(consumer.receivedBodies.get(0)).isEqualTo("[\"data\"]");
+
+        // Test send an exception
+        kafkaConsumer.start(Flux.error(new NullPointerException()));
+
+        // Test regular restart of stopped
+        kafkaConsumer.stop();
+        this.kafkaTopicConsumers.restartNonRunningTopics();
+        await().untilAsserted(() -> assertThat(kafkaConsumer.isRunning()).isTrue());
+
+        // Delete the job
+        this.icsSimulatorController.deleteJob(JOB_ID, restClient());
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+    }
+
+    @Test
+    void testReceiveAndPostDataFromDmaap() throws Exception {
         final String JOB_ID = "ID";
 
         // Register producer, Register types
-        await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
-        assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+        await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+        assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
         // Create a job
-        this.ecsSimulatorController.addJob(consumerJobInfo(), JOB_ID, restClient());
+        this.icsSimulatorController.addJob(consumerJobInfo(), JOB_ID, restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
 
         // Return two messages from DMAAP and verify that these are sent to the owner of
@@ -261,45 +306,25 @@ class ApplicationTest {
         assertThat(jobs).contains(JOB_ID);
 
         // Delete the job
-        this.ecsSimulatorController.deleteJob(JOB_ID, restClient());
+        this.icsSimulatorController.deleteJob(JOB_ID, restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
     }
 
     @Test
     void testReRegister() throws Exception {
         // Wait foir register types and producer
-        await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
-        assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+        await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+        assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
         // Clear the registration, should trigger a re-register
-        ecsSimulatorController.testResults.reset();
-        await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
-        assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+        icsSimulatorController.testResults.reset();
+        await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+        assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
         // Just clear the registerred types, should trigger a re-register
-        ecsSimulatorController.testResults.types.clear();
+        icsSimulatorController.testResults.types.clear();
         await().untilAsserted(
-                () -> assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(2));
-    }
-
-    @Test
-    void testCreateKafkaJob() {
-        await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
-        assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
-
-        final String TYPE_ID = "KafkaInformationType";
-
-        Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 456), 1);
-        String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL;
-        ConsumerJobInfo jobInfo = new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", targetUri, "");
-
-        // Create a job
-        this.ecsSimulatorController.addJob(jobInfo, "JOB_ID", restClient());
-        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
-
-        // Delete the job
-        this.ecsSimulatorController.deleteJob("JOB_ID", restClient());
-        await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+                () -> assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(2));
     }
 
     private void testErrorCode(Mono<?> request, HttpStatus expStatus, String responseContains) {