package org.oransc.ics.datastore;
import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import org.oransc.ics.configuration.ApplicationConfig;
import reactor.core.publisher.Mono;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.BytesWrapper;
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
import software.amazon.awssdk.services.s3.model.CreateBucketResponse;
+import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
import software.amazon.awssdk.services.s3.model.DeleteBucketResponse;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
+import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Object;
@Override
public Flux<String> listObjects(String prefix) {
- return listObjectsInBucket(bucket(), location + "/" + prefix).map(S3Object::key) //
+ return listObjectsInBucket(bucket(), prefix) //
+ .map(S3Object::key) //
.map(this::externalName);
}
}
@Override
- public Mono<String> deleteAllData() {
- return listObjects("") //
- .flatMap(key -> deleteObject(key)) //
- .collectList() //
- .map(resp -> "OK")
- .doOnError(t -> logger.warn("Could not delete bucket: {}, reason: {}", bucket(), t.getMessage()))
- .onErrorResume(t -> Mono.just("NOK"));
+ public Flux<String> deleteAllData() {
+ return listObjectsInBucket(bucket(), "") //
+ .buffer(500) //
+ .flatMap(this::deleteObjectsFromS3Storage) //
+ .doOnError(t -> logger.info("Deleted all files {}", t.getMessage())) //
+ .onErrorStop() //
+ .onErrorResume(t -> Flux.empty()).map(resp -> ""); //
+ }
+
+ private Mono<DeleteObjectsResponse> deleteObjectsFromS3Storage(Collection<S3Object> objects) {
+ Collection<ObjectIdentifier> oids = new ArrayList<>();
+ for (S3Object o : objects) {
+ ObjectIdentifier oid = ObjectIdentifier.builder() //
+ .key(o.key()) //
+ .build();
+ oids.add(oid);
+ }
+
+ Delete delete = Delete.builder() //
+ .objects(oids) //
+ .build();
+
+ DeleteObjectsRequest request = DeleteObjectsRequest.builder() //
+ .bucket(bucket()) //
+ .delete(delete) //
+ .build();
+
+ CompletableFuture<DeleteObjectsResponse> future = s3AsynchClient.deleteObjects(request);
+
+ return Mono.fromFuture(future);
}
@Override
}
private Flux<S3Object> listObjectsInBucket(String bucket, String prefix) {
-
- return listObjectsRequest(bucket, prefix, null) //
+ String pre = location + "/" + prefix;
+ return listObjectsRequest(bucket, pre, null) //
.expand(response -> listObjectsRequest(bucket, prefix, response)) //
.map(ListObjectsResponse::contents) //
.doOnNext(f -> logger.debug("Found objects in {}: {}", bucket, f.size())) //
s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes());
return Mono.fromFuture(future) //
- .map(b -> b.asByteArray()) //
+ .map(BytesWrapper::asByteArray) //
.doOnError(t -> logger.error("Failed to get file from S3, key:{}, bucket: {}, {}", key(fileName), bucket,
t.getMessage())) //
.doOnEach(n -> logger.debug("Read file from S3: {} {}", bucket, key(fileName))) //
putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
putInfoJob(TYPE_ID, "jobId1");
putInfoJob(TYPE_ID, "jobId2");
+ waitForS3();
assertThat(this.infoJobs.size()).isEqualTo(2);
{
putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
InfoType savedType = this.infoTypes.getType(TYPE_ID);
+ waitForS3();
assertThat(this.infoTypes.size()).isEqualTo(1);
{
restClient().putForEntity(typeSubscriptionUrl() + "/subscriptionId", body).block();
assertThat(this.infoTypeSubscriptions.size()).isEqualTo(1);
- if (this.applicationConfig.isS3Enabled()) {
- Thread.sleep(1000); // Storing in S3 is asynch, so it can take some millis
- }
+ waitForS3();
+
InfoTypeSubscriptions restoredSubscriptions = new InfoTypeSubscriptions(this.applicationConfig);
restoredSubscriptions.restoreFromDatabase().blockLast();
assertThat(restoredSubscriptions.size()).isEqualTo(1);
assertThat(restoredSubscriptions.size()).isZero();
}
+ @SuppressWarnings("java:S2925") // sleep
+ private void waitForS3() throws InterruptedException {
+ if (this.applicationConfig.isS3Enabled()) {
+ Thread.sleep(1000); // Storing in S3 is asynch, so it can take some millis
+ }
+ }
+
@Test
void testConsumerTypeSubscription() throws Exception {