Reverting to springboot version 2.5.8
[nonrtric.git] / dmaap-adaptor-java / src / test / java / org / oran / dmaapadapter / IntegrationWithKafka.java
index 9cd4fdd..5a48d61 100644 (file)
@@ -94,7 +94,7 @@ class IntegrationWithKafka {
     private ConsumerController consumerController;
 
     @Autowired
-    private EcsSimulatorController ecsSimulatorController;
+    private IcsSimulatorController icsSimulatorController;
 
     @Autowired
     private KafkaTopicConsumers kafkaTopicConsumers;
@@ -108,7 +108,7 @@ class IntegrationWithKafka {
 
     static class TestApplicationConfig extends ApplicationConfig {
         @Override
-        public String getEcsBaseUrl() {
+        public String getIcsBaseUrl() {
             return thisProcessUrl();
         }
 
@@ -151,7 +151,7 @@ class IntegrationWithKafka {
     @AfterEach
     void reset() {
         this.consumerController.testResults.reset();
-        this.ecsSimulatorController.testResults.reset();
+        this.icsSimulatorController.testResults.reset();
         this.jobs.clear();
     }
 
@@ -215,7 +215,7 @@ class IntegrationWithKafka {
 
         Map<String, Object> props = new HashMap<>();
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-        props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producer");
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producerx");
         props.put(ProducerConfig.ACKS_CONFIG, "all");
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
@@ -236,6 +236,8 @@ class IntegrationWithKafka {
                 .doOnError(e -> logger.error("Send failed", e)) //
                 .blockLast();
 
+        sender.close();
+
     }
 
     private void verifiedReceivedByConsumer(String... strings) {
@@ -246,30 +248,54 @@ class IntegrationWithKafka {
         }
     }
 
+    @Test
+    void simpleCase() throws InterruptedException {
+        final String JOB_ID = "ID";
+
+        // Register producer, Register types
+        await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+        assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+
+        this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID, restClient());
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+        Thread.sleep(4000);
+        var dataToSend = Flux.just(senderRecord("Message"));
+        sendDataToStream(dataToSend);
+
+        verifiedReceivedByConsumer("Message");
+
+        this.icsSimulatorController.deleteJob(JOB_ID, restClient());
+
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+        await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers().keySet()).isEmpty());
+    }
+
     @Test
     void kafkaIntegrationTest() throws Exception {
         final String JOB_ID1 = "ID1";
         final String JOB_ID2 = "ID2";
 
         // Register producer, Register types
-        await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
-        assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+        await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+        assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
         // Create two jobs. One buffering and one with a filter
-        this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 20), JOB_ID1,
+        this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 10, 20), JOB_ID1,
                 restClient());
-        this.ecsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0, 1), JOB_ID2, restClient());
+        this.icsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0, 1), JOB_ID2, restClient());
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
 
+        Thread.sleep(2000);
         var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc.
         sendDataToStream(dataToSend);
 
         verifiedReceivedByConsumer("Message_1", "[\"Message_1\", \"Message_2\", \"Message_3\"]");
 
         // Delete the jobs
-        this.ecsSimulatorController.deleteJob(JOB_ID1, restClient());
-        this.ecsSimulatorController.deleteJob(JOB_ID2, restClient());
+        this.icsSimulatorController.deleteJob(JOB_ID1, restClient());
+        this.icsSimulatorController.deleteJob(JOB_ID2, restClient());
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
         await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers().keySet()).isEmpty());
@@ -281,13 +307,13 @@ class IntegrationWithKafka {
         final String JOB_ID2 = "ID2";
 
         // Register producer, Register types
-        await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
-        assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+        await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+        assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
         // Create two jobs.
-        this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 1), JOB_ID1,
+        this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 1), JOB_ID1,
                 restClient());
-        this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID2, restClient());
+        this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID2, restClient());
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
 
@@ -298,8 +324,8 @@ class IntegrationWithKafka {
         await().untilAsserted(() -> assertThat(consumer.isRunning()).isFalse());
         this.consumerController.testResults.reset();
 
-        this.ecsSimulatorController.deleteJob(JOB_ID2, restClient()); // Delete one job
-        kafkaTopicConsumers.restartNonRunningTasks();
+        this.icsSimulatorController.deleteJob(JOB_ID2, restClient()); // Delete one job
+        kafkaTopicConsumers.restartNonRunningTopics();
         Thread.sleep(1000); // Restarting the input seems to take some asynch time
 
         dataToSend = Flux.just(senderRecord("Howdy\""));
@@ -308,8 +334,8 @@ class IntegrationWithKafka {
         verifiedReceivedByConsumer("[\"Howdy\\\"\"]");
 
         // Delete the jobs
-        this.ecsSimulatorController.deleteJob(JOB_ID1, restClient());
-        this.ecsSimulatorController.deleteJob(JOB_ID2, restClient());
+        this.icsSimulatorController.deleteJob(JOB_ID1, restClient());
+        this.icsSimulatorController.deleteJob(JOB_ID2, restClient());
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
         await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers().keySet()).isEmpty());