Reverting to springboot version 2.5.8 65/7565/2
authorPatrikBuhr <patrik.buhr@est.tech>
Mon, 17 Jan 2022 13:58:06 +0000 (14:58 +0100)
committerPatrikBuhr <patrik.buhr@est.tech>
Tue, 18 Jan 2022 10:56:45 +0000 (11:56 +0100)
Version 2.6.2 did not work well with Kafka

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-703
Change-Id: Ib3d16f484be77792c87013eee565b79bfd3219cb

dmaap-adaptor-java/pom.xml
dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java

index 71f2bb8..b555912 100644 (file)
@@ -26,7 +26,7 @@
     <parent>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-parent</artifactId>
-        <version>2.6.2</version>
+        <version>2.5.8</version>
         <relativePath />
     </parent>
     <groupId>org.o-ran-sc.nonrtric</groupId>
@@ -56,7 +56,6 @@
         <spotless-maven-plugin.version>1.24.3</spotless-maven-plugin.version>
         <swagger-codegen-maven-plugin.version>3.0.11</swagger-codegen-maven-plugin.version>
         <docker-maven-plugin>0.30.0</docker-maven-plugin>
-        <javax.ws.rs-api.version>2.1.1</javax.ws.rs-api.version>
         <sonar-maven-plugin.version>3.7.0.1746</sonar-maven-plugin.version>
         <jacoco-maven-plugin.version>0.8.5</jacoco-maven-plugin.version>
         <exec.skip>true</exec.skip>
         <dependency>
             <groupId>io.projectreactor.kafka</groupId>
             <artifactId>reactor-kafka</artifactId>
-            <version>1.3.7</version>
+            <version>1.3.9</version>
         </dependency>
         <dependency>
             <groupId>com.google.guava</groupId>
         <system>JIRA</system>
         <url>https://jira.o-ran-sc.org/</url>
     </issueManagement>
-</project>
\ No newline at end of file
+</project>
index c38af8a..5a48d61 100644 (file)
@@ -215,7 +215,7 @@ class IntegrationWithKafka {
 
         Map<String, Object> props = new HashMap<>();
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-        props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producer");
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producerx");
         props.put(ProducerConfig.ACKS_CONFIG, "all");
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
@@ -236,6 +236,8 @@ class IntegrationWithKafka {
                 .doOnError(e -> logger.error("Send failed", e)) //
                 .blockLast();
 
+        sender.close();
+
     }
 
     private void verifiedReceivedByConsumer(String... strings) {
@@ -246,6 +248,29 @@ class IntegrationWithKafka {
         }
     }
 
+    @Test
+    void simpleCase() throws InterruptedException {
+        final String JOB_ID = "ID";
+
+        // Register producer, Register types
+        await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+        assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+
+        this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID, restClient());
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+        Thread.sleep(4000);
+        var dataToSend = Flux.just(senderRecord("Message"));
+        sendDataToStream(dataToSend);
+
+        verifiedReceivedByConsumer("Message");
+
+        this.icsSimulatorController.deleteJob(JOB_ID, restClient());
+
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+        await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers().keySet()).isEmpty());
+    }
+
     @Test
     void kafkaIntegrationTest() throws Exception {
         final String JOB_ID1 = "ID1";
@@ -256,12 +281,13 @@ class IntegrationWithKafka {
         assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
         // Create two jobs. One buffering and one with a filter
-        this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 20), JOB_ID1,
+        this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 10, 20), JOB_ID1,
                 restClient());
         this.icsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0, 1), JOB_ID2, restClient());
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
 
+        Thread.sleep(2000);
         var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc.
         sendDataToStream(dataToSend);