From 5fd5da279e7f0b4ddce30ce93cd79e36b48f4e4a Mon Sep 17 00:00:00 2001
From: PatrikBuhr
Date: Tue, 4 Oct 2022 15:18:15 +0200
Subject: [PATCH] Unzip files with the .gz extension

Removed the bucket from the NewFileEvent since the S3 bucket needs to be
configured anyway to query already collected PM data.
Fixed some bugs.

Signed-off-by: PatrikBuhr
Issue-ID: NONRTRIC-773
Change-Id: Idaac044bd3578cbf97f700b70b73f9ed8ac568b1
---
 .../org/oran/dmaapadapter/datastore/DataStore.java |   4 +-
 .../org/oran/dmaapadapter/datastore/FileStore.java |  10 +-
 .../oran/dmaapadapter/datastore/S3ObjectStore.java |  15 +-
 .../org/oran/dmaapadapter/tasks/FileStore.java     | 137 -----------
 .../dmaapadapter/tasks/JobDataDistributor.java     |  36 +--
 .../dmaapadapter/tasks/KafkaTopicListener.java     |  14 --
 .../tasks/{DataStore.java => NewFileEvent.java}    |  24 +-
 .../org/oran/dmaapadapter/tasks/S3ObjectStore.java | 261 ---------------------
 .../org/oran/dmaapadapter/tasks/TopicListener.java |   1 +
 .../org/oran/dmaapadapter/ApplicationTest.java     |  14 +-
 .../oran/dmaapadapter/IntegrationWithKafka.java    |  21 +-
 11 files changed, 54 insertions(+), 483 deletions(-)
 delete mode 100644 src/main/java/org/oran/dmaapadapter/tasks/FileStore.java
 rename src/main/java/org/oran/dmaapadapter/tasks/{DataStore.java => NewFileEvent.java} (61%)
 delete mode 100644 src/main/java/org/oran/dmaapadapter/tasks/S3ObjectStore.java

diff --git a/src/main/java/org/oran/dmaapadapter/datastore/DataStore.java b/src/main/java/org/oran/dmaapadapter/datastore/DataStore.java
index 167ff63..255b77a 100644
--- a/src/main/java/org/oran/dmaapadapter/datastore/DataStore.java
+++ b/src/main/java/org/oran/dmaapadapter/datastore/DataStore.java
@@ -30,9 +30,7 @@ public interface DataStore {
 
     public Flux<String> listFiles(Bucket bucket, String prefix);
 
-    public Mono<String> readFile(Bucket bucket, String fileName);
-
-    public Mono<String> readFile(String bucket, String fileName);
+    public Mono<byte[]> readFile(Bucket bucket, String fileName);
 
     public Mono<Boolean> createLock(String name);
 
diff --git a/src/main/java/org/oran/dmaapadapter/datastore/FileStore.java b/src/main/java/org/oran/dmaapadapter/datastore/FileStore.java
index e78653c..3d4cb4d 100644
--- a/src/main/java/org/oran/dmaapadapter/datastore/FileStore.java
+++ b/src/main/java/org/oran/dmaapadapter/datastore/FileStore.java
@@ -29,8 +29,6 @@ import java.util.List;
 import java.util.stream.Stream;
 
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
-import org.oran.dmaapadapter.exceptions.ServiceException;
-import org.springframework.http.HttpStatus;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -72,14 +70,10 @@ public class FileStore implements DataStore {
         return fullName.substring(applicationConfig.getPmFilesPath().length());
     }
 
-    public Mono<String> readFile(String bucket, String fileName) {
-        return Mono.error(new ServiceException("readFile from bucket Not implemented", HttpStatus.CONFLICT));
-    }
-
     @Override
-    public Mono<String> readFile(Bucket bucket, String fileName) {
+    public Mono<byte[]> readFile(Bucket bucket, String fileName) {
         try {
-            String contents = Files.readString(path(fileName));
+            byte[] contents = Files.readAllBytes(path(fileName));
             return Mono.just(contents);
         } catch (Exception e) {
             return Mono.error(e);
diff --git a/src/main/java/org/oran/dmaapadapter/datastore/S3ObjectStore.java b/src/main/java/org/oran/dmaapadapter/datastore/S3ObjectStore.java
index c3b2f9d..de9da16 100644
--- a/src/main/java/org/oran/dmaapadapter/datastore/S3ObjectStore.java
+++ b/src/main/java/org/oran/dmaapadapter/datastore/S3ObjectStore.java
@@ -21,7
+21,6 @@ package org.oran.dmaapadapter.datastore; import java.net.URI; -import java.nio.charset.Charset; import java.nio.file.Path; import java.util.concurrent.CompletableFuture; @@ -127,15 +126,10 @@ public class S3ObjectStore implements DataStore { } @Override - public Mono readFile(Bucket bucket, String fileName) { + public Mono readFile(Bucket bucket, String fileName) { return getDataFromS3Object(bucket(bucket), fileName); } - @Override - public Mono readFile(String bucket, String fileName) { - return getDataFromS3Object(bucket, fileName); - } - public Mono putObject(Bucket bucket, String fileName, String bodyString) { PutObjectRequest request = PutObjectRequest.builder() // .bucket(bucket(bucket)) // @@ -239,7 +233,7 @@ public class S3ObjectStore implements DataStore { ; } - private Mono getDataFromS3Object(String bucket, String key) { + private Mono getDataFromS3Object(String bucket, String key) { GetObjectRequest request = GetObjectRequest.builder() // .bucket(bucket) // @@ -250,8 +244,9 @@ public class S3ObjectStore implements DataStore { s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes()); return Mono.fromFuture(future) // - .map(b -> new String(b.asByteArray(), Charset.defaultCharset())) // - .doOnError(t -> logger.error("Failed to get file from S3 {}", t.getMessage())) // + .map(b -> b.asByteArray()) // + .doOnError(t -> logger.error("Failed to get file from S3, key:{}, bucket: {}, {}", key, bucket, + t.getMessage())) // .doOnEach(n -> logger.debug("Read file from S3: {} {}", bucket, key)) // .onErrorResume(t -> Mono.empty()); } diff --git a/src/main/java/org/oran/dmaapadapter/tasks/FileStore.java b/src/main/java/org/oran/dmaapadapter/tasks/FileStore.java deleted file mode 100644 index f11cf11..0000000 --- a/src/main/java/org/oran/dmaapadapter/tasks/FileStore.java +++ /dev/null @@ -1,137 +0,0 @@ -/*- - * ========================LICENSE_START================================= - * O-RAN-SC - * %% - * Copyright (C) 2021 Nordix Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * ========================LICENSE_END=================================== - */ - -package org.oran.dmaapadapter.tasks; - -import java.io.File; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.StandardCopyOption; -import java.time.Instant; -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Stream; - -import org.oran.dmaapadapter.configuration.ApplicationConfig; -import org.oran.dmaapadapter.exceptions.ServiceException; -import org.springframework.http.HttpStatus; - -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -public class FileStore implements DataStore { - - ApplicationConfig applicationConfig; - - public FileStore(ApplicationConfig applicationConfig) { - this.applicationConfig = applicationConfig; - } - - @Override - public Flux listFiles(Bucket bucket, String prefix) { - Path root = Path.of(applicationConfig.getPmFilesPath(), prefix); - if (!root.toFile().exists()) { - root = root.getParent(); - } - - List result = new ArrayList<>(); - try (Stream stream = Files.walk(root, Integer.MAX_VALUE)) { - - stream.forEach(path -> filterListFiles(path, prefix, result)); - - return Flux.fromIterable(result); - } catch (Exception e) { - return Flux.error(e); - } - } - - private void filterListFiles(Path path, String prefix, List result) { - if (path.toFile().isFile() && externalName(path).startsWith(prefix)) { - result.add(externalName(path)); - } - } - - private String externalName(Path f) { - String fullName = f.toString(); - return fullName.substring(applicationConfig.getPmFilesPath().length()); - } - - public Mono readFile(String bucket, String fileName) { - return Mono.error(new ServiceException("readFile from bucket Not implemented", HttpStatus.CONFLICT)); - } - - @Override - public Mono readFile(Bucket bucket, String fileName) { - try { - byte[] contents = Files.readAllBytes(path(fileName)); - return Mono.just(contents); - } catch (Exception e) { - return Mono.error(e); - } - } - - @Override - public Mono createLock(String name) { - File file = path(name).toFile(); - try { - boolean res = file.createNewFile(); - return Mono.just(res || isAged(file)); - } catch (Exception e) { - return Mono.just(file.exists() && isAged(file)); - } - } - - public Mono copyFileTo(Path from, String to) { - try { - Path toPath = path(to); - Files.createDirectories(toPath); - Files.copy(from, path(to), StandardCopyOption.REPLACE_EXISTING); - return Mono.just(to); - } catch (Exception e) { - return Mono.error(e); - } - } - - private boolean isAged(File file) { - final long MAX_AGE_SECONDS = (long) 60 * 5; - return Instant.now().getEpochSecond() - file.lastModified() > MAX_AGE_SECONDS; - - } - - @Override - public Mono deleteLock(String name) { - return deleteObject(Bucket.LOCKS, name); - } - - @Override - public Mono deleteObject(Bucket bucket, String name) { - try { - Files.delete(path(name)); - return Mono.just(true); - } catch (Exception e) { - return Mono.just(false); - } - } - - private Path path(String name) { - return Path.of(applicationConfig.getPmFilesPath(), name); - } - -} diff --git a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java index bd62d4f..6d2327d 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java @@ -22,9 +22,6 @@ package org.oran.dmaapadapter.tasks; import java.io.ByteArrayInputStream; import java.io.IOException; -import 
java.nio.charset.Charset; -import java.nio.file.Files; -import java.nio.file.Path; import java.time.OffsetDateTime; import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatterBuilder; @@ -33,12 +30,14 @@ import java.util.zip.GZIPInputStream; import lombok.Getter; import org.oran.dmaapadapter.configuration.ApplicationConfig; +import org.oran.dmaapadapter.datastore.DataStore; +import org.oran.dmaapadapter.datastore.FileStore; +import org.oran.dmaapadapter.datastore.S3ObjectStore; import org.oran.dmaapadapter.exceptions.ServiceException; import org.oran.dmaapadapter.filter.Filter; import org.oran.dmaapadapter.filter.PmReportFilter; import org.oran.dmaapadapter.repository.InfoType; import org.oran.dmaapadapter.repository.Job; -import org.oran.dmaapadapter.tasks.KafkaTopicListener.NewFileEvent; import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -152,7 +151,7 @@ public abstract class JobDataDistributor { private Flux createFakeEvent(String fileName) { - NewFileEvent ev = new NewFileEvent(fileName, this.applConfig.getS3Bucket()); + NewFileEvent ev = new NewFileEvent(fileName); return Flux.just(new TopicListener.DataFromTopic("", gson.toJson(ev))); } @@ -225,25 +224,16 @@ public abstract class JobDataDistributor { try { NewFileEvent ev = gson.fromJson(data.value, NewFileEvent.class); - if (ev.getObjectStoreBucket() != null) { - if (this.applConfig.isS3Enabled()) { - return fileStore.readFile(ev.getObjectStoreBucket(), ev.getFilename()) // - .map(str -> unzip(str, ev.getFilename())) // - .map(str -> new DataFromTopic(data.key, str)); - } else { - logger.error("S3 is not configured in application.yaml, ignoring: {}", data); - return Mono.empty(); - } - } else { - if (applConfig.getPmFilesPath().isEmpty() || ev.getFilename() == null) { - logger.debug("Passing data {}", data); - return Mono.just(data); - } else { - Path path = Path.of(this.applConfig.getPmFilesPath(), ev.getFilename()); - String pmReportJson = Files.readString(path, Charset.defaultCharset()); - return Mono.just(new DataFromTopic(data.key, pmReportJson)); - } + + if (ev.getFilename() == null) { + logger.warn("Ignoring received message: {}", data); + return Mono.empty(); } + + return fileStore.readFile(DataStore.Bucket.FILES, ev.getFilename()) // + .map(bytes -> unzip(bytes, ev.getFilename())) // + .map(bytes -> new DataFromTopic(data.key, bytes)); + } catch (Exception e) { return Mono.just(data); } diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java index eaeeae4..f0f2513 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java @@ -24,10 +24,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; -import lombok.Builder; -import lombok.Getter; -import lombok.ToString; - import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.oran.dmaapadapter.configuration.ApplicationConfig; @@ -50,16 +46,6 @@ public class KafkaTopicListener implements TopicListener { private final InfoType type; private Flux dataFromTopic; - @ToString - @Builder - public static class NewFileEvent { - @Getter - private String filename; - - @Getter - private String objectStoreBucket; - } - public KafkaTopicListener(ApplicationConfig applConfig, InfoType type) { this.applicationConfig = 
applConfig; this.type = type; diff --git a/src/main/java/org/oran/dmaapadapter/tasks/DataStore.java b/src/main/java/org/oran/dmaapadapter/tasks/NewFileEvent.java similarity index 61% rename from src/main/java/org/oran/dmaapadapter/tasks/DataStore.java rename to src/main/java/org/oran/dmaapadapter/tasks/NewFileEvent.java index 2bb2753..0ccca0c 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/DataStore.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/NewFileEvent.java @@ -20,24 +20,16 @@ package org.oran.dmaapadapter.tasks; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -public interface DataStore { - public enum Bucket { - FILES, LOCKS - } +import lombok.Builder; +import lombok.Getter; +import lombok.ToString; - public Flux listFiles(Bucket bucket, String prefix); - public Mono readFile(Bucket bucket, String fileName); - - public Mono readFile(String bucket, String fileName); - - public Mono createLock(String name); - - public Mono deleteLock(String name); - - public Mono deleteObject(Bucket bucket, String name); +@ToString +@Builder +public class NewFileEvent { + @Getter + private String filename; } diff --git a/src/main/java/org/oran/dmaapadapter/tasks/S3ObjectStore.java b/src/main/java/org/oran/dmaapadapter/tasks/S3ObjectStore.java deleted file mode 100644 index 74ba49d..0000000 --- a/src/main/java/org/oran/dmaapadapter/tasks/S3ObjectStore.java +++ /dev/null @@ -1,261 +0,0 @@ -/*- - * ========================LICENSE_START================================= - * O-RAN-SC - * %% - * Copyright (C) 2021 Nordix Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * ========================LICENSE_END=================================== - */ - -package org.oran.dmaapadapter.tasks; - -import java.net.URI; -import java.nio.file.Path; -import java.time.Instant; -import java.util.concurrent.CompletableFuture; - -import org.oran.dmaapadapter.configuration.ApplicationConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; -import software.amazon.awssdk.core.ResponseBytes; -import software.amazon.awssdk.core.async.AsyncRequestBody; -import software.amazon.awssdk.core.async.AsyncResponseTransformer; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.s3.S3AsyncClient; -import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; -import software.amazon.awssdk.services.s3.model.CreateBucketRequest; -import software.amazon.awssdk.services.s3.model.CreateBucketResponse; -import software.amazon.awssdk.services.s3.model.DeleteBucketRequest; -import software.amazon.awssdk.services.s3.model.DeleteBucketResponse; -import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; -import software.amazon.awssdk.services.s3.model.DeleteObjectResponse; -import software.amazon.awssdk.services.s3.model.GetObjectRequest; -import software.amazon.awssdk.services.s3.model.GetObjectResponse; -import software.amazon.awssdk.services.s3.model.HeadObjectRequest; -import software.amazon.awssdk.services.s3.model.HeadObjectResponse; -import software.amazon.awssdk.services.s3.model.ListObjectsRequest; -import software.amazon.awssdk.services.s3.model.ListObjectsResponse; -import software.amazon.awssdk.services.s3.model.PutObjectRequest; -import software.amazon.awssdk.services.s3.model.PutObjectResponse; -import software.amazon.awssdk.services.s3.model.S3Object; - -public class S3ObjectStore implements DataStore { - private static final Logger logger = LoggerFactory.getLogger(S3ObjectStore.class); - private final ApplicationConfig applicationConfig; - - private static S3AsyncClient s3AsynchClient; - - public S3ObjectStore(ApplicationConfig applicationConfig) { - this.applicationConfig = applicationConfig; - - getS3AsynchClient(applicationConfig); - } - - private static synchronized S3AsyncClient getS3AsynchClient(ApplicationConfig applicationConfig) { - if (applicationConfig.isS3Enabled() && s3AsynchClient == null) { - s3AsynchClient = getS3AsyncClientBuilder(applicationConfig).build(); - } - return s3AsynchClient; - } - - private static S3AsyncClientBuilder getS3AsyncClientBuilder(ApplicationConfig applicationConfig) { - URI uri = URI.create(applicationConfig.getS3EndpointOverride()); - return S3AsyncClient.builder() // - .region(Region.US_EAST_1) // - .endpointOverride(uri) // - .credentialsProvider(StaticCredentialsProvider.create( // - AwsBasicCredentials.create(applicationConfig.getS3AccessKeyId(), // - applicationConfig.getS3SecretAccessKey()))); - - } - - @Override - public Flux listFiles(Bucket bucket, String prefix) { - return listObjectsInBucket(bucket(bucket), prefix).map(S3Object::key); - } - - @Override - public Mono createLock(String name) { - return getHeadObject(bucket(Bucket.LOCKS), name).flatMap(head -> createLock(name, head)) // - .onErrorResume(t -> createLock(name, null)); - } - - private Mono createLock(String name, HeadObjectResponse head) { - final long MAX_AGE_SECONDS = (long) 60 * 5; - - if 
(head == null || head.lastModified().getEpochSecond() - Instant.now().getEpochSecond() > MAX_AGE_SECONDS) { - - return this.putObject(Bucket.LOCKS, name, "") // - .flatMap(resp -> Mono.just(true)) // - .doOnError(t -> logger.warn("Failed to create lock {}, reason: {}", name, t.getMessage())) // - .onErrorResume(t -> Mono.just(false)); - } else { - return Mono.just(false); - } - } - - @Override - public Mono deleteLock(String name) { - return deleteObject(Bucket.LOCKS, name); - } - - @Override - public Mono deleteObject(Bucket bucket, String name) { - - DeleteObjectRequest request = DeleteObjectRequest.builder() // - .bucket(bucket(bucket)) // - .key(name) // - .build(); - - CompletableFuture future = s3AsynchClient.deleteObject(request); - - return Mono.fromFuture(future).map(resp -> true); - } - - @Override - public Mono readFile(Bucket bucket, String fileName) { - return getDataFromS3Object(bucket(bucket), fileName); - } - - @Override - public Mono readFile(String bucket, String fileName) { - return getDataFromS3Object(bucket, fileName); - } - - public Mono putObject(Bucket bucket, String fileName, String bodyString) { - PutObjectRequest request = PutObjectRequest.builder() // - .bucket(bucket(bucket)) // - .key(fileName) // - .build(); - - AsyncRequestBody body = AsyncRequestBody.fromString(bodyString); - - CompletableFuture future = s3AsynchClient.putObject(request, body); - - return Mono.fromFuture(future) // - .map(putObjectResponse -> fileName) // - .doOnError(t -> logger.error("Failed to store file in S3 {}", t.getMessage())); - } - - public Mono copyFileToS3(Bucket bucket, Path fromFile, String toFile) { - return copyFileToS3Bucket(bucket(bucket), fromFile, toFile); - } - - public Mono createS3Bucket(Bucket bucket) { - return createS3Bucket(bucket(bucket)); - } - - private Mono createS3Bucket(String s3Bucket) { - - CreateBucketRequest request = CreateBucketRequest.builder() // - .bucket(s3Bucket) // - .build(); - - CompletableFuture future = s3AsynchClient.createBucket(request); - - return Mono.fromFuture(future) // - .map(f -> s3Bucket) // - .doOnError(t -> logger.debug("Could not create S3 bucket: {}", t.getMessage())) - .onErrorResume(t -> Mono.just(s3Bucket)); - } - - public Mono deleteBucket(Bucket bucket) { - return listFiles(bucket, "") // - .flatMap(key -> deleteObject(bucket, key)) // - .collectList() // - .flatMap(list -> deleteBucketFromS3Storage(bucket)) // - .map(resp -> "OK") - .doOnError(t -> logger.warn("Could not delete bucket: {}, reason: {}", bucket(bucket), t.getMessage())) - .onErrorResume(t -> Mono.just("NOK")); - } - - private Mono deleteBucketFromS3Storage(Bucket bucket) { - DeleteBucketRequest request = DeleteBucketRequest.builder() // - .bucket(bucket(bucket)) // - .build(); - - CompletableFuture future = s3AsynchClient.deleteBucket(request); - - return Mono.fromFuture(future); - } - - private String bucket(Bucket bucket) { - return bucket == Bucket.FILES ? 
applicationConfig.getS3Bucket() : applicationConfig.getS3LocksBucket(); - } - - private Mono copyFileToS3Bucket(String s3Bucket, Path fileName, String s3Key) { - - PutObjectRequest request = PutObjectRequest.builder() // - .bucket(s3Bucket) // - .key(s3Key) // - .build(); - - AsyncRequestBody body = AsyncRequestBody.fromFile(fileName); - - CompletableFuture future = s3AsynchClient.putObject(request, body); - - return Mono.fromFuture(future) // - .map(f -> s3Key) // - .doOnError(t -> logger.error("Failed to store file in S3 {}", t.getMessage())); - - } - - private Mono getHeadObject(String bucket, String key) { - HeadObjectRequest request = HeadObjectRequest.builder().bucket(bucket).key(key).build(); - - CompletableFuture future = s3AsynchClient.headObject(request); - return Mono.fromFuture(future); - } - - private Flux listObjectsInBucket(String bucket, String prefix) { - ListObjectsRequest listObjectsRequest = ListObjectsRequest.builder() // - .bucket(bucket) // - .maxKeys(1100) // - .prefix(prefix) // - .build(); - CompletableFuture future = s3AsynchClient.listObjects(listObjectsRequest); - - return Mono.fromFuture(future) // - .map(ListObjectsResponse::contents) // - .doOnNext(f -> logger.debug("Found objects in {}: {}", bucket, f.size())) // - .doOnError(t -> logger.warn("Error fromlist objects: {}", t.getMessage())) // - .flatMapMany(Flux::fromIterable) // - .doOnNext(obj -> logger.debug("Found object: {}", obj.key())) // - - ; - } - - private Mono getDataFromS3Object(String bucket, String key) { - - GetObjectRequest request = GetObjectRequest.builder() // - .bucket(bucket) // - .key(key) // - .build(); - - CompletableFuture> future = - s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes()); - - return Mono.fromFuture(future) // - .map(b -> b.asByteArray()) // - .doOnError(t -> logger.error("Failed to get file from S3 {}", t.getMessage())) // - .doOnEach(n -> logger.debug("Read file from S3: {} {}", bucket, key)) // - .onErrorResume(t -> Mono.empty()); - } - -} diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java index 373020c..3f06457 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java @@ -38,6 +38,7 @@ public interface TopicListener { @Getter @Setter + @ToString.Exclude private PmReport cachedPmReport; public DataFromTopic(String key, String value) { diff --git a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java index 84b5685..fe305c5 100644 --- a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java +++ b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java @@ -53,6 +53,7 @@ import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.configuration.WebClientConfig; import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig; import org.oran.dmaapadapter.controllers.ProducerCallbacksController; +import org.oran.dmaapadapter.datastore.FileStore; import org.oran.dmaapadapter.exceptions.ServiceException; import org.oran.dmaapadapter.filter.PmReport; import org.oran.dmaapadapter.filter.PmReportFilter; @@ -62,6 +63,7 @@ import org.oran.dmaapadapter.repository.InfoTypes; import org.oran.dmaapadapter.repository.Job; import org.oran.dmaapadapter.repository.Jobs; import org.oran.dmaapadapter.tasks.JobDataDistributor; +import org.oran.dmaapadapter.tasks.NewFileEvent; import 
org.oran.dmaapadapter.tasks.ProducerRegstrationTask; import org.oran.dmaapadapter.tasks.TopicListener; import org.oran.dmaapadapter.tasks.TopicListeners; @@ -91,7 +93,7 @@ import reactor.test.StepVerifier; "app.webclient.trust-store=./config/truststore.jks", // "app.webclient.trust-store-used=true", // "app.configuration-filepath=./src/test/resources/test_application_configuration.json", // - "app.s3.endpointOverride="}) + "app.pm-files-path=/tmp", "app.s3.endpointOverride="}) class ApplicationTest { @Autowired @@ -494,11 +496,15 @@ class ApplicationTest { // Return one messagefrom DMAAP and verify that the job (consumer) receives a // filtered PM message - String path = "./src/test/resources/pm_report.json"; - String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset()); + String path = "./src/test/resources/pm_report.json.gz"; + FileStore fs = new FileStore(this.applicationConfig); + fs.copyFileTo(Path.of(path), "pm_report.json.gz"); + + NewFileEvent event = NewFileEvent.builder().filename("pm_report.json.gz").build(); + DmaapSimulatorController.addPmResponse("{}"); // This should just be discarded - DmaapSimulatorController.addPmResponse(pmReportJson); + DmaapSimulatorController.addPmResponse(gson.toJson(event)); ConsumerController.TestResults consumer = this.consumerController.testResults; await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1)); diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java index 37c1b7d..4e616a8 100644 --- a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java +++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java @@ -56,6 +56,7 @@ import org.oran.dmaapadapter.repository.InfoTypes; import org.oran.dmaapadapter.repository.Job; import org.oran.dmaapadapter.repository.Jobs; import org.oran.dmaapadapter.tasks.KafkaTopicListener; +import org.oran.dmaapadapter.tasks.NewFileEvent; import org.oran.dmaapadapter.tasks.TopicListener; import org.oran.dmaapadapter.tasks.TopicListeners; import org.slf4j.Logger; @@ -216,8 +217,9 @@ class IntegrationWithKafka { kafkaReceiver2.reset(); S3ObjectStore fileStore = new S3ObjectStore(applicationConfig); - fileStore.deleteBucket(DataStore.Bucket.FILES).block(); - fileStore.deleteBucket(DataStore.Bucket.LOCKS).block(); + fileStore.createS3Bucket(DataStore.Bucket.FILES).block(); + fileStore.createS3Bucket(DataStore.Bucket.LOCKS).block(); + } @AfterEach @@ -230,6 +232,10 @@ class IntegrationWithKafka { this.consumerController.testResults.reset(); this.icsSimulatorController.testResults.reset(); + + S3ObjectStore fileStore = new S3ObjectStore(applicationConfig); + fileStore.deleteBucket(DataStore.Bucket.FILES).block(); + fileStore.deleteBucket(DataStore.Bucket.LOCKS).block(); } private AsyncRestClient restClient(boolean useTrustValidation) { @@ -488,19 +494,20 @@ class IntegrationWithKafka { await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2)); waitForKafkaListener(); - final int NO_OF_OBJECTS = 100; + final int NO_OF_OBJECTS = 50000; Instant startTime = Instant.now(); - KafkaTopicListener.NewFileEvent event = KafkaTopicListener.NewFileEvent.builder() // - .filename("pm_report.json.gz").objectStoreBucket(applicationConfig.getS3Bucket()) // + final String FILE_NAME = "pm_report.json.gz"; + + NewFileEvent event = NewFileEvent.builder() // + .filename(FILE_NAME) // .build(); S3ObjectStore fileStore = new S3ObjectStore(applicationConfig); 
fileStore.createS3Bucket(DataStore.Bucket.FILES).block(); - fileStore.copyFileToS3(DataStore.Bucket.FILES, Path.of("./src/test/resources/pm_report.json.gz"), - "pm_report.json.gz").block(); + fileStore.copyFileToS3(DataStore.Bucket.FILES, Path.of("./src/test/resources/" + FILE_NAME), FILE_NAME).block(); String eventAsString = gson.toJson(event); -- 2.16.6
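
Note on the .gz handling: the new JobDataDistributor pipeline maps the bytes read from DataStore.Bucket.FILES through an unzip(bytes, filename) helper before wrapping them in a DataFromTopic, but the helper itself is outside the hunks above. A minimal sketch of what such a helper could look like, assuming it decompresses only names ending in .gz, passes other payloads through unchanged, and returns a UTF-8 string to match the DataFromTopic(String, String) constructor (the exact signature and charset are assumptions, not part of this patch):

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.zip.GZIPInputStream;

    final class UnzipSketch {
        // Decompress gzip'ed payloads; leave everything else untouched.
        static String unzip(byte[] bytes, String fileName) throws IOException {
            if (!fileName.endsWith(".gz")) {
                return new String(bytes, StandardCharsets.UTF_8);
            }
            try (GZIPInputStream gis = new GZIPInputStream(new ByteArrayInputStream(bytes))) {
                return new String(gis.readAllBytes(), StandardCharsets.UTF_8);
            }
        }
    }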
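
Note on NewFileEvent: after this change the event carries only the file name; the receiving adapter resolves the bucket from its own configuration (DataStore.Bucket.FILES). A hypothetical producer announcing a collected PM file would therefore serialize just the name, as the tests do (the Gson setup and the way the JSON is published are illustrative, not part of the patch):

    import com.google.gson.Gson;
    import org.oran.dmaapadapter.tasks.NewFileEvent;

    final class NewFileEventExample {
        public static void main(String[] args) {
            NewFileEvent event = NewFileEvent.builder()
                    .filename("pm_report.json.gz") // no objectStoreBucket field any more
                    .build();
            String json = new Gson().toJson(event); // {"filename":"pm_report.json.gz"}
            System.out.println(json);
            // This JSON string is what would be sent as the Kafka/DMaaP message value.
        }
    }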
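
Note on the DataStore selection: JobDataDistributor now imports both org.oran.dmaapadapter.datastore.FileStore and org.oran.dmaapadapter.datastore.S3ObjectStore, so the backend is presumably chosen from configuration; the actual constructor call is outside the hunks above. A sketch under that assumption (the factory method name is hypothetical):

    import org.oran.dmaapadapter.configuration.ApplicationConfig;
    import org.oran.dmaapadapter.datastore.DataStore;
    import org.oran.dmaapadapter.datastore.FileStore;
    import org.oran.dmaapadapter.datastore.S3ObjectStore;

    final class DataStoreFactorySketch {
        // Hypothetical factory: S3 object store when app.s3 is configured,
        // local files under app.pm-files-path otherwise.
        static DataStore create(ApplicationConfig config) {
            return config.isS3Enabled() ? new S3ObjectStore(config) : new FileStore(config);
        }
    }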