Unzipping of files with ending .gz 65/9165/1
authorPatrikBuhr <patrik.buhr@est.tech>
Tue, 4 Oct 2022 13:18:15 +0000 (15:18 +0200)
committerPatrikBuhr <patrik.buhr@est.tech>
Tue, 4 Oct 2022 13:30:55 +0000 (15:30 +0200)
Removed the bucket from the NewFileEvenet since the S3 bucket needs to be configured anyway to query
already collected PM data.

Fixed some bug.

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: Idaac044bd3578cbf97f700b70b73f9ed8ac568b1

src/main/java/org/oran/dmaapadapter/datastore/DataStore.java
src/main/java/org/oran/dmaapadapter/datastore/FileStore.java
src/main/java/org/oran/dmaapadapter/datastore/S3ObjectStore.java
src/main/java/org/oran/dmaapadapter/tasks/FileStore.java [deleted file]
src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java
src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
src/main/java/org/oran/dmaapadapter/tasks/NewFileEvent.java [moved from src/main/java/org/oran/dmaapadapter/tasks/DataStore.java with 61% similarity]
src/main/java/org/oran/dmaapadapter/tasks/S3ObjectStore.java [deleted file]
src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java
src/test/java/org/oran/dmaapadapter/ApplicationTest.java
src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java

