NONRTRIC - Using S3 storage for ICS 24/9324/3
authorPatrikBuhr <patrik.buhr@est.tech>
Wed, 19 Oct 2022 09:35:46 +0000 (11:35 +0200)
committerPatrikBuhr <patrik.buhr@est.tech>
Wed, 19 Oct 2022 09:50:47 +0000 (11:50 +0200)
Minor changes.

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-811
Change-Id: Id7ff16c1dc65381d36ecab84529192aa7959532a

pom.xml
src/main/java/org/oransc/ics/datastore/DataStore.java
src/main/java/org/oransc/ics/datastore/FileStore.java
src/main/java/org/oransc/ics/datastore/S3ObjectStore.java
src/main/java/org/oransc/ics/repository/InfoJobs.java
src/main/java/org/oransc/ics/repository/InfoTypeSubscriptions.java
src/main/java/org/oransc/ics/repository/InfoTypes.java
src/test/java/org/oransc/ics/ApplicationTest.java

diff --git a/pom.xml b/pom.xml
index e89058a..5fe81e2 100644 (file)
--- a/pom.xml
+++ b/pom.xml
             <artifactId>s3</artifactId>
             <version>2.17.292</version>
         </dependency>
-        <dependency>
-            <groupId>com.amazonaws</groupId>
-            <artifactId>aws-java-sdk</artifactId>
-            <version>1.12.321</version>
-        </dependency>
+     
         <!-- TEST -->
         <dependency>
             <groupId>org.springdoc</groupId>
         <system>JIRA</system>
         <url>https://jira.o-ran-sc.org/</url>
     </issueManagement>
