From 0553c9422aa7b69145b7e6d23f39a9052f2b1da7 Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Fri, 23 Sep 2022 15:06:40 +0200 Subject: [PATCH] Added a feature to query stored PM data Change-Id: I6ac0f2e6a4a6db0cea8ffd36583c330a1dc0babb Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-773 --- config/application.yaml | 2 + .../configuration/ApplicationConfig.java | 8 + .../oran/dmaapadapter/filter/PmReportFilter.java | 5 + .../java/org/oran/dmaapadapter/repository/Job.java | 1 + .../org/oran/dmaapadapter/tasks/DataStore.java | 43 ++++ .../org/oran/dmaapadapter/tasks/FileStore.java | 137 +++++++++++ .../dmaapadapter/tasks/HttpJobDataDistributor.java | 5 +- .../dmaapadapter/tasks/JobDataDistributor.java | 123 +++++++++- .../tasks/KafkaJobDataDistributor.java | 4 +- .../dmaapadapter/tasks/KafkaTopicListener.java | 100 +------- .../org/oran/dmaapadapter/tasks/S3ObjectStore.java | 262 +++++++++++++++++++++ .../oran/dmaapadapter/tasks/TopicListeners.java | 2 +- src/main/resources/typeSchemaPmData.json | 7 +- .../oran/dmaapadapter/IntegrationWithKafka.java | 114 +++++---- 14 files changed, 661 insertions(+), 152 deletions(-) create mode 100644 src/main/java/org/oran/dmaapadapter/tasks/DataStore.java create mode 100644 src/main/java/org/oran/dmaapadapter/tasks/FileStore.java create mode 100644 src/main/java/org/oran/dmaapadapter/tasks/S3ObjectStore.java diff --git a/config/application.yaml b/config/application.yaml index 86e5987..74dc2be 100644 --- a/config/application.yaml +++ b/config/application.yaml @@ -85,3 +85,5 @@ app: endpointOverride: http://localhost:9000 accessKeyId: minio secretAccessKey: miniostorage + locksBucket: ropfilelocks + bucket: ropfiles diff --git a/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java b/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java index 6beee21..097fd32 100644 --- a/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java +++ b/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java @@ -116,6 +116,14 @@ public class ApplicationConfig { @Value("${app.s3.secretAccessKey:}") private String s3SecretAccessKey; + @Getter + @Value("${app.s3.locksBucket:}") + private String s3LocksBucket; + + @Getter + @Value("${app.s3.bucket:}") + private String s3Bucket; + private WebClientConfig webClientConfig = null; public WebClientConfig getWebClientConfig() { diff --git a/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java b/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java index 3740eef..e33ae68 100644 --- a/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java +++ b/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java @@ -28,6 +28,7 @@ import java.util.HashSet; import java.util.Map; import lombok.Getter; +import lombok.Setter; import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic; import org.slf4j.Logger; @@ -48,6 +49,7 @@ public class PmReportFilter implements Filter { .disableHtmlEscaping() // .create(); + @Getter private final FilterData filterData; @Getter @@ -57,6 +59,9 @@ public class PmReportFilter implements Filter { final Collection measTypes = new HashSet<>(); final Collection measuredEntityDns = new ArrayList<>(); final Collection measObjClass = new HashSet<>(); + + @Setter + String pmRopStartTime; } private static class MeasTypesIndexed extends PmReport.MeasTypes { diff --git a/src/main/java/org/oran/dmaapadapter/repository/Job.java b/src/main/java/org/oran/dmaapadapter/repository/Job.java index acb9136..9fd8e57 100644 --- a/src/main/java/org/oran/dmaapadapter/repository/Job.java +++ b/src/main/java/org/oran/dmaapadapter/repository/Job.java @@ -184,6 +184,7 @@ public class Job { @Getter private final String lastUpdated; + @Getter private final Filter filter; @Getter diff --git a/src/main/java/org/oran/dmaapadapter/tasks/DataStore.java b/src/main/java/org/oran/dmaapadapter/tasks/DataStore.java new file mode 100644 index 0000000..75291f0 --- /dev/null +++ b/src/main/java/org/oran/dmaapadapter/tasks/DataStore.java @@ -0,0 +1,43 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.tasks; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public interface DataStore { + public enum Bucket { + FILES, LOCKS + } + + public Flux listFiles(Bucket bucket, String prefix); + + public Mono readFile(Bucket bucket, String fileName); + + public Mono readFile(String bucket, String fileName); + + public Mono createLock(String name); + + public Mono deleteLock(String name); + + public Mono deleteObject(Bucket bucket, String name); + +} diff --git a/src/main/java/org/oran/dmaapadapter/tasks/FileStore.java b/src/main/java/org/oran/dmaapadapter/tasks/FileStore.java new file mode 100644 index 0000000..41bacd9 --- /dev/null +++ b/src/main/java/org/oran/dmaapadapter/tasks/FileStore.java @@ -0,0 +1,137 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.tasks; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Stream; + +import org.oran.dmaapadapter.configuration.ApplicationConfig; +import org.oran.dmaapadapter.exceptions.ServiceException; +import org.springframework.http.HttpStatus; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class FileStore implements DataStore { + + ApplicationConfig applicationConfig; + + public FileStore(ApplicationConfig applicationConfig) { + this.applicationConfig = applicationConfig; + } + + @Override + public Flux listFiles(Bucket bucket, String prefix) { + Path root = Path.of(applicationConfig.getPmFilesPath(), prefix); + if (!root.toFile().exists()) { + root = root.getParent(); + } + + List result = new ArrayList<>(); + try (Stream stream = Files.walk(root, Integer.MAX_VALUE)) { + + stream.forEach(path -> filterListFiles(path, prefix, result)); + + return Flux.fromIterable(result); + } catch (Exception e) { + return Flux.error(e); + } + } + + private void filterListFiles(Path path, String prefix, List result) { + if (path.toFile().isFile() && externalName(path).startsWith(prefix)) { + result.add(externalName(path)); + } + } + + private String externalName(Path f) { + String fullName = f.toString(); + return fullName.substring(applicationConfig.getPmFilesPath().length()); + } + + public Mono readFile(String bucket, String fileName) { + return Mono.error(new ServiceException("readFile from bucket Not implemented", HttpStatus.CONFLICT)); + } + + @Override + public Mono readFile(Bucket bucket, String fileName) { + try { + String contents = Files.readString(path(fileName)); + return Mono.just(contents); + } catch (Exception e) { + return Mono.error(e); + } + } + + @Override + public Mono createLock(String name) { + File file = path(name).toFile(); + try { + boolean res = file.createNewFile(); + return Mono.just(res || isAged(file)); + } catch (Exception e) { + return Mono.just(file.exists() && isAged(file)); + } + } + + public Mono copyFileTo(Path from, String to) { + try { + Path toPath = path(to); + Files.createDirectories(toPath); + Files.copy(from, path(to), StandardCopyOption.REPLACE_EXISTING); + return Mono.just(to); + } catch (Exception e) { + return Mono.error(e); + } + } + + private boolean isAged(File file) { + final long MAX_AGE_SECONDS = (long) 60 * 5; + return Instant.now().getEpochSecond() - file.lastModified() > MAX_AGE_SECONDS; + + } + + @Override + public Mono deleteLock(String name) { + return deleteObject(Bucket.LOCKS, name); + } + + @Override + public Mono deleteObject(Bucket bucket, String name) { + try { + Files.delete(path(name)); + return Mono.just(true); + } catch (Exception e) { + return Mono.just(false); + } + } + + private Path path(String name) { + return Path.of(applicationConfig.getPmFilesPath(), name); + } + +} diff --git a/src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java index 9ba5131..f57c12d 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java @@ -20,6 +20,7 @@ package org.oran.dmaapadapter.tasks; +import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.filter.Filter; import org.oran.dmaapadapter.repository.Job; import org.slf4j.Logger; @@ -35,8 +36,8 @@ import reactor.core.publisher.Mono; public class HttpJobDataDistributor extends JobDataDistributor { private static final Logger logger = LoggerFactory.getLogger(HttpJobDataDistributor.class); - public HttpJobDataDistributor(Job job) { - super(job); + public HttpJobDataDistributor(Job job, ApplicationConfig config) { + super(job, config); } @Override diff --git a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java index bda54a4..2e98d81 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java @@ -20,12 +20,26 @@ package org.oran.dmaapadapter.tasks; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.OffsetDateTime; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeFormatterBuilder; + import lombok.Getter; +import org.oran.dmaapadapter.configuration.ApplicationConfig; +import org.oran.dmaapadapter.exceptions.ServiceException; import org.oran.dmaapadapter.filter.Filter; +import org.oran.dmaapadapter.filter.PmReportFilter; +import org.oran.dmaapadapter.repository.InfoType; import org.oran.dmaapadapter.repository.Job; +import org.oran.dmaapadapter.tasks.KafkaTopicListener.NewFileEvent; +import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; import org.springframework.web.reactive.function.client.WebClientResponseException; import reactor.core.Disposable; @@ -43,6 +57,10 @@ public abstract class JobDataDistributor { private final Job job; private Disposable subscription; private final ErrorStats errorStats = new ErrorStats(); + private final ApplicationConfig applConfig; + + private final DataStore fileStore; + private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create(); private class ErrorStats { private int consumerFaultCounter = 0; @@ -70,12 +88,17 @@ public abstract class JobDataDistributor { } } - protected JobDataDistributor(Job job) { + protected JobDataDistributor(Job job, ApplicationConfig applConfig) { this.job = job; + this.applConfig = applConfig; + this.fileStore = applConfig.isS3Enabled() ? new S3ObjectStore(applConfig) : new FileStore(applConfig); } public synchronized void start(Flux input) { stop(); + + collectHistoricalData(); + this.errorStats.resetIrrecoverableErrors(); this.subscription = filterAndBuffer(input, this.job) // .flatMap(this::sendToClient, job.getParameters().getMaxConcurrency()) // @@ -85,6 +108,73 @@ public abstract class JobDataDistributor { () -> logger.warn("HttpDataConsumer stopped jobId: {}", job.getId())); } + static class LockedException extends ServiceException { + public LockedException(String file) { + super(file, HttpStatus.NOT_FOUND); + } + } + + private void collectHistoricalData() { + PmReportFilter filter = job.getFilter() instanceof PmReportFilter ? (PmReportFilter) job.getFilter() : null; + + if (filter != null) { + this.fileStore.createLock(collectHistoricalDataLockName()) // + .flatMap(isLockGranted -> isLockGranted ? Mono.just(isLockGranted) + : Mono.error(new LockedException(collectHistoricalDataLockName()))) // + .flatMapMany(b -> Flux.fromIterable(filter.getFilterData().getSourceNames())) + .flatMap(sourceName -> fileStore.listFiles(DataStore.Bucket.FILES, sourceName), 1) // + .filter(fileName -> filterStartTime(filter.getFilterData().getPmRopStartTime(), fileName)) // + .map(this::createFakeEvent) // + .flatMap(event -> filterAndBuffer(event, this.job), 1) // + .flatMap(this::sendToClient, 1) // + .onErrorResume(this::handleCollectHistoricalDataError) // + .collectList() // + .flatMap(list -> fileStore.deleteLock(collectHistoricalDataLockName())) // + .subscribe(); + } + } + + private Mono handleCollectHistoricalDataError(Throwable t) { + + if (t instanceof LockedException) { + logger.debug("Locked exception: {} job: {}", t.getMessage(), job.getId()); + return Mono.empty(); // Ignore + } else { + return fileStore.deleteLock(collectHistoricalDataLockName()) // + .map(bool -> "OK") // + .onErrorResume(t2 -> Mono.empty()); + } + } + + private String collectHistoricalDataLockName() { + return "collectHistoricalDataLock" + this.job.getId(); + } + + private Flux createFakeEvent(String fileName) { + + NewFileEvent ev = new NewFileEvent(fileName, this.applConfig.getS3Bucket()); + + return Flux.just(new TopicListener.DataFromTopic("", gson.toJson(ev))); + } + + private boolean filterStartTime(String startTimeStr, String fileName) { + // A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json + try { + String fileTimePart = fileName.substring(fileName.lastIndexOf("/") + 2); + fileTimePart = fileTimePart.substring(0, 18); + + DateTimeFormatter formatter = new DateTimeFormatterBuilder().appendPattern("yyyyMMdd.HHmmZ").toFormatter(); + + OffsetDateTime fileStartTime = OffsetDateTime.parse(fileTimePart, formatter); + OffsetDateTime startTime = OffsetDateTime.parse(startTimeStr); + + return startTime.isBefore(fileStartTime); + } catch (Exception e) { + logger.warn("Time parsing exception: {}", e.getMessage()); + return false; + } + } + private void handleExceptionInStream(Throwable t) { logger.warn("HttpDataConsumer exception: {}, jobId: {}", t.getMessage(), job.getId()); stop(); @@ -106,6 +196,7 @@ public abstract class JobDataDistributor { private Flux filterAndBuffer(Flux inputFlux, Job job) { Flux filtered = // inputFlux.doOnNext(data -> job.getStatistics().received(data.value)) // + .flatMap(this::getDataFromFileIfNewPmFileEvent, 100) // .map(job::filter) // .filter(f -> !f.isEmpty()) // .doOnNext(f -> job.getStatistics().filtered(f.value)); // @@ -120,6 +211,36 @@ public abstract class JobDataDistributor { return filtered; } + private Mono getDataFromFileIfNewPmFileEvent(DataFromTopic data) { + if (this.job.getType().getDataType() != InfoType.DataType.PM_DATA || data.value.length() > 1000) { + return Mono.just(data); + } + + try { + NewFileEvent ev = gson.fromJson(data.value, NewFileEvent.class); + if (ev.getObjectStoreBucket() != null) { + if (this.applConfig.isS3Enabled()) { + return fileStore.readFile(ev.getObjectStoreBucket(), ev.getFilename()) // + .map(str -> new DataFromTopic(data.key, str)); + } else { + logger.error("S3 is not configured in application.yaml, ignoring: {}", data); + return Mono.empty(); + } + } else { + if (applConfig.getPmFilesPath().isEmpty() || ev.getFilename() == null) { + logger.debug("Passing data {}", data); + return Mono.just(data); + } else { + Path path = Path.of(this.applConfig.getPmFilesPath(), ev.getFilename()); + String pmReportJson = Files.readString(path, Charset.defaultCharset()); + return Mono.just(new DataFromTopic(data.key, pmReportJson)); + } + } + } catch (Exception e) { + return Mono.just(data); + } + } + private String quoteNonJson(String str, Job job) { return job.getType().isJson() ? str : quote(str); } diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java index 8f77381..08c8167 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java @@ -50,7 +50,7 @@ public class KafkaJobDataDistributor extends JobDataDistributor { private final ApplicationConfig appConfig; public KafkaJobDataDistributor(Job job, ApplicationConfig appConfig) { - super(job); + super(job, appConfig); this.appConfig = appConfig; } @@ -58,7 +58,7 @@ public class KafkaJobDataDistributor extends JobDataDistributor { protected Mono sendToClient(Filter.FilteredData data) { Job job = this.getJob(); - logger.debug("Sending data '{}' to Kafka topic: {}", data, this.getJob().getParameters().getKafkaOutputTopic()); + logger.trace("Sending data '{}' to Kafka topic: {}", data, this.getJob().getParameters().getKafkaOutputTopic()); SenderRecord senderRecord = senderRecord(data, job); diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java index 8647b9b..6b5a0f7 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java @@ -20,17 +20,10 @@ package org.oran.dmaapadapter.tasks; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import java.net.URI; -import java.nio.charset.Charset; -import java.nio.file.Files; -import java.nio.file.Path; import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.CompletableFuture; import lombok.Builder; import lombok.Getter; @@ -44,18 +37,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; import reactor.kafka.receiver.KafkaReceiver; import reactor.kafka.receiver.ReceiverOptions; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; -import software.amazon.awssdk.core.ResponseBytes; -import software.amazon.awssdk.core.async.AsyncResponseTransformer; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.s3.S3AsyncClient; -import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; -import software.amazon.awssdk.services.s3.model.GetObjectRequest; -import software.amazon.awssdk.services.s3.model.GetObjectResponse; /** * The class streams incoming requests from a Kafka topic and sends them further @@ -68,13 +51,6 @@ public class KafkaTopicListener implements TopicListener { private final InfoType type; private Flux dataFromTopic; - @Getter - private static S3AsyncClient s3AsynchClient; - - private static Gson gson = new GsonBuilder() // - .disableHtmlEscaping() // - .create(); // - @ToString @Builder public static class NewFileEvent { @@ -85,16 +61,9 @@ public class KafkaTopicListener implements TopicListener { private String objectStoreBucket; } - public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type) { - this.applicationConfig = applicationConfig; + public KafkaTopicListener(ApplicationConfig applConfig, InfoType type) { + this.applicationConfig = applConfig; this.type = type; - if (applicationConfig.isS3Enabled()) { - synchronized (KafkaTopicListener.class) { - if (s3AsynchClient == null) { - s3AsynchClient = getS3AsyncClientBuilder().build(); - } - } - } } @Override @@ -111,79 +80,16 @@ public class KafkaTopicListener implements TopicListener { return KafkaReceiver.create(kafkaInputProperties(clientId)) // .receiveAutoAck() // .concatMap(consumerRecord -> consumerRecord) // - .doOnNext(input -> logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(), + .doOnNext(input -> logger.trace("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(), input.value())) // .doOnError(t -> logger.error("KafkaTopicReceiver error: {}", t.getMessage())) // .doFinally(sig -> logger.error("KafkaTopicReceiver stopped, reason: {}", sig)) // .filter(t -> !t.value().isEmpty() || !t.key().isEmpty()) // .map(input -> new DataFromTopic(input.key(), input.value())) // - .flatMap(this::getDataFromFileIfNewPmFileEvent, 100) // .publish() // .autoConnect(1); } - private S3AsyncClientBuilder getS3AsyncClientBuilder() { - URI uri = URI.create(this.applicationConfig.getS3EndpointOverride()); - return S3AsyncClient.builder() // - .region(Region.US_EAST_1) // - .endpointOverride(uri) // - .credentialsProvider(StaticCredentialsProvider.create( // - AwsBasicCredentials.create(this.applicationConfig.getS3AccessKeyId(), // - this.applicationConfig.getS3SecretAccessKey()))); - - } - - private Mono getDataFromS3Object(String bucket, String key) { - if (!this.applicationConfig.isS3Enabled()) { - logger.error("Missing S3 confinguration in application.yaml, ignoring bucket: {}, key: {}", bucket, key); - return Mono.empty(); - } - - GetObjectRequest request = GetObjectRequest.builder() // - .bucket(bucket) // - .key(key) // - .build(); - - CompletableFuture> future = s3AsynchClient.getObject(request, - AsyncResponseTransformer.toBytes()); - - return Mono.fromFuture(future) // - .map(b -> new String(b.asByteArray(), Charset.defaultCharset())) // - .doOnError(t -> logger.error("Failed to get file from S3 {}", t.getMessage())) // - .doOnEach(n -> logger.debug("Read file from S3: {} {}", bucket, key)) // - .onErrorResume(t -> Mono.empty()); - } - - private Mono getDataFromFileIfNewPmFileEvent(DataFromTopic data) { - if (this.type.getDataType() != InfoType.DataType.PM_DATA || data.value.length() > 1000) { - return Mono.just(data); - } - - try { - NewFileEvent ev = gson.fromJson(data.value, NewFileEvent.class); - if (ev.getObjectStoreBucket() != null) { - if (applicationConfig.isS3Enabled()) { - return getDataFromS3Object(ev.getObjectStoreBucket(), ev.getFilename()) // - .map(str -> new DataFromTopic(data.key, str)); - } else { - logger.error("S3 is not configured in application.yaml, ignoring: {}", data); - return Mono.empty(); - } - } else { - if (applicationConfig.getPmFilesPath().isEmpty() || ev.filename == null) { - logger.debug("Passing data {}", data); - return Mono.just(data); - } else { - Path path = Path.of(this.applicationConfig.getPmFilesPath(), ev.getFilename()); - String pmReportJson = Files.readString(path, Charset.defaultCharset()); - return Mono.just(new DataFromTopic(data.key, pmReportJson)); - } - } - } catch (Exception e) { - return Mono.just(data); - } - } - private ReceiverOptions kafkaInputProperties(String clientId) { Map consumerProps = new HashMap<>(); if (this.applicationConfig.getKafkaBootStrapServers().isEmpty()) { diff --git a/src/main/java/org/oran/dmaapadapter/tasks/S3ObjectStore.java b/src/main/java/org/oran/dmaapadapter/tasks/S3ObjectStore.java new file mode 100644 index 0000000..0a2cf25 --- /dev/null +++ b/src/main/java/org/oran/dmaapadapter/tasks/S3ObjectStore.java @@ -0,0 +1,262 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.tasks; + +import java.net.URI; +import java.nio.charset.Charset; +import java.nio.file.Path; +import java.time.Instant; +import java.util.concurrent.CompletableFuture; + +import org.oran.dmaapadapter.configuration.ApplicationConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.ResponseBytes; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; +import software.amazon.awssdk.services.s3.model.CreateBucketRequest; +import software.amazon.awssdk.services.s3.model.CreateBucketResponse; +import software.amazon.awssdk.services.s3.model.DeleteBucketRequest; +import software.amazon.awssdk.services.s3.model.DeleteBucketResponse; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectResponse; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.HeadObjectRequest; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; +import software.amazon.awssdk.services.s3.model.ListObjectsRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsResponse; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectResponse; +import software.amazon.awssdk.services.s3.model.S3Object; + +public class S3ObjectStore implements DataStore { + private static final Logger logger = LoggerFactory.getLogger(S3ObjectStore.class); + private final ApplicationConfig applicationConfig; + + private static S3AsyncClient s3AsynchClient; + + public S3ObjectStore(ApplicationConfig applicationConfig) { + this.applicationConfig = applicationConfig; + + getS3AsynchClient(applicationConfig); + } + + private static synchronized S3AsyncClient getS3AsynchClient(ApplicationConfig applicationConfig) { + if (applicationConfig.isS3Enabled() && s3AsynchClient == null) { + s3AsynchClient = getS3AsyncClientBuilder(applicationConfig).build(); + } + return s3AsynchClient; + } + + private static S3AsyncClientBuilder getS3AsyncClientBuilder(ApplicationConfig applicationConfig) { + URI uri = URI.create(applicationConfig.getS3EndpointOverride()); + return S3AsyncClient.builder() // + .region(Region.US_EAST_1) // + .endpointOverride(uri) // + .credentialsProvider(StaticCredentialsProvider.create( // + AwsBasicCredentials.create(applicationConfig.getS3AccessKeyId(), // + applicationConfig.getS3SecretAccessKey()))); + + } + + @Override + public Flux listFiles(Bucket bucket, String prefix) { + return listObjectsInBucket(bucket(bucket), prefix).map(S3Object::key); + } + + @Override + public Mono createLock(String name) { + return getHeadObject(bucket(Bucket.LOCKS), name).flatMap(head -> createLock(name, head)) // + .onErrorResume(t -> createLock(name, null)); + } + + private Mono createLock(String name, HeadObjectResponse head) { + final long MAX_AGE_SECONDS = (long) 60 * 5; + + if (head == null || head.lastModified().getEpochSecond() - Instant.now().getEpochSecond() > MAX_AGE_SECONDS) { + + return this.putObject(Bucket.LOCKS, name, "") // + .flatMap(resp -> Mono.just(true)) // + .doOnError(t -> logger.warn("Failed to create lock {}, reason: {}", name, t.getMessage())) // + .onErrorResume(t -> Mono.just(false)); + } else { + return Mono.just(false); + } + } + + @Override + public Mono deleteLock(String name) { + return deleteObject(Bucket.LOCKS, name); + } + + @Override + public Mono deleteObject(Bucket bucket, String name) { + + DeleteObjectRequest request = DeleteObjectRequest.builder() // + .bucket(bucket(bucket)) // + .key(name) // + .build(); + + CompletableFuture future = s3AsynchClient.deleteObject(request); + + return Mono.fromFuture(future).map(resp -> true); + } + + @Override + public Mono readFile(Bucket bucket, String fileName) { + return getDataFromS3Object(bucket(bucket), fileName); + } + + @Override + public Mono readFile(String bucket, String fileName) { + return getDataFromS3Object(bucket, fileName); + } + + public Mono putObject(Bucket bucket, String fileName, String bodyString) { + PutObjectRequest request = PutObjectRequest.builder() // + .bucket(bucket(bucket)) // + .key(fileName) // + .build(); + + AsyncRequestBody body = AsyncRequestBody.fromString(bodyString); + + CompletableFuture future = s3AsynchClient.putObject(request, body); + + return Mono.fromFuture(future) // + .map(putObjectResponse -> fileName) // + .doOnError(t -> logger.error("Failed to store file in S3 {}", t.getMessage())); + } + + public Mono copyFileToS3(Bucket bucket, Path fromFile, String toFile) { + return copyFileToS3Bucket(bucket(bucket), fromFile, toFile); + } + + public Mono createS3Bucket(Bucket bucket) { + return createS3Bucket(bucket(bucket)); + } + + private Mono createS3Bucket(String s3Bucket) { + + CreateBucketRequest request = CreateBucketRequest.builder() // + .bucket(s3Bucket) // + .build(); + + CompletableFuture future = s3AsynchClient.createBucket(request); + + return Mono.fromFuture(future) // + .map(f -> s3Bucket) // + .doOnError(t -> logger.debug("Could not create S3 bucket: {}", t.getMessage())) + .onErrorResume(t -> Mono.just(s3Bucket)); + } + + public Mono deleteBucket(Bucket bucket) { + return listFiles(bucket, "") // + .flatMap(key -> deleteObject(bucket, key)) // + .collectList() // + .flatMap(list -> deleteBucketFromS3Storage(bucket)) // + .map(resp -> "OK") + .doOnError(t -> logger.warn("Could not delete bucket: {}, reason: {}", bucket(bucket), t.getMessage())) + .onErrorResume(t -> Mono.just("NOK")); + } + + private Mono deleteBucketFromS3Storage(Bucket bucket) { + DeleteBucketRequest request = DeleteBucketRequest.builder() // + .bucket(bucket(bucket)) // + .build(); + + CompletableFuture future = s3AsynchClient.deleteBucket(request); + + return Mono.fromFuture(future); + } + + private String bucket(Bucket bucket) { + return bucket == Bucket.FILES ? applicationConfig.getS3Bucket() : applicationConfig.getS3LocksBucket(); + } + + private Mono copyFileToS3Bucket(String s3Bucket, Path fileName, String s3Key) { + + PutObjectRequest request = PutObjectRequest.builder() // + .bucket(s3Bucket) // + .key(s3Key) // + .build(); + + AsyncRequestBody body = AsyncRequestBody.fromFile(fileName); + + CompletableFuture future = s3AsynchClient.putObject(request, body); + + return Mono.fromFuture(future) // + .map(f -> s3Key) // + .doOnError(t -> logger.error("Failed to store file in S3 {}", t.getMessage())); + + } + + private Mono getHeadObject(String bucket, String key) { + HeadObjectRequest request = HeadObjectRequest.builder().bucket(bucket).key(key).build(); + + CompletableFuture future = s3AsynchClient.headObject(request); + return Mono.fromFuture(future); + } + + private Flux listObjectsInBucket(String bucket, String prefix) { + ListObjectsRequest listObjectsRequest = ListObjectsRequest.builder() // + .bucket(bucket) // + .maxKeys(1100) // + .prefix(prefix) // + .build(); + CompletableFuture future = s3AsynchClient.listObjects(listObjectsRequest); + + return Mono.fromFuture(future) // + .map(ListObjectsResponse::contents) // + .doOnNext(f -> logger.debug("Found objects in {}: {}", bucket, f.size())) // + .doOnError(t -> logger.warn("Error fromlist objects: {}", t.getMessage())) // + .flatMapMany(Flux::fromIterable) // + .doOnNext(obj -> logger.debug("Found object: {}", obj.key())) // + + ; + } + + private Mono getDataFromS3Object(String bucket, String key) { + + GetObjectRequest request = GetObjectRequest.builder() // + .bucket(bucket) // + .key(key) // + .build(); + + CompletableFuture> future = + s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes()); + + return Mono.fromFuture(future) // + .map(b -> new String(b.asByteArray(), Charset.defaultCharset())) // + .doOnError(t -> logger.error("Failed to get file from S3 {}", t.getMessage())) // + .doOnEach(n -> logger.debug("Read file from S3: {} {}", bucket, key)) // + .onErrorResume(t -> Mono.empty()); + } + +} diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java index fcc94ee..d5868e5 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java @@ -95,7 +95,7 @@ public class TopicListeners { private JobDataDistributor createConsumer(Job job) { return !Strings.isEmpty(job.getParameters().getKafkaOutputTopic()) ? new KafkaJobDataDistributor(job, appConfig) - : new HttpJobDataDistributor(job); + : new HttpJobDataDistributor(job, appConfig); } private void addConsumer(Job job, MultiMap distributors, diff --git a/src/main/resources/typeSchemaPmData.json b/src/main/resources/typeSchemaPmData.json index 5e774ce..7d5ab62 100644 --- a/src/main/resources/typeSchemaPmData.json +++ b/src/main/resources/typeSchemaPmData.json @@ -51,6 +51,9 @@ "type": "string" } ] + }, + "pmRopStartTime": { + "type": "string" } } } @@ -69,7 +72,7 @@ "type": "integer", "minimum": 1 }, - "kafkaOutputTopic" : { + "kafkaOutputTopic": { "type": "string" }, "bufferTimeout": { @@ -92,4 +95,4 @@ ] } } -} +} \ No newline at end of file diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java index ba39e88..9cf3dec 100644 --- a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java +++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java @@ -32,7 +32,6 @@ import java.time.Instant; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; @@ -54,7 +53,11 @@ import org.oran.dmaapadapter.repository.InfoType; import org.oran.dmaapadapter.repository.InfoTypes; import org.oran.dmaapadapter.repository.Job; import org.oran.dmaapadapter.repository.Jobs; +import org.oran.dmaapadapter.tasks.DataStore; +import org.oran.dmaapadapter.tasks.DataStore.Bucket; +import org.oran.dmaapadapter.tasks.FileStore; import org.oran.dmaapadapter.tasks.KafkaTopicListener; +import org.oran.dmaapadapter.tasks.S3ObjectStore; import org.oran.dmaapadapter.tasks.TopicListener; import org.oran.dmaapadapter.tasks.TopicListeners; import org.slf4j.Logger; @@ -70,15 +73,9 @@ import org.springframework.context.annotation.Bean; import org.springframework.test.context.TestPropertySource; import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; import reactor.kafka.sender.KafkaSender; import reactor.kafka.sender.SenderOptions; import reactor.kafka.sender.SenderRecord; -import software.amazon.awssdk.core.async.AsyncRequestBody; -import software.amazon.awssdk.services.s3.model.CreateBucketRequest; -import software.amazon.awssdk.services.s3.model.CreateBucketResponse; -import software.amazon.awssdk.services.s3.model.PutObjectRequest; -import software.amazon.awssdk.services.s3.model.PutObjectResponse; @SuppressWarnings("java:S3577") // Rename class @SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT) @@ -86,7 +83,7 @@ import software.amazon.awssdk.services.s3.model.PutObjectResponse; "server.ssl.key-store=./config/keystore.jks", // "app.webclient.trust-store=./config/truststore.jks", // "app.configuration-filepath=./src/test/resources/test_application_configuration.json", // - "app.pm-files-path=./src/test/resources/"}) // + "app.pm-files-path=./src/test/resources/", "app.s3.locksBucket=ropfilelocks", "app.s3.bucket=ropfiles"}) // class IntegrationWithKafka { final String TYPE_ID = "KafkaInformationType"; @@ -189,7 +186,7 @@ class IntegrationWithKafka { private void set(TopicListener.DataFromTopic receivedKafkaOutput) { this.receivedKafkaOutput = receivedKafkaOutput; this.count++; - logger.debug("*** received {}, {}", OUTPUT_TOPIC, receivedKafkaOutput); + logger.trace("*** received {}, {}", OUTPUT_TOPIC, receivedKafkaOutput); } synchronized String lastKey() { @@ -217,6 +214,10 @@ class IntegrationWithKafka { } kafkaReceiver.reset(); kafkaReceiver2.reset(); + + S3ObjectStore fileStore = new S3ObjectStore(applicationConfig); + fileStore.deleteBucket(DataStore.Bucket.FILES).block(); + fileStore.deleteBucket(DataStore.Bucket.LOCKS).block(); } @AfterEach @@ -289,6 +290,7 @@ class IntegrationWithKafka { ConsumerJobInfo consumerJobInfoKafka(String topic, PmReportFilter.FilterData filterData) { try { Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, null, 1, topic); + String str = gson.toJson(param); Object parametersObj = jsonObject(str); @@ -361,37 +363,6 @@ class IntegrationWithKafka { Thread.sleep(4000); } - private Mono copyFileToS3Bucket(Path fileName, String s3Bucket, String s3Key) { - - PutObjectRequest request = PutObjectRequest.builder() // - .bucket(s3Bucket) // - .key(s3Key) // - .build(); - - AsyncRequestBody body = AsyncRequestBody.fromFile(fileName); - - CompletableFuture future = KafkaTopicListener.getS3AsynchClient().putObject(request, body); - - return Mono.fromFuture(future) // - .map(f -> s3Key) // - .doOnError(t -> logger.error("Failed to store file in S3 {}", t.getMessage())) - .onErrorResume(t -> Mono.empty()); - } - - private Mono createS3Bucket(String s3Bucket) { - - CreateBucketRequest request = CreateBucketRequest.builder() // - .bucket(s3Bucket) // - .build(); - - CompletableFuture future = KafkaTopicListener.getS3AsynchClient().createBucket(request); - - return Mono.fromFuture(future) // - .map(f -> s3Bucket) // - .doOnError(t -> logger.debug("Could not create S3 bucket: {}", t.getMessage())) - .onErrorResume(t -> Mono.just(s3Bucket)); - } - @Test void simpleCase() throws Exception { final String JOB_ID = "ID"; @@ -494,7 +465,7 @@ class IntegrationWithKafka { } @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests. - @Test + // @Test void kafkaCharacteristics_pmFilter_localFile() throws Exception { // Filter PM reports and sent to two jobs over Kafka @@ -517,7 +488,7 @@ class IntegrationWithKafka { await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2)); waitForKafkaListener(); - final int NO_OF_OBJECTS = 100000; + final int NO_OF_OBJECTS = 100; Instant startTime = Instant.now(); @@ -540,6 +511,22 @@ class IntegrationWithKafka { printStatistics(); } + @Test + void testListFiles() { + FileStore fileStore = new FileStore(applicationConfig); + + fileStore.copyFileTo(Path.of("./src/test/resources/pm_report.json"), + "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json").block(); + + List files = fileStore.listFiles(Bucket.FILES, "O-DU-112").collectList().block(); + assertThat(files).hasSize(1); + + files = fileStore.listFiles(Bucket.FILES, "O-DU-1122").collectList().block(); + assertThat(files).hasSize(1); + + fileStore.deleteObject(Bucket.FILES, files.get(0)); + } + @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests. @Test void kafkaCharacteristics_pmFilter_s3() throws Exception { @@ -564,16 +551,19 @@ class IntegrationWithKafka { await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2)); waitForKafkaListener(); - final int NO_OF_OBJECTS = 100000; + final int NO_OF_OBJECTS = 100; Instant startTime = Instant.now(); KafkaTopicListener.NewFileEvent event = KafkaTopicListener.NewFileEvent.builder() // - .filename("pm_report.json").objectStoreBucket("ropfiles") // + .filename("pm_report.json").objectStoreBucket(applicationConfig.getS3Bucket()) // .build(); - createS3Bucket("ropfiles").block(); - copyFileToS3Bucket(Path.of("./src/test/resources/pm_report.json"), "ropfiles", "pm_report.json").block(); + S3ObjectStore fileStore = new S3ObjectStore(applicationConfig); + + fileStore.createS3Bucket(DataStore.Bucket.FILES).block(); + fileStore.copyFileToS3(DataStore.Bucket.FILES, Path.of("./src/test/resources/pm_report.json"), "pm_report.json") + .block(); String eventAsString = gson.toJson(event); @@ -590,6 +580,36 @@ class IntegrationWithKafka { logger.info("*** kafkaReceiver2 :" + kafkaReceiver.count); printStatistics(); + + } + + @Test + void testHistoricalData() throws Exception { + // test + final String JOB_ID = "testHistoricalData"; + + S3ObjectStore fileStore = new S3ObjectStore(applicationConfig); + + fileStore.createS3Bucket(DataStore.Bucket.FILES).block(); + fileStore.createS3Bucket(DataStore.Bucket.LOCKS).block(); + + fileStore.copyFileToS3(DataStore.Bucket.FILES, Path.of("./src/test/resources/pm_report.json"), + "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json").block(); + + fileStore.copyFileToS3(DataStore.Bucket.FILES, Path.of("./src/test/resources/pm_report.json"), + "OTHER_SOURCENAME/test.json").block(); + + await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); + + PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); + filterData.getSourceNames().add("O-DU-1122"); + filterData.setPmRopStartTime("1999-12-27T10:50:44.000-08:00"); + + this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver.OUTPUT_TOPIC, filterData), JOB_ID, + restClient()); + await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); + + await().untilAsserted(() -> assertThat(kafkaReceiver.count).isEqualTo(1)); } @Test @@ -608,7 +628,7 @@ class IntegrationWithKafka { await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2)); - var dataToSend = Flux.range(1, 100).map(i -> kafkaSenderRecord("Message_" + i, "", TYPE_ID)); // Message_1, + var dataToSend = Flux.range(1, 100).map(i -> kafkaSenderRecord("XMessage_" + i, "", TYPE_ID)); // Message_1, // Message_2 // etc. sendDataToKafka(dataToSend); // this should not overflow -- 2.16.6