NONRTRIC - bugfix 17/8917/1
author Patrik Buhr <patrik.buhr@est.tech>
Mon, 22 Aug 2022 08:40:40 +0000 (10:40 +0200)
committer Patrik Buhr <patrik.buhr@est.tech>
Mon, 22 Aug 2022 13:23:53 +0000 (15:23 +0200)
Bugfix: reverting back to Spring Boot 2.5.8; 2.6.6 works badly with Kafka.

Signed-off-by: Patrik Buhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: I20a78bdd0fa5368fa00933bea1293340bffcfbeb

config/application.yaml
config/application_configuration.json
pom.xml
src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java
src/main/java/org/oran/dmaapadapter/filter/Filter.java
src/main/java/org/oran/dmaapadapter/repository/InfoType.java
src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java
src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
src/test/resources/test_application_configuration.json

diff --git a/config/application.yaml b/config/application.yaml
index f6cd665..c96adc8 100644 (file)
@@ -16,11 +16,13 @@ springdoc:
 logging:
   # Configuration of logging
   level:
-    ROOT: ERROR
+    ROOT: WARN
+    org.apache.kafka: WARN
     org.springframework: ERROR
     org.springframework.data: ERROR
     org.springframework.web.reactive.function.client.ExchangeFunctions: ERROR
     org.oran.dmaapadapter: INFO
+
   file:
     name: /var/log/dmaap-adapter-service/application.log
 server:
@@ -58,4 +60,5 @@ app:
   kafka:
     bootstrap-servers: localhost:9092
   # If the file name is empty, no authorization token is used
-  auth-token-file:
\ No newline at end of file
+  auth-token-file:
+  pm-files-path: /tmp
\ No newline at end of file
diff --git a/config/application_configuration.json b/config/application_configuration.json
index 881da34..fbe6e6b 100644 (file)
          "kafkaInputTopic": "TutorialTopic",
          "useHttpProxy": false
       },
-       {
+      {
          "id": "PmData",
          "dmaapTopicUrl": "/events/PM_NOTIFICATION_OUTPUT/OpenDcae-c12/C12",
          "useHttpProxy": true,
-         "dataType" : "pmData"
-      },
+         "dataType": "pmData"
+      }
    ]
-}
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 91bbd41..5f2216f 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -26,7 +26,7 @@
     <parent>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-parent</artifactId>
-        <version>2.6.6</version>
+        <version>2.5.8</version>
         <relativePath />
     </parent>
     <groupId>org.o-ran-sc.nonrtric.plt</groupId>
         <dependency>
             <groupId>io.projectreactor.kafka</groupId>
             <artifactId>reactor-kafka</artifactId>
-            <version>1.3.9</version>
+            <version>1.3.12</version>
         </dependency>
         <dependency>
             <groupId>com.google.guava</groupId>
         <system>JIRA</system>
         <url>https://jira.o-ran-sc.org/</url>
     </issueManagement>
-</project>
+</project>
\ No newline at end of file
diff --git a/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java b/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java
index 4e913b6..8c8e995 100644 (file)
@@ -96,6 +96,10 @@ public class ApplicationConfig {
     @Value("${app.kafka.bootstrap-servers:}")
     private String kafkaBootStrapServers;
 
+    @Getter
+    @Value("${app.pm-files-path:}")
+    private String pmFilesPath;
+
     private WebClientConfig webClientConfig = null;
 
     public WebClientConfig getWebClientConfig() {
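
Note: the trailing `:` in the `@Value` expression gives `pmFilesPath` an empty-string default, so the file-reading feature stays off unless `app.pm-files-path` is configured. A minimal sketch of the same pattern, with a hypothetical class name:

    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;

    @Component
    public class PmFilesPathSketch {
        // "${app.pm-files-path:}" resolves to "" when the property is unset,
        // letting callers gate the feature with a plain isEmpty() check.
        @Value("${app.pm-files-path:}")
        private String pmFilesPath;

        public boolean pmFileReadingEnabled() {
            return !pmFilesPath.isEmpty();
        }
    }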
diff --git a/src/main/java/org/oran/dmaapadapter/filter/Filter.java b/src/main/java/org/oran/dmaapadapter/filter/Filter.java
index 9d29663..bfd358b 100644 (file)
@@ -34,6 +34,7 @@ public interface Filter {
     public static class FilteredData {
         public final String key;
         public final String value;
+        private static final FilteredData emptyData = new FilteredData("", "");
 
         public boolean isEmpty() {
             return value.isEmpty() && key.isEmpty();
@@ -45,10 +46,9 @@ public interface Filter {
         }
 
         public static FilteredData empty() {
-            return new FilteredData("", "");
+            return emptyData;
         }
     }
 
     public FilteredData filter(DataFromTopic data);
-
 }
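
Note: since `key` and `value` are final Strings, `FilteredData` is immutable, so a single shared empty instance is safe and `empty()` no longer allocates on every call. A small usage sketch:

    // Both calls now return the same immutable instance.
    Filter.FilteredData a = Filter.FilteredData.empty();
    Filter.FilteredData b = Filter.FilteredData.empty();
    assert a == b && a.isEmpty();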
diff --git a/src/main/java/org/oran/dmaapadapter/repository/InfoType.java b/src/main/java/org/oran/dmaapadapter/repository/InfoType.java
index d7f89be..7a0b707 100644 (file)
 
 package org.oran.dmaapadapter.repository;
 
+import lombok.Builder;
 import lombok.Getter;
 import lombok.ToString;
 
 import org.springframework.util.StringUtils;
 
 @ToString
+@Builder
 public class InfoType {
 
     @Getter
-    private final String id;
+    private String id;
 
     @Getter
-    private final String dmaapTopicUrl;
+    private String dmaapTopicUrl;
 
     @Getter
-    private final boolean useHttpProxy;
+    @Builder.Default
+    private boolean useHttpProxy = false;
 
     @Getter
-    private final String kafkaInputTopic;
+    private String kafkaInputTopic;
 
-    private final String dataType;
+    private String dataType;
 
     @Getter
+    @Builder.Default
     private boolean isJson = false;
 
-    public InfoType(String id, String dmaapTopicUrl, boolean useHttpProxy, String kafkaInputTopic, String dataType,
-            boolean isJson) {
-        this.id = id;
-        this.dmaapTopicUrl = dmaapTopicUrl;
-        this.useHttpProxy = useHttpProxy;
-        this.kafkaInputTopic = kafkaInputTopic;
-        this.dataType = dataType;
-        this.isJson = isJson;
-    }
-
     public boolean isKafkaTopicDefined() {
         return StringUtils.hasLength(kafkaInputTopic);
     }
@@ -76,6 +70,5 @@ public class InfoType {
             return DataType.PM_DATA;
         }
         return DataType.OTHER;
-
     }
 }
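
Note: with Lombok's `@Builder` replacing the six-argument constructor, call sites name only the fields they need; `@Builder.Default` keeps `useHttpProxy` and `isJson` false when omitted. A sketch of the generated API, with values borrowed from the test configuration:

    // Unset fields default to null, except the @Builder.Default booleans.
    InfoType type = InfoType.builder() //
            .id("PmInformationTypeKafka") //
            .kafkaInputTopic("PmFileData") //
            .dataType("PmData") //
            .build(); // useHttpProxy == false, isJson == false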
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
index 62bf942..7514d37 100644 (file)
@@ -20,6 +20,7 @@
 
 package org.oran.dmaapadapter.tasks;
 
+import java.nio.file.Path;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -31,10 +32,19 @@ import org.oran.dmaapadapter.repository.InfoType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+import lombok.Builder;
+import lombok.Getter;
+import lombok.ToString;
 import reactor.core.publisher.Flux;
 import reactor.kafka.receiver.KafkaReceiver;
 import reactor.kafka.receiver.ReceiverOptions;
 
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+
 /**
  * The class streams incoming requests from a Kafka topic and sends them further
 * to a multicast sink, which several other streams can connect to.
@@ -45,23 +55,36 @@ public class KafkaTopicListener implements TopicListener {
     private final ApplicationConfig applicationConfig;
     private final InfoType type;
     private Flux<DataFromTopic> dataFromTopic;
+    private final String kafkaClientId;
+
+    private static Gson gson = new GsonBuilder() //
+            .disableHtmlEscaping() //
+            .create(); //
+
+    @ToString
+    @Builder
+    public static class NewFileEvent {
+        @Getter
+        private String filename;
+    }
 
-    public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type) {
+    public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type, String kafkaClientId) {
         this.applicationConfig = applicationConfig;
         this.type = type;
+        this.kafkaClientId = kafkaClientId;
     }
 
     @Override
     public Flux<DataFromTopic> getFlux() {
         if (this.dataFromTopic == null) {
-            this.dataFromTopic = startReceiveFromTopic();
+            this.dataFromTopic = startReceiveFromTopic(this.kafkaClientId);
         }
         return this.dataFromTopic;
     }
 
-    private Flux<DataFromTopic> startReceiveFromTopic() {
+    private Flux<DataFromTopic> startReceiveFromTopic(String clientId) {
         logger.debug("Listening to kafka topic: {} type :{}", this.type.getKafkaInputTopic(), type.getId());
-        return KafkaReceiver.create(kafkaInputProperties()) //
+        return KafkaReceiver.create(kafkaInputProperties(clientId)) //
                 .receive() //
                 .doOnNext(input -> logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(),
                         input.value())) //
@@ -69,11 +92,30 @@ public class KafkaTopicListener implements TopicListener {
                 .doFinally(sig -> logger.error("KafkaTopicReceiver stopped, reason: {}", sig)) //
                 .filter(t -> !t.value().isEmpty() || !t.key().isEmpty()) //
                 .map(input -> new DataFromTopic(input.key(), input.value())) //
+                .map(this::getDataFromFileIfNewPmFileEvent) //
                 .publish() //
-                .autoConnect();
+                .autoConnect(1);
+    }
+
+    private DataFromTopic getDataFromFileIfNewPmFileEvent(DataFromTopic data) {
+
+        if (!applicationConfig.getPmFilesPath().isEmpty()
+                && this.type.getDataType() == InfoType.DataType.PM_DATA
+                && data.value.length() < 1000) {
+            try {
+                NewFileEvent ev = gson.fromJson(data.value, NewFileEvent.class);
+                Path path = Path.of(this.applicationConfig.getPmFilesPath(), ev.getFilename());
+                String pmReportJson = Files.readString(path, Charset.defaultCharset());
+                return new DataFromTopic(data.key, pmReportJson);
+            } catch (Exception e) {
+                return data;
+            }
+        } else {
+            return data;
+        }
     }
 
-    private ReceiverOptions<String, String> kafkaInputProperties() {
+    private ReceiverOptions<String, String> kafkaInputProperties(String clientId) {
         Map<String, Object> consumerProps = new HashMap<>();
         if (this.applicationConfig.getKafkaBootStrapServers().isEmpty()) {
             logger.error("No kafka boostrap server is setup");
@@ -83,7 +125,7 @@ public class KafkaTopicListener implements TopicListener {
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
-        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, this.applicationConfig.getSelfUrl());
+        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
 
         return ReceiverOptions.<String, String>create(consumerProps)
                 .subscription(Collections.singleton(this.type.getKafkaInputTopic()));
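
Note: the new `NewFileEvent` handling lets a small JSON event on a PM topic stand in for the full report: when `pm-files-path` is set, the type is PM data, and the payload is short, the listener replaces the payload with the content of the named file. A hedged producer-side sketch of such an event; the topic name and bootstrap server are assumptions:

    import java.util.Map;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class NewFileEventProducerSketch {
        public static void main(String[] args) {
            Map<String, Object> props = Map.of( //
                    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092", //
                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class, //
                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            try (var producer = new KafkaProducer<String, String>(props)) {
                // KafkaTopicListener gson-parses this into NewFileEvent and, when
                // app.pm-files-path is configured, substitutes the content of
                // <pm-files-path>/pm_report.json for the payload.
                String event = "{\"filename\": \"pm_report.json\"}";
                producer.send(new ProducerRecord<>("PmFileData", "key", event));
            }
        }
    }

The `data.value.length() < 1000` guard presumably keeps full PM reports, which are much larger, from being parsed as file events.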
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java
index fcc94ee..ea7ab81 100644 (file)
@@ -59,7 +59,8 @@ public class TopicListeners {
 
         for (InfoType type : types.getAll()) {
             if (type.isKafkaTopicDefined()) {
-                KafkaTopicListener topicConsumer = new KafkaTopicListener(appConfig, type);
+                KafkaTopicListener topicConsumer = new KafkaTopicListener(appConfig, type,
+                        type.getId() + "_" + appConfig.getSelfUrl());
                 kafkaTopicListeners.put(type.getId(), topicConsumer);
             }
             if (type.isDmaapTopicDefined()) {
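
Note: each listener now gets a Kafka `client.id` composed of the type id and the service's own URL, rather than all listeners sharing `getSelfUrl()`, which avoids duplicate client ids when several types are consumed. With hypothetical values:

    // e.g. "PmData" + "_" + "https://localhost:8435" -> "PmData_https://localhost:8435"
    String kafkaClientId = type.getId() + "_" + appConfig.getSelfUrl();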
diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
index 65a1366..488fc89 100644 (file)
@@ -79,8 +79,8 @@ import reactor.kafka.sender.SenderRecord;
 @TestPropertySource(properties = { //
         "server.ssl.key-store=./config/keystore.jks", //
         "app.webclient.trust-store=./config/truststore.jks", //
-        "app.configuration-filepath=./src/test/resources/test_application_configuration.json"//
-})
+        "app.configuration-filepath=./src/test/resources/test_application_configuration.json"//
+        "app.pm-files-path=./src/test/resources/" }) //
 class IntegrationWithKafka {
 
     final String TYPE_ID = "KafkaInformationType";
@@ -168,8 +168,10 @@ class IntegrationWithKafka {
 
             // Create a listener to the output topic. The KafkaTopicListener happens to be
             // suitable for that,
-            InfoType type = new InfoType("id", null, false, OUTPUT_TOPIC, "dataType", false);
-            KafkaTopicListener topicListener = new KafkaTopicListener(applicationConfig, type);
+            InfoType type = InfoType.builder().id("id").kafkaInputTopic(OUTPUT_TOPIC).dataType("dataType").build();
+
+            KafkaTopicListener topicListener = new KafkaTopicListener(applicationConfig, type,
+                    "TestClientId" + "_" + outputTopic);
 
             topicListener.getFlux() //
                     .doOnNext(this::set) //
@@ -306,7 +308,7 @@ class IntegrationWithKafka {
 
         Map<String, Object> props = new HashMap<>();
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-        props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producerx");
+        // props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producerx");
         props.put(ProducerConfig.ACKS_CONFIG, "all");
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
@@ -473,10 +475,14 @@ class IntegrationWithKafka {
 
         Instant startTime = Instant.now();
 
+        KafkaTopicListener.NewFileEvent event = KafkaTopicListener.NewFileEvent.builder().filename("pm_report.json")
+                .build();
+        String eventAsString = gson.toJson(event);
+
         String path = "./src/test/resources/pm_report.json";
         String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset());
 
-        var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> senderRecord(pmReportJson, "", PM_TYPE_ID));
+        var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> senderRecord(eventAsString, "key", PM_TYPE_ID));
         sendDataToStream(dataToSend);
 
         while (kafkaReceiver.count != NO_OF_OBJECTS) {
diff --git a/src/test/resources/test_application_configuration.json b/src/test/resources/test_application_configuration.json
index f64ab12..2863590 100644 (file)
       },
       {
          "id": "PmInformationTypeKafka",
-         "kafkaInputTopic": "KafkaPmInput",
+         "kafkaInputTopic": "PmFileData",
          "useHttpProxy": false,
          "dataType": "PmData",
          "isJson": true
       }
    ]
-}
+}
\ No newline at end of file