-</project>
\ No newline at end of file
+</project>
index 78d5d13..086d1f5 100644 (file)
@@ -37,7 +37,7 @@ public interface DataStore {
 
     public Mono<String> createDataStore();
 
-    public Mono<String> deleteAllData();
+    public Flux<String> deleteAllData();
 
     public Mono<String> deleteBucket();
 
index bd9c91c..3e15c98 100644 (file)
@@ -125,10 +125,9 @@ class FileStore implements DataStore {
     }
 
     @Override
-    public Mono<String> deleteAllData() {
+    public Flux<String> deleteAllData() {
         return listObjects("") //
             .flatMap(this::deleteObject) //
-            .collectList() //
             .map(o -> "OK");
     }
 
index 3434b67..8d564a6 100644 (file)
@@ -21,6 +21,8 @@
 package org.oransc.ics.datastore;
 
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
 import org.oransc.ics.configuration.ApplicationConfig;
@@ -31,6 +33,7 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.BytesWrapper;
 import software.amazon.awssdk.core.ResponseBytes;
 import software.amazon.awssdk.core.async.AsyncRequestBody;
 import software.amazon.awssdk.core.async.AsyncResponseTransformer;
@@ -39,14 +42,18 @@ import software.amazon.awssdk.services.s3.S3AsyncClient;
 import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
 import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
 import software.amazon.awssdk.services.s3.model.CreateBucketResponse;
+import software.amazon.awssdk.services.s3.model.Delete;
 import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
 import software.amazon.awssdk.services.s3.model.DeleteBucketResponse;
 import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
 import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
 import software.amazon.awssdk.services.s3.model.GetObjectRequest;
 import software.amazon.awssdk.services.s3.model.GetObjectResponse;
 import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
 import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
+import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
 import software.amazon.awssdk.services.s3.model.PutObjectRequest;
 import software.amazon.awssdk.services.s3.model.PutObjectResponse;
 import software.amazon.awssdk.services.s3.model.S3Object;
@@ -84,7 +91,8 @@ class S3ObjectStore implements DataStore {
 
     @Override
     public Flux<String> listObjects(String prefix) {
-        return listObjectsInBucket(bucket(), location + "/" + prefix).map(S3Object::key) //
+        return listObjectsInBucket(bucket(), prefix) //
+            .map(S3Object::key) //
             .map(this::externalName);
     }
 
@@ -142,13 +150,36 @@ class S3ObjectStore implements DataStore {
     }
 
     @Override
-    public Mono<String> deleteAllData() {
-        return listObjects("") //
-            .flatMap(key -> deleteObject(key)) //
-            .collectList() //
-            .map(resp -> "OK")
-            .doOnError(t -> logger.warn("Could not delete bucket: {}, reason: {}", bucket(), t.getMessage()))
-            .onErrorResume(t -> Mono.just("NOK"));
+    public Flux<String> deleteAllData() {
+        return listObjectsInBucket(bucket(), "") //
+            .buffer(500) //
+            .flatMap(this::deleteObjectsFromS3Storage) //
+            .doOnError(t -> logger.info("Deleted all files {}", t.getMessage())) //
+            .onErrorStop() //
+            .onErrorResume(t -> Flux.empty()).map(resp -> ""); //
+    }
+
+    private Mono<DeleteObjectsResponse> deleteObjectsFromS3Storage(Collection<S3Object> objects) {
+        Collection<ObjectIdentifier> oids = new ArrayList<>();
+        for (S3Object o : objects) {
+            ObjectIdentifier oid = ObjectIdentifier.builder() //
+                .key(o.key()) //
+                .build();
+            oids.add(oid);
+        }
+
+        Delete delete = Delete.builder() //
+            .objects(oids) //
+            .build();
+
+        DeleteObjectsRequest request = DeleteObjectsRequest.builder() //
+            .bucket(bucket()) //
+            .delete(delete) //
+            .build();
+
+        CompletableFuture<DeleteObjectsResponse> future = s3AsynchClient.deleteObjects(request);
+
+        return Mono.fromFuture(future);
     }
 
     @Override
@@ -193,8 +224,8 @@ class S3ObjectStore implements DataStore {
     }
 
     private Flux<S3Object> listObjectsInBucket(String bucket, String prefix) {
-
-        return listObjectsRequest(bucket, prefix, null) //
+        String pre = location + "/" + prefix;
+        return listObjectsRequest(bucket, pre, null) //
             .expand(response -> listObjectsRequest(bucket, prefix, response)) //
             .map(ListObjectsResponse::contents) //
             .doOnNext(f -> logger.debug("Found objects in {}: {}", bucket, f.size())) //
@@ -214,7 +245,7 @@ class S3ObjectStore implements DataStore {
             s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes());
 
         return Mono.fromFuture(future) //
-            .map(b -> b.asByteArray()) //
+            .map(BytesWrapper::asByteArray) //
             .doOnError(t -> logger.error("Failed to get file from S3, key:{}, bucket: {}, {}", key(fileName), bucket,
                 t.getMessage())) //
             .doOnEach(n -> logger.debug("Read file from S3: {} {}", bucket, key(fileName))) //
index 8085573..db3b9af 100644 (file)
@@ -52,7 +52,6 @@ public class InfoJobs {
     private final Gson gson;
     private final InfoTypes infoTypes;
 
-    private final ApplicationConfig config;
     private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
     private final ProducerCallbacks producerCallbacks;
@@ -60,7 +59,6 @@ public class InfoJobs {
     private final DataStore dataStore;
 
     public InfoJobs(ApplicationConfig config, InfoTypes infoTypes, ProducerCallbacks producerCallbacks) {
-        this.config = config;
         GsonBuilder gsonBuilder = new GsonBuilder();
         ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
         this.gson = gsonBuilder.create();
@@ -165,7 +163,7 @@ public class InfoJobs {
         this.jobsByType.clear();
         jobsByOwner.clear();
 
-        dataStore.deleteAllData().flatMap(s -> dataStore.createDataStore()).block();
+        dataStore.deleteAllData().flatMap(s -> dataStore.createDataStore()).blockLast();
     }
 
     private void doPut(InfoJob job) {
index 97fa487..1fff616 100644 (file)
@@ -58,7 +58,6 @@ public class InfoTypeSubscriptions {
     private final Map<String, SubscriptionInfo> allSubscriptions = new HashMap<>();
     private final MultiMap<String, SubscriptionInfo> subscriptionsByOwner = new MultiMap<>();
     private final Gson gson = new GsonBuilder().create();
-    private final ApplicationConfig config;
     private final Map<String, ConsumerCallbackHandler> callbackHandlers = new HashMap<>();
     private final DataStore dataStore;
 
@@ -94,7 +93,6 @@ public class InfoTypeSubscriptions {
     }
 
     public InfoTypeSubscriptions(@Autowired ApplicationConfig config) {
-        this.config = config;
         this.dataStore = DataStore.create(config, "infotypesubscriptions");
         this.dataStore.createDataStore().subscribe();
     }
@@ -232,7 +230,7 @@ public class InfoTypeSubscriptions {
     }
 
     private void clearDatabase() {
-        this.dataStore.deleteAllData().block();
+        this.dataStore.deleteAllData().blockLast();
     }
 
     private void storeInFile(SubscriptionInfo subscription) {
index 98188ff..7d703e7 100644 (file)
@@ -47,12 +47,10 @@ import reactor.core.publisher.Flux;
 public class InfoTypes {
     private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
     private final Map<String, InfoType> allInfoTypes = new HashMap<>();
-    private final ApplicationConfig config;
     private final Gson gson;
     private final DataStore dataStore;
 
     public InfoTypes(ApplicationConfig config) {
-        this.config = config;
         GsonBuilder gsonBuilder = new GsonBuilder();
         ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
         this.gson = gsonBuilder.create();
@@ -107,7 +105,7 @@ public class InfoTypes {
 
     public synchronized void clear() {
         this.allInfoTypes.clear();
-        dataStore.deleteAllData().flatMap(s -> dataStore.createDataStore()).block();
+        dataStore.deleteAllData().flatMap(s -> dataStore.createDataStore()).blockLast();
     }
 
     public synchronized InfoType getCompatibleType(String typeId) throws ServiceException {
index 2f7c1bf..4f12c12 100644 (file)
@@ -1011,6 +1011,7 @@ class ApplicationTest {
         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
         putInfoJob(TYPE_ID, "jobId1");
         putInfoJob(TYPE_ID, "jobId2");
+        waitForS3();
 
         assertThat(this.infoJobs.size()).isEqualTo(2);
         {
@@ -1046,6 +1047,7 @@ class ApplicationTest {
         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
         InfoType savedType = this.infoTypes.getType(TYPE_ID);
 
+        waitForS3();
         assertThat(this.infoTypes.size()).isEqualTo(1);
 
         {
@@ -1078,9 +1080,8 @@ class ApplicationTest {
         restClient().putForEntity(typeSubscriptionUrl() + "/subscriptionId", body).block();
         assertThat(this.infoTypeSubscriptions.size()).isEqualTo(1);
 
-        if (this.applicationConfig.isS3Enabled()) {
-            Thread.sleep(1000); // Storing in S3 is asynch, so it can take some millis
-        }
+        waitForS3();
+
         InfoTypeSubscriptions restoredSubscriptions = new InfoTypeSubscriptions(this.applicationConfig);
         restoredSubscriptions.restoreFromDatabase().blockLast();
         assertThat(restoredSubscriptions.size()).isEqualTo(1);
@@ -1092,6 +1093,13 @@ class ApplicationTest {
         assertThat(restoredSubscriptions.size()).isZero();
     }
 
+    @SuppressWarnings("java:S2925") // sleep
+    private void waitForS3() throws InterruptedException {
+        if (this.applicationConfig.isS3Enabled()) {
+            Thread.sleep(1000); // Storing in S3 is asynch, so it can take some millis
+        }
+    }
+
     @Test
     void testConsumerTypeSubscription() throws Exception {