From f7d034ec7e77608b74ecf84d84818f0fea63e7ff Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Mon, 22 Aug 2022 10:40:40 +0200 Subject: [PATCH] NONRTRIC - bugfix Bugfix reverting vback tosptingboot 2.5.8 (2.6.6) works bad with kafka. Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-773 Change-Id: I20a78bdd0fa5368fa00933bea1293340bffcfbeb --- config/application.yaml | 7 ++- config/application_configuration.json | 8 ++-- pom.xml | 6 +-- .../configuration/ApplicationConfig.java | 4 ++ .../java/org/oran/dmaapadapter/filter/Filter.java | 4 +- .../org/oran/dmaapadapter/repository/InfoType.java | 25 ++++------ .../dmaapadapter/tasks/KafkaTopicListener.java | 56 +++++++++++++++++++--- .../oran/dmaapadapter/tasks/TopicListeners.java | 3 +- .../oran/dmaapadapter/IntegrationWithKafka.java | 18 ++++--- .../resources/test_application_configuration.json | 4 +- 10 files changed, 92 insertions(+), 43 deletions(-) diff --git a/config/application.yaml b/config/application.yaml index f6cd665..c96adc8 100644 --- a/config/application.yaml +++ b/config/application.yaml @@ -16,11 +16,13 @@ springdoc: logging: # Configuration of logging level: - ROOT: ERROR + ROOT: WARN + org.apache.kafka: WARN org.springframework: ERROR org.springframework.data: ERROR org.springframework.web.reactive.function.client.ExchangeFunctions: ERROR org.oran.dmaapadapter: INFO + file: name: /var/log/dmaap-adapter-service/application.log server: @@ -58,4 +60,5 @@ app: kafka: bootstrap-servers: localhost:9092 # If the file name is empty, no authorization token is used - auth-token-file: \ No newline at end of file + auth-token-file: + pm-files-path: /tmp \ No newline at end of file diff --git a/config/application_configuration.json b/config/application_configuration.json index 881da34..fbe6e6b 100644 --- a/config/application_configuration.json +++ b/config/application_configuration.json @@ -10,11 +10,11 @@ "kafkaInputTopic": "TutorialTopic", "useHttpProxy": false }, - { + { "id": "PmData", "dmaapTopicUrl": "/events/PM_NOTIFICATION_OUTPUT/OpenDcae-c12/C12", "useHttpProxy": true, - "dataType" : "pmData" - }, + "dataType": "pmData" + } ] -} +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 91bbd41..5f2216f 100644 --- a/pom.xml +++ b/pom.xml @@ -26,7 +26,7 @@ org.springframework.boot spring-boot-starter-parent - 2.6.6 + 2.5.8 org.o-ran-sc.nonrtric.plt @@ -191,7 +191,7 @@ io.projectreactor.kafka reactor-kafka - 1.3.9 + 1.3.12 com.google.guava @@ -373,4 +373,4 @@ JIRA https://jira.o-ran-sc.org/ - + \ No newline at end of file diff --git a/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java b/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java index 4e913b6..8c8e995 100644 --- a/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java +++ b/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java @@ -96,6 +96,10 @@ public class ApplicationConfig { @Value("${app.kafka.bootstrap-servers:}") private String kafkaBootStrapServers; + @Getter + @Value("${app.pm-files-path:}") + private String pmFilesPath; + private WebClientConfig webClientConfig = null; public WebClientConfig getWebClientConfig() { diff --git a/src/main/java/org/oran/dmaapadapter/filter/Filter.java b/src/main/java/org/oran/dmaapadapter/filter/Filter.java index 9d29663..bfd358b 100644 --- a/src/main/java/org/oran/dmaapadapter/filter/Filter.java +++ b/src/main/java/org/oran/dmaapadapter/filter/Filter.java @@ -34,6 +34,7 @@ public interface Filter { public static class FilteredData { public final String key; public final String value; + private static final FilteredData emptyData = new FilteredData("", ""); public boolean isEmpty() { return value.isEmpty() && key.isEmpty(); @@ -45,10 +46,9 @@ public interface Filter { } public static FilteredData empty() { - return new FilteredData("", ""); + return emptyData; } } public FilteredData filter(DataFromTopic data); - } diff --git a/src/main/java/org/oran/dmaapadapter/repository/InfoType.java b/src/main/java/org/oran/dmaapadapter/repository/InfoType.java index d7f89be..7a0b707 100644 --- a/src/main/java/org/oran/dmaapadapter/repository/InfoType.java +++ b/src/main/java/org/oran/dmaapadapter/repository/InfoType.java @@ -20,41 +20,35 @@ package org.oran.dmaapadapter.repository; +import lombok.Builder; import lombok.Getter; import lombok.ToString; import org.springframework.util.StringUtils; @ToString +@Builder public class InfoType { @Getter - private final String id; + private String id; @Getter - private final String dmaapTopicUrl; + private String dmaapTopicUrl; @Getter - private final boolean useHttpProxy; + @Builder.Default + private boolean useHttpProxy = false; @Getter - private final String kafkaInputTopic; + private String kafkaInputTopic; - private final String dataType; + private String dataType; @Getter + @Builder.Default private boolean isJson = false; - public InfoType(String id, String dmaapTopicUrl, boolean useHttpProxy, String kafkaInputTopic, String dataType, - boolean isJson) { - this.id = id; - this.dmaapTopicUrl = dmaapTopicUrl; - this.useHttpProxy = useHttpProxy; - this.kafkaInputTopic = kafkaInputTopic; - this.dataType = dataType; - this.isJson = isJson; - } - public boolean isKafkaTopicDefined() { return StringUtils.hasLength(kafkaInputTopic); } @@ -76,6 +70,5 @@ public class InfoType { return DataType.PM_DATA; } return DataType.OTHER; - } } diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java index 62bf942..7514d37 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java @@ -20,6 +20,7 @@ package org.oran.dmaapadapter.tasks; +import java.nio.file.Path; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -31,10 +32,19 @@ import org.oran.dmaapadapter.repository.InfoType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +import lombok.Builder; +import lombok.Getter; +import lombok.ToString; import reactor.core.publisher.Flux; import reactor.kafka.receiver.KafkaReceiver; import reactor.kafka.receiver.ReceiverOptions; +import java.nio.charset.Charset; +import java.nio.file.Files; + /** * The class streams incoming requests from a Kafka topic and sends them further * to a multi cast sink, which several other streams can connect to. @@ -45,23 +55,36 @@ public class KafkaTopicListener implements TopicListener { private final ApplicationConfig applicationConfig; private final InfoType type; private Flux dataFromTopic; + private final String kafkaClientId; + + private static Gson gson = new GsonBuilder() // + .disableHtmlEscaping() // + .create(); // + + @ToString + @Builder + public static class NewFileEvent { + @Getter + private String filename; + } - public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type) { + public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type, String kafkaClientId) { this.applicationConfig = applicationConfig; this.type = type; + this.kafkaClientId = kafkaClientId; } @Override public Flux getFlux() { if (this.dataFromTopic == null) { - this.dataFromTopic = startReceiveFromTopic(); + this.dataFromTopic = startReceiveFromTopic(this.kafkaClientId); } return this.dataFromTopic; } - private Flux startReceiveFromTopic() { + private Flux startReceiveFromTopic(String clientId) { logger.debug("Listening to kafka topic: {} type :{}", this.type.getKafkaInputTopic(), type.getId()); - return KafkaReceiver.create(kafkaInputProperties()) // + return KafkaReceiver.create(kafkaInputProperties(clientId)) // .receive() // .doOnNext(input -> logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(), input.value())) // @@ -69,11 +92,30 @@ public class KafkaTopicListener implements TopicListener { .doFinally(sig -> logger.error("KafkaTopicReceiver stopped, reason: {}", sig)) // .filter(t -> !t.value().isEmpty() || !t.key().isEmpty()) // .map(input -> new DataFromTopic(input.key(), input.value())) // + .map(this::getDataFromFileIfNewPmFileEvent) // .publish() // - .autoConnect(); + .autoConnect(1); + } + + private DataFromTopic getDataFromFileIfNewPmFileEvent(DataFromTopic data) { + + if (!applicationConfig.getPmFilesPath().isEmpty() + && this.type.getDataType() == InfoType.DataType.PM_DATA + && data.value.length() < 1000) { + try { + NewFileEvent ev = gson.fromJson(data.value, NewFileEvent.class); + Path path = Path.of(this.applicationConfig.getPmFilesPath(), ev.getFilename()); + String pmReportJson = Files.readString(path, Charset.defaultCharset()); + return new DataFromTopic(data.key, pmReportJson); + } catch (Exception e) { + return data; + } + } else { + return data; + } } - private ReceiverOptions kafkaInputProperties() { + private ReceiverOptions kafkaInputProperties(String clientId) { Map consumerProps = new HashMap<>(); if (this.applicationConfig.getKafkaBootStrapServers().isEmpty()) { logger.error("No kafka boostrap server is setup"); @@ -83,7 +125,7 @@ public class KafkaTopicListener implements TopicListener { consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); - consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, this.applicationConfig.getSelfUrl()); + consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId); return ReceiverOptions.create(consumerProps) .subscription(Collections.singleton(this.type.getKafkaInputTopic())); diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java index fcc94ee..ea7ab81 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java @@ -59,7 +59,8 @@ public class TopicListeners { for (InfoType type : types.getAll()) { if (type.isKafkaTopicDefined()) { - KafkaTopicListener topicConsumer = new KafkaTopicListener(appConfig, type); + KafkaTopicListener topicConsumer = new KafkaTopicListener(appConfig, type, + type.getId() + "_" + appConfig.getSelfUrl()); kafkaTopicListeners.put(type.getId(), topicConsumer); } if (type.isDmaapTopicDefined()) { diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java index 65a1366..488fc89 100644 --- a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java +++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java @@ -79,8 +79,8 @@ import reactor.kafka.sender.SenderRecord; @TestPropertySource(properties = { // "server.ssl.key-store=./config/keystore.jks", // "app.webclient.trust-store=./config/truststore.jks", // - "app.configuration-filepath=./src/test/resources/test_application_configuration.json"// -}) + "app.configuration-filepath=./src/test/resources/test_application_configuration.json", // + "app.pm-files-path=./src/test/resources/" }) // class IntegrationWithKafka { final String TYPE_ID = "KafkaInformationType"; @@ -168,8 +168,10 @@ class IntegrationWithKafka { // Create a listener to the output topic. The KafkaTopicListener happens to be // suitable for that, - InfoType type = new InfoType("id", null, false, OUTPUT_TOPIC, "dataType", false); - KafkaTopicListener topicListener = new KafkaTopicListener(applicationConfig, type); + InfoType type = InfoType.builder().id("id").kafkaInputTopic(OUTPUT_TOPIC).dataType("dataType").build(); + + KafkaTopicListener topicListener = new KafkaTopicListener(applicationConfig, type, + "TestClientId" + "_" + outputTopic); topicListener.getFlux() // .doOnNext(this::set) // @@ -306,7 +308,7 @@ class IntegrationWithKafka { Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producerx"); + // props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producerx"); props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); @@ -473,10 +475,14 @@ class IntegrationWithKafka { Instant startTime = Instant.now(); + KafkaTopicListener.NewFileEvent event = KafkaTopicListener.NewFileEvent.builder().filename("pm_report.json") + .build(); + String eventAsString = gson.toJson(event); + String path = "./src/test/resources/pm_report.json"; String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset()); - var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> senderRecord(pmReportJson, "", PM_TYPE_ID)); + var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> senderRecord(eventAsString, "key", PM_TYPE_ID)); sendDataToStream(dataToSend); while (kafkaReceiver.count != NO_OF_OBJECTS) { diff --git a/src/test/resources/test_application_configuration.json b/src/test/resources/test_application_configuration.json index f64ab12..2863590 100644 --- a/src/test/resources/test_application_configuration.json +++ b/src/test/resources/test_application_configuration.json @@ -20,10 +20,10 @@ }, { "id": "PmInformationTypeKafka", - "kafkaInputTopic": "KafkaPmInput", + "kafkaInputTopic": "PmFileData", "useHttpProxy": false, "dataType": "PmData", "isJson": true } ] -} +} \ No newline at end of file -- 2.16.6