index 167ff63..255b77a 100644 (file)
@@ -30,9 +30,7 @@ public interface DataStore {
 
     public Flux<String> listFiles(Bucket bucket, String prefix);
 
-    public Mono<String> readFile(Bucket bucket, String fileName);
-
-    public Mono<String> readFile(String bucket, String fileName);
+    public Mono<byte[]> readFile(Bucket bucket, String fileName);
 
     public Mono<Boolean> createLock(String name);
 
index e78653c..3d4cb4d 100644 (file)
@@ -29,8 +29,6 @@ import java.util.List;
 import java.util.stream.Stream;
 
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
-import org.oran.dmaapadapter.exceptions.ServiceException;
-import org.springframework.http.HttpStatus;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -72,14 +70,10 @@ public class FileStore implements DataStore {
         return fullName.substring(applicationConfig.getPmFilesPath().length());
     }
 
-    public Mono<String> readFile(String bucket, String fileName) {
-        return Mono.error(new ServiceException("readFile from bucket Not implemented", HttpStatus.CONFLICT));
-    }
-
     @Override
-    public Mono<String> readFile(Bucket bucket, String fileName) {
+    public Mono<byte[]> readFile(Bucket bucket, String fileName) {
         try {
-            String contents = Files.readString(path(fileName));
+            byte[] contents = Files.readAllBytes(path(fileName));
             return Mono.just(contents);
         } catch (Exception e) {
             return Mono.error(e);
index c3b2f9d..de9da16 100644 (file)
@@ -21,7 +21,6 @@
 package org.oran.dmaapadapter.datastore;
 
 import java.net.URI;
-import java.nio.charset.Charset;
 import java.nio.file.Path;
 import java.util.concurrent.CompletableFuture;
 
@@ -127,15 +126,10 @@ public class S3ObjectStore implements DataStore {
     }
 
     @Override
-    public Mono<String> readFile(Bucket bucket, String fileName) {
+    public Mono<byte[]> readFile(Bucket bucket, String fileName) {
         return getDataFromS3Object(bucket(bucket), fileName);
     }
 
-    @Override
-    public Mono<String> readFile(String bucket, String fileName) {
-        return getDataFromS3Object(bucket, fileName);
-    }
-
     public Mono<String> putObject(Bucket bucket, String fileName, String bodyString) {
         PutObjectRequest request = PutObjectRequest.builder() //
                 .bucket(bucket(bucket)) //
@@ -239,7 +233,7 @@ public class S3ObjectStore implements DataStore {
         ;
     }
 
-    private Mono<String> getDataFromS3Object(String bucket, String key) {
+    private Mono<byte[]> getDataFromS3Object(String bucket, String key) {
 
         GetObjectRequest request = GetObjectRequest.builder() //
                 .bucket(bucket) //
@@ -250,8 +244,9 @@ public class S3ObjectStore implements DataStore {
                 s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes());
 
         return Mono.fromFuture(future) //
-                .map(b -> new String(b.asByteArray(), Charset.defaultCharset())) //
-                .doOnError(t -> logger.error("Failed to get file from S3 {}", t.getMessage())) //
+                .map(b -> b.asByteArray()) //
+                .doOnError(t -> logger.error("Failed to get file from S3, key:{}, bucket: {}, {}", key, bucket,
+                        t.getMessage())) //
                 .doOnEach(n -> logger.debug("Read file from S3: {} {}", bucket, key)) //
                 .onErrorResume(t -> Mono.empty());
     }
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/FileStore.java b/src/main/java/org/oran/dmaapadapter/tasks/FileStore.java
deleted file mode 100644 (file)
index f11cf11..0000000
+++ /dev/null
@@ -1,137 +0,0 @@
-/*-
- * ========================LICENSE_START=================================
- * O-RAN-SC
- * %%
- * Copyright (C) 2021 Nordix Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================LICENSE_END===================================
- */
-
-package org.oran.dmaapadapter.tasks;
-
-import java.io.File;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardCopyOption;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Stream;
-
-import org.oran.dmaapadapter.configuration.ApplicationConfig;
-import org.oran.dmaapadapter.exceptions.ServiceException;
-import org.springframework.http.HttpStatus;
-
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-public class FileStore implements DataStore {
-
-    ApplicationConfig applicationConfig;
-
-    public FileStore(ApplicationConfig applicationConfig) {
-        this.applicationConfig = applicationConfig;
-    }
-
-    @Override
-    public Flux<String> listFiles(Bucket bucket, String prefix) {
-        Path root = Path.of(applicationConfig.getPmFilesPath(), prefix);
-        if (!root.toFile().exists()) {
-            root = root.getParent();
-        }
-
-        List<String> result = new ArrayList<>();
-        try (Stream<Path> stream = Files.walk(root, Integer.MAX_VALUE)) {
-
-            stream.forEach(path -> filterListFiles(path, prefix, result));
-
-            return Flux.fromIterable(result);
-        } catch (Exception e) {
-            return Flux.error(e);
-        }
-    }
-
-    private void filterListFiles(Path path, String prefix, List<String> result) {
-        if (path.toFile().isFile() && externalName(path).startsWith(prefix)) {
-            result.add(externalName(path));
-        }
-    }
-
-    private String externalName(Path f) {
-        String fullName = f.toString();
-        return fullName.substring(applicationConfig.getPmFilesPath().length());
-    }
-
-    public Mono<byte[]> readFile(String bucket, String fileName) {
-        return Mono.error(new ServiceException("readFile from bucket Not implemented", HttpStatus.CONFLICT));
-    }
-
-    @Override
-    public Mono<byte[]> readFile(Bucket bucket, String fileName) {
-        try {
-            byte[] contents = Files.readAllBytes(path(fileName));
-            return Mono.just(contents);
-        } catch (Exception e) {
-            return Mono.error(e);
-        }
-    }
-
-    @Override
-    public Mono<Boolean> createLock(String name) {
-        File file = path(name).toFile();
-        try {
-            boolean res = file.createNewFile();
-            return Mono.just(res || isAged(file));
-        } catch (Exception e) {
-            return Mono.just(file.exists() && isAged(file));
-        }
-    }
-
-    public Mono<String> copyFileTo(Path from, String to) {
-        try {
-            Path toPath = path(to);
-            Files.createDirectories(toPath);
-            Files.copy(from, path(to), StandardCopyOption.REPLACE_EXISTING);
-            return Mono.just(to);
-        } catch (Exception e) {
-            return Mono.error(e);
-        }
-    }
-
-    private boolean isAged(File file) {
-        final long MAX_AGE_SECONDS = (long) 60 * 5;
-        return Instant.now().getEpochSecond() - file.lastModified() > MAX_AGE_SECONDS;
-
-    }
-
-    @Override
-    public Mono<Boolean> deleteLock(String name) {
-        return deleteObject(Bucket.LOCKS, name);
-    }
-
-    @Override
-    public Mono<Boolean> deleteObject(Bucket bucket, String name) {
-        try {
-            Files.delete(path(name));
-            return Mono.just(true);
-        } catch (Exception e) {
-            return Mono.just(false);
-        }
-    }
-
-    private Path path(String name) {
-        return Path.of(applicationConfig.getPmFilesPath(), name);
-    }
-
-}
index bd62d4f..6d2327d 100644 (file)
@@ -22,9 +22,6 @@ package org.oran.dmaapadapter.tasks;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.nio.file.Path;
 import java.time.OffsetDateTime;
 import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeFormatterBuilder;
@@ -33,12 +30,14 @@ import java.util.zip.GZIPInputStream;
 import lombok.Getter;
 
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
+import org.oran.dmaapadapter.datastore.DataStore;
+import org.oran.dmaapadapter.datastore.FileStore;
+import org.oran.dmaapadapter.datastore.S3ObjectStore;
 import org.oran.dmaapadapter.exceptions.ServiceException;
 import org.oran.dmaapadapter.filter.Filter;
 import org.oran.dmaapadapter.filter.PmReportFilter;
 import org.oran.dmaapadapter.repository.InfoType;
 import org.oran.dmaapadapter.repository.Job;
-import org.oran.dmaapadapter.tasks.KafkaTopicListener.NewFileEvent;
 import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -152,7 +151,7 @@ public abstract class JobDataDistributor {
 
     private Flux<TopicListener.DataFromTopic> createFakeEvent(String fileName) {
 
-        NewFileEvent ev = new NewFileEvent(fileName, this.applConfig.getS3Bucket());
+        NewFileEvent ev = new NewFileEvent(fileName);
 
         return Flux.just(new TopicListener.DataFromTopic("", gson.toJson(ev)));
     }
@@ -225,25 +224,16 @@ public abstract class JobDataDistributor {
 
         try {
             NewFileEvent ev = gson.fromJson(data.value, NewFileEvent.class);
-            if (ev.getObjectStoreBucket() != null) {
-                if (this.applConfig.isS3Enabled()) {
-                    return fileStore.readFile(ev.getObjectStoreBucket(), ev.getFilename()) //
-                            .map(str -> unzip(str, ev.getFilename())) //
-                            .map(str -> new DataFromTopic(data.key, str));
-                } else {
-                    logger.error("S3 is not configured in application.yaml, ignoring: {}", data);
-                    return Mono.empty();
-                }
-            } else {
-                if (applConfig.getPmFilesPath().isEmpty() || ev.getFilename() == null) {
-                    logger.debug("Passing data {}", data);
-                    return Mono.just(data);
-                } else {
-                    Path path = Path.of(this.applConfig.getPmFilesPath(), ev.getFilename());
-                    String pmReportJson = Files.readString(path, Charset.defaultCharset());
-                    return Mono.just(new DataFromTopic(data.key, pmReportJson));
-                }
+
+            if (ev.getFilename() == null) {
+                logger.warn("Ignoring received message: {}", data);
+                return Mono.empty();
             }
+
+            return fileStore.readFile(DataStore.Bucket.FILES, ev.getFilename()) //
+                    .map(bytes -> unzip(bytes, ev.getFilename())) //
+                    .map(bytes -> new DataFromTopic(data.key, bytes));
+
         } catch (Exception e) {
             return Mono.just(data);
         }
index eaeeae4..f0f2513 100644 (file)
@@ -24,10 +24,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
-import lombok.Builder;
-import lombok.Getter;
-import lombok.ToString;
-
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
@@ -50,16 +46,6 @@ public class KafkaTopicListener implements TopicListener {
     private final InfoType type;
     private Flux<DataFromTopic> dataFromTopic;
 
-    @ToString
-    @Builder
-    public static class NewFileEvent {
-        @Getter
-        private String filename;
-
-        @Getter
-        private String objectStoreBucket;
-    }
-
     public KafkaTopicListener(ApplicationConfig applConfig, InfoType type) {
         this.applicationConfig = applConfig;
         this.type = type;
 
 package org.oran.dmaapadapter.tasks;
 
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
 
-public interface DataStore {
-    public enum Bucket {
-        FILES, LOCKS
-    }
+import lombok.Builder;
+import lombok.Getter;
+import lombok.ToString;
 
-    public Flux<String> listFiles(Bucket bucket, String prefix);
 
-    public Mono<byte[]> readFile(Bucket bucket, String fileName);
-
-    public Mono<byte[]> readFile(String bucket, String fileName);
-
-    public Mono<Boolean> createLock(String name);
-
-    public Mono<Boolean> deleteLock(String name);
-
-    public Mono<Boolean> deleteObject(Bucket bucket, String name);
 
+@ToString
+@Builder
+public class NewFileEvent {
+    @Getter
+    private String filename;
 }
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/S3ObjectStore.java b/src/main/java/org/oran/dmaapadapter/tasks/S3ObjectStore.java
deleted file mode 100644 (file)
index 74ba49d..0000000
+++ /dev/null
@@ -1,261 +0,0 @@
-/*-
- * ========================LICENSE_START=================================
- * O-RAN-SC
- * %%
- * Copyright (C) 2021 Nordix Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================LICENSE_END===================================
- */
-
-package org.oran.dmaapadapter.tasks;
-
-import java.net.URI;
-import java.nio.file.Path;
-import java.time.Instant;
-import java.util.concurrent.CompletableFuture;
-
-import org.oran.dmaapadapter.configuration.ApplicationConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
-import software.amazon.awssdk.core.ResponseBytes;
-import software.amazon.awssdk.core.async.AsyncRequestBody;
-import software.amazon.awssdk.core.async.AsyncResponseTransformer;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.s3.S3AsyncClient;
-import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
-import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
-import software.amazon.awssdk.services.s3.model.CreateBucketResponse;
-import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
-import software.amazon.awssdk.services.s3.model.DeleteBucketResponse;
-import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
-import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
-import software.amazon.awssdk.services.s3.model.GetObjectRequest;
-import software.amazon.awssdk.services.s3.model.GetObjectResponse;
-import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
-import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
-import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
-import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
-import software.amazon.awssdk.services.s3.model.PutObjectRequest;
-import software.amazon.awssdk.services.s3.model.PutObjectResponse;
-import software.amazon.awssdk.services.s3.model.S3Object;
-
-public class S3ObjectStore implements DataStore {
-    private static final Logger logger = LoggerFactory.getLogger(S3ObjectStore.class);
-    private final ApplicationConfig applicationConfig;
-
-    private static S3AsyncClient s3AsynchClient;
-
-    public S3ObjectStore(ApplicationConfig applicationConfig) {
-        this.applicationConfig = applicationConfig;
-
-        getS3AsynchClient(applicationConfig);
-    }
-
-    private static synchronized S3AsyncClient getS3AsynchClient(ApplicationConfig applicationConfig) {
-        if (applicationConfig.isS3Enabled() && s3AsynchClient == null) {
-            s3AsynchClient = getS3AsyncClientBuilder(applicationConfig).build();
-        }
-        return s3AsynchClient;
-    }
-
-    private static S3AsyncClientBuilder getS3AsyncClientBuilder(ApplicationConfig applicationConfig) {
-        URI uri = URI.create(applicationConfig.getS3EndpointOverride());
-        return S3AsyncClient.builder() //
-                .region(Region.US_EAST_1) //
-                .endpointOverride(uri) //
-                .credentialsProvider(StaticCredentialsProvider.create( //
-                        AwsBasicCredentials.create(applicationConfig.getS3AccessKeyId(), //
-                                applicationConfig.getS3SecretAccessKey())));
-
-    }
-
-    @Override
-    public Flux<String> listFiles(Bucket bucket, String prefix) {
-        return listObjectsInBucket(bucket(bucket), prefix).map(S3Object::key);
-    }
-
-    @Override
-    public Mono<Boolean> createLock(String name) {
-        return getHeadObject(bucket(Bucket.LOCKS), name).flatMap(head -> createLock(name, head)) //
-                .onErrorResume(t -> createLock(name, null));
-    }
-
-    private Mono<Boolean> createLock(String name, HeadObjectResponse head) {
-        final long MAX_AGE_SECONDS = (long) 60 * 5;
-
-        if (head == null || head.lastModified().getEpochSecond() - Instant.now().getEpochSecond() > MAX_AGE_SECONDS) {
-
-            return this.putObject(Bucket.LOCKS, name, "") //
-                    .flatMap(resp -> Mono.just(true)) //
-                    .doOnError(t -> logger.warn("Failed to create lock {}, reason: {}", name, t.getMessage())) //
-                    .onErrorResume(t -> Mono.just(false));
-        } else {
-            return Mono.just(false);
-        }
-    }
-
-    @Override
-    public Mono<Boolean> deleteLock(String name) {
-        return deleteObject(Bucket.LOCKS, name);
-    }
-
-    @Override
-    public Mono<Boolean> deleteObject(Bucket bucket, String name) {
-
-        DeleteObjectRequest request = DeleteObjectRequest.builder() //
-                .bucket(bucket(bucket)) //
-                .key(name) //
-                .build();
-
-        CompletableFuture<DeleteObjectResponse> future = s3AsynchClient.deleteObject(request);
-
-        return Mono.fromFuture(future).map(resp -> true);
-    }
-
-    @Override
-    public Mono<byte[]> readFile(Bucket bucket, String fileName) {
-        return getDataFromS3Object(bucket(bucket), fileName);
-    }
-
-    @Override
-    public Mono<byte[]> readFile(String bucket, String fileName) {
-        return getDataFromS3Object(bucket, fileName);
-    }
-
-    public Mono<String> putObject(Bucket bucket, String fileName, String bodyString) {
-        PutObjectRequest request = PutObjectRequest.builder() //
-                .bucket(bucket(bucket)) //
-                .key(fileName) //
-                .build();
-
-        AsyncRequestBody body = AsyncRequestBody.fromString(bodyString);
-
-        CompletableFuture<PutObjectResponse> future = s3AsynchClient.putObject(request, body);
-
-        return Mono.fromFuture(future) //
-                .map(putObjectResponse -> fileName) //
-                .doOnError(t -> logger.error("Failed to store file in S3 {}", t.getMessage()));
-    }
-
-    public Mono<String> copyFileToS3(Bucket bucket, Path fromFile, String toFile) {
-        return copyFileToS3Bucket(bucket(bucket), fromFile, toFile);
-    }
-
-    public Mono<String> createS3Bucket(Bucket bucket) {
-        return createS3Bucket(bucket(bucket));
-    }
-
-    private Mono<String> createS3Bucket(String s3Bucket) {
-
-        CreateBucketRequest request = CreateBucketRequest.builder() //
-                .bucket(s3Bucket) //
-                .build();
-
-        CompletableFuture<CreateBucketResponse> future = s3AsynchClient.createBucket(request);
-
-        return Mono.fromFuture(future) //
-                .map(f -> s3Bucket) //
-                .doOnError(t -> logger.debug("Could not create S3 bucket: {}", t.getMessage()))
-                .onErrorResume(t -> Mono.just(s3Bucket));
-    }
-
-    public Mono<String> deleteBucket(Bucket bucket) {
-        return listFiles(bucket, "") //
-                .flatMap(key -> deleteObject(bucket, key)) //
-                .collectList() //
-                .flatMap(list -> deleteBucketFromS3Storage(bucket)) //
-                .map(resp -> "OK")
-                .doOnError(t -> logger.warn("Could not delete bucket: {}, reason: {}", bucket(bucket), t.getMessage()))
-                .onErrorResume(t -> Mono.just("NOK"));
-    }
-
-    private Mono<DeleteBucketResponse> deleteBucketFromS3Storage(Bucket bucket) {
-        DeleteBucketRequest request = DeleteBucketRequest.builder() //
-                .bucket(bucket(bucket)) //
-                .build();
-
-        CompletableFuture<DeleteBucketResponse> future = s3AsynchClient.deleteBucket(request);
-
-        return Mono.fromFuture(future);
-    }
-
-    private String bucket(Bucket bucket) {
-        return bucket == Bucket.FILES ? applicationConfig.getS3Bucket() : applicationConfig.getS3LocksBucket();
-    }
-
-    private Mono<String> copyFileToS3Bucket(String s3Bucket, Path fileName, String s3Key) {
-
-        PutObjectRequest request = PutObjectRequest.builder() //
-                .bucket(s3Bucket) //
-                .key(s3Key) //
-                .build();
-
-        AsyncRequestBody body = AsyncRequestBody.fromFile(fileName);
-
-        CompletableFuture<PutObjectResponse> future = s3AsynchClient.putObject(request, body);
-
-        return Mono.fromFuture(future) //
-                .map(f -> s3Key) //
-                .doOnError(t -> logger.error("Failed to store file in S3 {}", t.getMessage()));
-
-    }
-
-    private Mono<HeadObjectResponse> getHeadObject(String bucket, String key) {
-        HeadObjectRequest request = HeadObjectRequest.builder().bucket(bucket).key(key).build();
-
-        CompletableFuture<HeadObjectResponse> future = s3AsynchClient.headObject(request);
-        return Mono.fromFuture(future);
-    }
-
-    private Flux<S3Object> listObjectsInBucket(String bucket, String prefix) {
-        ListObjectsRequest listObjectsRequest = ListObjectsRequest.builder() //
-                .bucket(bucket) //
-                .maxKeys(1100) //
-                .prefix(prefix) //
-                .build();
-        CompletableFuture<ListObjectsResponse> future = s3AsynchClient.listObjects(listObjectsRequest);
-
-        return Mono.fromFuture(future) //
-                .map(ListObjectsResponse::contents) //
-                .doOnNext(f -> logger.debug("Found objects in {}: {}", bucket, f.size())) //
-                .doOnError(t -> logger.warn("Error fromlist objects: {}", t.getMessage())) //
-                .flatMapMany(Flux::fromIterable) //
-                .doOnNext(obj -> logger.debug("Found object: {}", obj.key())) //
-
-        ;
-    }
-
-    private Mono<byte[]> getDataFromS3Object(String bucket, String key) {
-
-        GetObjectRequest request = GetObjectRequest.builder() //
-                .bucket(bucket) //
-                .key(key) //
-                .build();
-
-        CompletableFuture<ResponseBytes<GetObjectResponse>> future =
-                s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes());
-
-        return Mono.fromFuture(future) //
-                .map(b -> b.asByteArray()) //
-                .doOnError(t -> logger.error("Failed to get file from S3 {}", t.getMessage())) //
-                .doOnEach(n -> logger.debug("Read file from S3: {} {}", bucket, key)) //
-                .onErrorResume(t -> Mono.empty());
-    }
-
-}
index 373020c..3f06457 100644 (file)
@@ -38,6 +38,7 @@ public interface TopicListener {
 
         @Getter
         @Setter
+        @ToString.Exclude
         private PmReport cachedPmReport;
 
         public DataFromTopic(String key, String value) {
index 84b5685..fe305c5 100644 (file)
@@ -53,6 +53,7 @@ import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.configuration.WebClientConfig;
 import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig;
 import org.oran.dmaapadapter.controllers.ProducerCallbacksController;
+import org.oran.dmaapadapter.datastore.FileStore;
 import org.oran.dmaapadapter.exceptions.ServiceException;
 import org.oran.dmaapadapter.filter.PmReport;
 import org.oran.dmaapadapter.filter.PmReportFilter;
@@ -62,6 +63,7 @@ import org.oran.dmaapadapter.repository.InfoTypes;
 import org.oran.dmaapadapter.repository.Job;
 import org.oran.dmaapadapter.repository.Jobs;
 import org.oran.dmaapadapter.tasks.JobDataDistributor;
+import org.oran.dmaapadapter.tasks.NewFileEvent;
 import org.oran.dmaapadapter.tasks.ProducerRegstrationTask;
 import org.oran.dmaapadapter.tasks.TopicListener;
 import org.oran.dmaapadapter.tasks.TopicListeners;
@@ -91,7 +93,7 @@ import reactor.test.StepVerifier;
         "app.webclient.trust-store=./config/truststore.jks", //
         "app.webclient.trust-store-used=true", //
         "app.configuration-filepath=./src/test/resources/test_application_configuration.json", //
-        "app.s3.endpointOverride="})
+        "app.pm-files-path=/tmp", "app.s3.endpointOverride="})
 class ApplicationTest {
 
     @Autowired
@@ -494,11 +496,15 @@ class ApplicationTest {
 
         // Return one messagefrom DMAAP and verify that the job (consumer) receives a
         // filtered PM message
-        String path = "./src/test/resources/pm_report.json";
-        String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset());
+        String path = "./src/test/resources/pm_report.json.gz";
+        FileStore fs = new FileStore(this.applicationConfig);
+        fs.copyFileTo(Path.of(path), "pm_report.json.gz");
+
+        NewFileEvent event = NewFileEvent.builder().filename("pm_report.json.gz").build();
+
         DmaapSimulatorController.addPmResponse("{}"); // This should just be discarded
 
-        DmaapSimulatorController.addPmResponse(pmReportJson);
+        DmaapSimulatorController.addPmResponse(gson.toJson(event));
 
         ConsumerController.TestResults consumer = this.consumerController.testResults;
         await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
index 37c1b7d..4e616a8 100644 (file)
@@ -56,6 +56,7 @@ import org.oran.dmaapadapter.repository.InfoTypes;
 import org.oran.dmaapadapter.repository.Job;
 import org.oran.dmaapadapter.repository.Jobs;
 import org.oran.dmaapadapter.tasks.KafkaTopicListener;
+import org.oran.dmaapadapter.tasks.NewFileEvent;
 import org.oran.dmaapadapter.tasks.TopicListener;
 import org.oran.dmaapadapter.tasks.TopicListeners;
 import org.slf4j.Logger;
@@ -216,8 +217,9 @@ class IntegrationWithKafka {
         kafkaReceiver2.reset();
 
         S3ObjectStore fileStore = new S3ObjectStore(applicationConfig);
-        fileStore.deleteBucket(DataStore.Bucket.FILES).block();
-        fileStore.deleteBucket(DataStore.Bucket.LOCKS).block();
+        fileStore.createS3Bucket(DataStore.Bucket.FILES).block();
+        fileStore.createS3Bucket(DataStore.Bucket.LOCKS).block();
+
     }
 
     @AfterEach
@@ -230,6 +232,10 @@ class IntegrationWithKafka {
 
         this.consumerController.testResults.reset();
         this.icsSimulatorController.testResults.reset();
+
+        S3ObjectStore fileStore = new S3ObjectStore(applicationConfig);
+        fileStore.deleteBucket(DataStore.Bucket.FILES).block();
+        fileStore.deleteBucket(DataStore.Bucket.LOCKS).block();
     }
 
     private AsyncRestClient restClient(boolean useTrustValidation) {
@@ -488,19 +494,20 @@ class IntegrationWithKafka {
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
         waitForKafkaListener();
 
-        final int NO_OF_OBJECTS = 100;
+        final int NO_OF_OBJECTS = 50000;
 
         Instant startTime = Instant.now();
 
-        KafkaTopicListener.NewFileEvent event = KafkaTopicListener.NewFileEvent.builder() //
-                .filename("pm_report.json.gz").objectStoreBucket(applicationConfig.getS3Bucket()) //
+        final String FILE_NAME = "pm_report.json.gz";
+
+        NewFileEvent event = NewFileEvent.builder() //
+                .filename(FILE_NAME) //
                 .build();
 
         S3ObjectStore fileStore = new S3ObjectStore(applicationConfig);
 
         fileStore.createS3Bucket(DataStore.Bucket.FILES).block();
-        fileStore.copyFileToS3(DataStore.Bucket.FILES, Path.of("./src/test/resources/pm_report.json.gz"),
-                "pm_report.json.gz").block();
+        fileStore.copyFileToS3(DataStore.Bucket.FILES, Path.of("./src/test/resources/" + FILE_NAME), FILE_NAME).block();
 
         String eventAsString = gson.toJson(event);