From: PatrikBuhr Date: Wed, 19 Oct 2022 09:35:46 +0000 (+0200) Subject: NONRTRIC - Using S3 storage for ICS X-Git-Tag: 1.4.0~3 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F24%2F9324%2F3;p=nonrtric%2Fplt%2Finformationcoordinatorservice.git NONRTRIC - Using S3 storage for ICS Minor changes. Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-811 Change-Id: Id7ff16c1dc65381d36ecab84529192aa7959532a --- diff --git a/pom.xml b/pom.xml index e89058a..5fe81e2 100644 --- a/pom.xml +++ b/pom.xml @@ -140,11 +140,7 @@ s3 2.17.292 - - com.amazonaws - aws-java-sdk - 1.12.321 - + org.springdoc @@ -356,4 +352,4 @@ JIRA https://jira.o-ran-sc.org/ - \ No newline at end of file + diff --git a/src/main/java/org/oransc/ics/datastore/DataStore.java b/src/main/java/org/oransc/ics/datastore/DataStore.java index 78d5d13..086d1f5 100644 --- a/src/main/java/org/oransc/ics/datastore/DataStore.java +++ b/src/main/java/org/oransc/ics/datastore/DataStore.java @@ -37,7 +37,7 @@ public interface DataStore { public Mono createDataStore(); - public Mono deleteAllData(); + public Flux deleteAllData(); public Mono deleteBucket(); diff --git a/src/main/java/org/oransc/ics/datastore/FileStore.java b/src/main/java/org/oransc/ics/datastore/FileStore.java index bd9c91c..3e15c98 100644 --- a/src/main/java/org/oransc/ics/datastore/FileStore.java +++ b/src/main/java/org/oransc/ics/datastore/FileStore.java @@ -125,10 +125,9 @@ class FileStore implements DataStore { } @Override - public Mono deleteAllData() { + public Flux deleteAllData() { return listObjects("") // .flatMap(this::deleteObject) // - .collectList() // .map(o -> "OK"); } diff --git a/src/main/java/org/oransc/ics/datastore/S3ObjectStore.java b/src/main/java/org/oransc/ics/datastore/S3ObjectStore.java index 3434b67..8d564a6 100644 --- a/src/main/java/org/oransc/ics/datastore/S3ObjectStore.java +++ b/src/main/java/org/oransc/ics/datastore/S3ObjectStore.java @@ -21,6 +21,8 @@ package org.oransc.ics.datastore; import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; import java.util.concurrent.CompletableFuture; import org.oransc.ics.configuration.ApplicationConfig; @@ -31,6 +33,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.BytesWrapper; import software.amazon.awssdk.core.ResponseBytes; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.core.async.AsyncResponseTransformer; @@ -39,14 +42,18 @@ import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; import software.amazon.awssdk.services.s3.model.CreateBucketRequest; import software.amazon.awssdk.services.s3.model.CreateBucketResponse; +import software.amazon.awssdk.services.s3.model.Delete; import software.amazon.awssdk.services.s3.model.DeleteBucketRequest; import software.amazon.awssdk.services.s3.model.DeleteBucketResponse; import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; import software.amazon.awssdk.services.s3.model.DeleteObjectResponse; +import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.services.s3.model.ListObjectsRequest; import software.amazon.awssdk.services.s3.model.ListObjectsResponse; +import software.amazon.awssdk.services.s3.model.ObjectIdentifier; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; import software.amazon.awssdk.services.s3.model.S3Object; @@ -84,7 +91,8 @@ class S3ObjectStore implements DataStore { @Override public Flux listObjects(String prefix) { - return listObjectsInBucket(bucket(), location + "/" + prefix).map(S3Object::key) // + return listObjectsInBucket(bucket(), prefix) // + .map(S3Object::key) // .map(this::externalName); } @@ -142,13 +150,36 @@ class S3ObjectStore implements DataStore { } @Override - public Mono deleteAllData() { - return listObjects("") // - .flatMap(key -> deleteObject(key)) // - .collectList() // - .map(resp -> "OK") - .doOnError(t -> logger.warn("Could not delete bucket: {}, reason: {}", bucket(), t.getMessage())) - .onErrorResume(t -> Mono.just("NOK")); + public Flux deleteAllData() { + return listObjectsInBucket(bucket(), "") // + .buffer(500) // + .flatMap(this::deleteObjectsFromS3Storage) // + .doOnError(t -> logger.info("Deleted all files {}", t.getMessage())) // + .onErrorStop() // + .onErrorResume(t -> Flux.empty()).map(resp -> ""); // + } + + private Mono deleteObjectsFromS3Storage(Collection objects) { + Collection oids = new ArrayList<>(); + for (S3Object o : objects) { + ObjectIdentifier oid = ObjectIdentifier.builder() // + .key(o.key()) // + .build(); + oids.add(oid); + } + + Delete delete = Delete.builder() // + .objects(oids) // + .build(); + + DeleteObjectsRequest request = DeleteObjectsRequest.builder() // + .bucket(bucket()) // + .delete(delete) // + .build(); + + CompletableFuture future = s3AsynchClient.deleteObjects(request); + + return Mono.fromFuture(future); } @Override @@ -193,8 +224,8 @@ class S3ObjectStore implements DataStore { } private Flux listObjectsInBucket(String bucket, String prefix) { - - return listObjectsRequest(bucket, prefix, null) // + String pre = location + "/" + prefix; + return listObjectsRequest(bucket, pre, null) // .expand(response -> listObjectsRequest(bucket, prefix, response)) // .map(ListObjectsResponse::contents) // .doOnNext(f -> logger.debug("Found objects in {}: {}", bucket, f.size())) // @@ -214,7 +245,7 @@ class S3ObjectStore implements DataStore { s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes()); return Mono.fromFuture(future) // - .map(b -> b.asByteArray()) // + .map(BytesWrapper::asByteArray) // .doOnError(t -> logger.error("Failed to get file from S3, key:{}, bucket: {}, {}", key(fileName), bucket, t.getMessage())) // .doOnEach(n -> logger.debug("Read file from S3: {} {}", bucket, key(fileName))) // diff --git a/src/main/java/org/oransc/ics/repository/InfoJobs.java b/src/main/java/org/oransc/ics/repository/InfoJobs.java index 8085573..db3b9af 100644 --- a/src/main/java/org/oransc/ics/repository/InfoJobs.java +++ b/src/main/java/org/oransc/ics/repository/InfoJobs.java @@ -52,7 +52,6 @@ public class InfoJobs { private final Gson gson; private final InfoTypes infoTypes; - private final ApplicationConfig config; private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private final ProducerCallbacks producerCallbacks; @@ -60,7 +59,6 @@ public class InfoJobs { private final DataStore dataStore; public InfoJobs(ApplicationConfig config, InfoTypes infoTypes, ProducerCallbacks producerCallbacks) { - this.config = config; GsonBuilder gsonBuilder = new GsonBuilder(); ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); this.gson = gsonBuilder.create(); @@ -165,7 +163,7 @@ public class InfoJobs { this.jobsByType.clear(); jobsByOwner.clear(); - dataStore.deleteAllData().flatMap(s -> dataStore.createDataStore()).block(); + dataStore.deleteAllData().flatMap(s -> dataStore.createDataStore()).blockLast(); } private void doPut(InfoJob job) { diff --git a/src/main/java/org/oransc/ics/repository/InfoTypeSubscriptions.java b/src/main/java/org/oransc/ics/repository/InfoTypeSubscriptions.java index 97fa487..1fff616 100644 --- a/src/main/java/org/oransc/ics/repository/InfoTypeSubscriptions.java +++ b/src/main/java/org/oransc/ics/repository/InfoTypeSubscriptions.java @@ -58,7 +58,6 @@ public class InfoTypeSubscriptions { private final Map allSubscriptions = new HashMap<>(); private final MultiMap subscriptionsByOwner = new MultiMap<>(); private final Gson gson = new GsonBuilder().create(); - private final ApplicationConfig config; private final Map callbackHandlers = new HashMap<>(); private final DataStore dataStore; @@ -94,7 +93,6 @@ public class InfoTypeSubscriptions { } public InfoTypeSubscriptions(@Autowired ApplicationConfig config) { - this.config = config; this.dataStore = DataStore.create(config, "infotypesubscriptions"); this.dataStore.createDataStore().subscribe(); } @@ -232,7 +230,7 @@ public class InfoTypeSubscriptions { } private void clearDatabase() { - this.dataStore.deleteAllData().block(); + this.dataStore.deleteAllData().blockLast(); } private void storeInFile(SubscriptionInfo subscription) { diff --git a/src/main/java/org/oransc/ics/repository/InfoTypes.java b/src/main/java/org/oransc/ics/repository/InfoTypes.java index 98188ff..7d703e7 100644 --- a/src/main/java/org/oransc/ics/repository/InfoTypes.java +++ b/src/main/java/org/oransc/ics/repository/InfoTypes.java @@ -47,12 +47,10 @@ import reactor.core.publisher.Flux; public class InfoTypes { private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private final Map allInfoTypes = new HashMap<>(); - private final ApplicationConfig config; private final Gson gson; private final DataStore dataStore; public InfoTypes(ApplicationConfig config) { - this.config = config; GsonBuilder gsonBuilder = new GsonBuilder(); ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); this.gson = gsonBuilder.create(); @@ -107,7 +105,7 @@ public class InfoTypes { public synchronized void clear() { this.allInfoTypes.clear(); - dataStore.deleteAllData().flatMap(s -> dataStore.createDataStore()).block(); + dataStore.deleteAllData().flatMap(s -> dataStore.createDataStore()).blockLast(); } public synchronized InfoType getCompatibleType(String typeId) throws ServiceException { diff --git a/src/test/java/org/oransc/ics/ApplicationTest.java b/src/test/java/org/oransc/ics/ApplicationTest.java index 2f7c1bf..4f12c12 100644 --- a/src/test/java/org/oransc/ics/ApplicationTest.java +++ b/src/test/java/org/oransc/ics/ApplicationTest.java @@ -1011,6 +1011,7 @@ class ApplicationTest { putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID); putInfoJob(TYPE_ID, "jobId1"); putInfoJob(TYPE_ID, "jobId2"); + waitForS3(); assertThat(this.infoJobs.size()).isEqualTo(2); { @@ -1046,6 +1047,7 @@ class ApplicationTest { putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID); InfoType savedType = this.infoTypes.getType(TYPE_ID); + waitForS3(); assertThat(this.infoTypes.size()).isEqualTo(1); { @@ -1078,9 +1080,8 @@ class ApplicationTest { restClient().putForEntity(typeSubscriptionUrl() + "/subscriptionId", body).block(); assertThat(this.infoTypeSubscriptions.size()).isEqualTo(1); - if (this.applicationConfig.isS3Enabled()) { - Thread.sleep(1000); // Storing in S3 is asynch, so it can take some millis - } + waitForS3(); + InfoTypeSubscriptions restoredSubscriptions = new InfoTypeSubscriptions(this.applicationConfig); restoredSubscriptions.restoreFromDatabase().blockLast(); assertThat(restoredSubscriptions.size()).isEqualTo(1); @@ -1092,6 +1093,13 @@ class ApplicationTest { assertThat(restoredSubscriptions.size()).isZero(); } + @SuppressWarnings("java:S2925") // sleep + private void waitForS3() throws InterruptedException { + if (this.applicationConfig.isS3Enabled()) { + Thread.sleep(1000); // Storing in S3 is asynch, so it can take some millis + } + } + @Test void testConsumerTypeSubscription() throws Exception {