Bugfix: reverting back to Spring Boot 2.5.8 (2.6.6 works badly with Kafka).
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: I20a78bdd0fa5368fa00933bea1293340bffcfbeb
logging:
# Configuration of logging
level:
- ROOT: ERROR
+ ROOT: WARN
+ org.apache.kafka: WARN
org.springframework: ERROR
org.springframework.data: ERROR
org.springframework.web.reactive.function.client.ExchangeFunctions: ERROR
org.oran.dmaapadapter: INFO
+
file:
name: /var/log/dmaap-adapter-service/application.log
server:
kafka:
bootstrap-servers: localhost:9092
# If the file name is empty, no authorization token is used
- auth-token-file:
\ No newline at end of file
+ auth-token-file:
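+ # Directory where PM report files referenced by new-file events are read from; if empty, file lookup is disabled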
+ pm-files-path: /tmp
\ No newline at end of file
"kafkaInputTopic": "TutorialTopic",
"useHttpProxy": false
},
- {
+ {
"id": "PmData",
"dmaapTopicUrl": "/events/PM_NOTIFICATION_OUTPUT/OpenDcae-c12/C12",
"useHttpProxy": true,
- "dataType" : "pmData"
- },
+ "dataType": "pmData"
+ }
]
-}
+}
\ No newline at end of file
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
- <version>2.6.6</version>
+ <version>2.5.8</version>
<relativePath />
</parent>
<groupId>org.o-ran-sc.nonrtric.plt</groupId>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
- <version>1.3.9</version>
+ <version>1.3.12</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<system>JIRA</system>
<url>https://jira.o-ran-sc.org/</url>
</issueManagement>
-</project>
+</project>
\ No newline at end of file
@Value("${app.kafka.bootstrap-servers:}")
private String kafkaBootStrapServers;
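+ // Directory where PM report files are read from; an empty value disables file lookup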
+ @Getter
+ @Value("${app.pm-files-path:}")
+ private String pmFilesPath;
+
private WebClientConfig webClientConfig = null;
public WebClientConfig getWebClientConfig() {
public static class FilteredData {
public final String key;
public final String value;
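+ // Shared immutable instance returned by empty(), avoiding a new allocation per call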
+ private static final FilteredData emptyData = new FilteredData("", "");
public boolean isEmpty() {
return value.isEmpty() && key.isEmpty();
}
public static FilteredData empty() {
- return new FilteredData("", "");
+ return emptyData;
}
}
public FilteredData filter(DataFromTopic data);
-
}
package org.oran.dmaapadapter.repository;
+import lombok.Builder;
import lombok.Getter;
import lombok.ToString;
import org.springframework.util.StringUtils;
@ToString
+@Builder
public class InfoType {
@Getter
- private final String id;
+ private String id;
@Getter
- private final String dmaapTopicUrl;
+ private String dmaapTopicUrl;
@Getter
- private final boolean useHttpProxy;
+ @Builder.Default
+ private boolean useHttpProxy = false;
@Getter
- private final String kafkaInputTopic;
+ private String kafkaInputTopic;
- private final String dataType;
+ private String dataType;
@Getter
+ @Builder.Default
private boolean isJson = false;
- public InfoType(String id, String dmaapTopicUrl, boolean useHttpProxy, String kafkaInputTopic, String dataType,
- boolean isJson) {
- this.id = id;
- this.dmaapTopicUrl = dmaapTopicUrl;
- this.useHttpProxy = useHttpProxy;
- this.kafkaInputTopic = kafkaInputTopic;
- this.dataType = dataType;
- this.isJson = isJson;
- }
-
public boolean isKafkaTopicDefined() {
return StringUtils.hasLength(kafkaInputTopic);
}
return DataType.PM_DATA;
}
return DataType.OTHER;
-
}
}
package org.oran.dmaapadapter.tasks;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+import lombok.Builder;
+import lombok.Getter;
+import lombok.ToString;
import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
+
/**
* This class streams incoming requests from a Kafka topic and forwards them
* to a multicast sink, which several other streams can connect to.
private final ApplicationConfig applicationConfig;
private final InfoType type;
private Flux<DataFromTopic> dataFromTopic;
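+ // Unique Kafka client.id used by this listener's consumer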
+ private final String kafkaClientId;
+
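+ // Gson instance used to parse NewFileEvent messages received from the topic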
+ private static final Gson gson = new GsonBuilder() //
+ .disableHtmlEscaping() //
+ .create(); //
+
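+ /** Event received on the Kafka topic when a new PM report file is available on disk. */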
+ @ToString
+ @Builder
+ public static class NewFileEvent {
+ @Getter
+ private String filename;
+ }
- public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type) {
+ public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type, String kafkaClientId) {
this.applicationConfig = applicationConfig;
this.type = type;
+ this.kafkaClientId = kafkaClientId;
}
@Override
public Flux<DataFromTopic> getFlux() {
if (this.dataFromTopic == null) {
- this.dataFromTopic = startReceiveFromTopic();
+ this.dataFromTopic = startReceiveFromTopic(this.kafkaClientId);
}
return this.dataFromTopic;
}
- private Flux<DataFromTopic> startReceiveFromTopic() {
+ private Flux<DataFromTopic> startReceiveFromTopic(String clientId) {
logger.debug("Listening to kafka topic: {} type :{}", this.type.getKafkaInputTopic(), type.getId());
- return KafkaReceiver.create(kafkaInputProperties()) //
+ return KafkaReceiver.create(kafkaInputProperties(clientId)) //
.receive() //
.doOnNext(input -> logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(),
input.value())) //
.doFinally(sig -> logger.error("KafkaTopicReceiver stopped, reason: {}", sig)) //
.filter(t -> !t.value().isEmpty() || !t.key().isEmpty()) //
.map(input -> new DataFromTopic(input.key(), input.value())) //
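+ // replace new-file events with the contents of the referenced PM report file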
+ .map(this::getDataFromFileIfNewPmFileEvent) //
.publish() //
- .autoConnect();
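+ // connect upstream once the first downstream subscriber has subscribed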
+ .autoConnect(1);
+ }
+
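+ /**
+ * Replaces a "new PM file" event with the contents of the referenced file. The
+ * payload is treated as a file event only when a PM files path is configured,
+ * the type carries PM data and the payload is small; on any failure the
+ * original data is returned unchanged.
+ */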
+ private DataFromTopic getDataFromFileIfNewPmFileEvent(DataFromTopic data) {
+
+ if (!applicationConfig.getPmFilesPath().isEmpty()
+ && this.type.getDataType() == InfoType.DataType.PM_DATA
+ && data.value.length() < 1000) {
+ try {
+ NewFileEvent ev = gson.fromJson(data.value, NewFileEvent.class);
+ Path path = Path.of(this.applicationConfig.getPmFilesPath(), ev.getFilename());
+ String pmReportJson = Files.readString(path, Charset.defaultCharset());
+ return new DataFromTopic(data.key, pmReportJson);
+ } catch (Exception e) {
+ return data;
+ }
+ } else {
+ return data;
+ }
}
- private ReceiverOptions<String, String> kafkaInputProperties() {
+ private ReceiverOptions<String, String> kafkaInputProperties(String clientId) {
Map<String, Object> consumerProps = new HashMap<>();
if (this.applicationConfig.getKafkaBootStrapServers().isEmpty()) {
logger.error("No kafka boostrap server is setup");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
- consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, this.applicationConfig.getSelfUrl());
+ consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
return ReceiverOptions.<String, String>create(consumerProps)
.subscription(Collections.singleton(this.type.getKafkaInputTopic()));
for (InfoType type : types.getAll()) {
if (type.isKafkaTopicDefined()) {
- KafkaTopicListener topicConsumer = new KafkaTopicListener(appConfig, type);
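+ // give each listener a unique Kafka client ID: <type id>_<own service URL>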
+ KafkaTopicListener topicConsumer = new KafkaTopicListener(appConfig, type,
+ type.getId() + "_" + appConfig.getSelfUrl());
kafkaTopicListeners.put(type.getId(), topicConsumer);
}
if (type.isDmaapTopicDefined()) {
@TestPropertySource(properties = { //
"server.ssl.key-store=./config/keystore.jks", //
"app.webclient.trust-store=./config/truststore.jks", //
- "app.configuration-filepath=./src/test/resources/test_application_configuration.json"//
-})
+ "app.configuration-filepath=./src/test/resources/test_application_configuration.json", //
+ "app.pm-files-path=./src/test/resources/" }) //
class IntegrationWithKafka {
final String TYPE_ID = "KafkaInformationType";
// Create a listener for the output topic. The KafkaTopicListener happens to be
// suitable for that.
- InfoType type = new InfoType("id", null, false, OUTPUT_TOPIC, "dataType", false);
- KafkaTopicListener topicListener = new KafkaTopicListener(applicationConfig, type);
+ InfoType type = InfoType.builder().id("id").kafkaInputTopic(OUTPUT_TOPIC).dataType("dataType").build();
+
+ KafkaTopicListener topicListener = new KafkaTopicListener(applicationConfig, type,
+ "TestClientId" + "_" + outputTopic);
topicListener.getFlux() //
.doOnNext(this::set) //
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producerx");
+ // props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producerx");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
Instant startTime = Instant.now();
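+ // send a new-file event; the listener is expected to load the referenced PM report file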
+ KafkaTopicListener.NewFileEvent event = KafkaTopicListener.NewFileEvent.builder().filename("pm_report.json")
+ .build();
+ String eventAsString = gson.toJson(event);
+
String path = "./src/test/resources/pm_report.json";
String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset());
- var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> senderRecord(pmReportJson, "", PM_TYPE_ID));
+ var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> senderRecord(eventAsString, "key", PM_TYPE_ID));
sendDataToStream(dataToSend);
while (kafkaReceiver.count != NO_OF_OBJECTS) {
},
{
"id": "PmInformationTypeKafka",
- "kafkaInputTopic": "KafkaPmInput",
+ "kafkaInputTopic": "PmFileData",
"useHttpProxy": false,
"dataType": "PmData",
"isJson": true
}
]
-}
+}
\ No newline at end of file