/*- * ========================LICENSE_START================================= * O-RAN-SC * %% * Copyright (C) 2021-2023 Nordix Foundation * %% * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ========================LICENSE_END=================================== */ package org.oran.datafile.datastore; import java.net.URI; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.CompletableFuture; import org.oran.datafile.configuration.AppConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.core.BytesWrapper; import software.amazon.awssdk.core.ResponseBytes; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; import software.amazon.awssdk.services.s3.model.CreateBucketRequest; import software.amazon.awssdk.services.s3.model.CreateBucketResponse; import software.amazon.awssdk.services.s3.model.Delete; import software.amazon.awssdk.services.s3.model.DeleteBucketRequest; import software.amazon.awssdk.services.s3.model.DeleteBucketResponse; import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; import software.amazon.awssdk.services.s3.model.DeleteObjectResponse; import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.services.s3.model.HeadObjectRequest; import software.amazon.awssdk.services.s3.model.HeadObjectResponse; import software.amazon.awssdk.services.s3.model.ListObjectsRequest; import software.amazon.awssdk.services.s3.model.ListObjectsResponse; import software.amazon.awssdk.services.s3.model.ObjectIdentifier; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; import software.amazon.awssdk.services.s3.model.S3Object; public class S3ObjectStore implements DataStore { private static final Logger logger = LoggerFactory.getLogger(S3ObjectStore.class); private final AppConfig applicationConfig; private static S3AsyncClient s3AsynchClient; public S3ObjectStore(AppConfig applicationConfig) { this.applicationConfig = applicationConfig; getS3AsynchClient(applicationConfig); } private static synchronized S3AsyncClient getS3AsynchClient(AppConfig applicationConfig) { if (applicationConfig.isS3Enabled() && s3AsynchClient == null) { s3AsynchClient = getS3AsyncClientBuilder(applicationConfig).build(); } return s3AsynchClient; } private static S3AsyncClientBuilder getS3AsyncClientBuilder(AppConfig applicationConfig) { URI uri = URI.create(applicationConfig.getS3EndpointOverride()); return S3AsyncClient.builder() // .region(Region.US_EAST_1) // .endpointOverride(uri) // .credentialsProvider(StaticCredentialsProvider.create( // AwsBasicCredentials.create(applicationConfig.getS3AccessKeyId(), // applicationConfig.getS3SecretAccessKey()))); } @Override public Flux listObjects(Bucket bucket, String prefix) { return listObjectsInBucket(bucket(bucket), prefix).map(S3Object::key); } @Override public Mono createLock(String name) { return getHeadObject(bucket(Bucket.LOCKS), name).flatMap(head -> createLock(name, head)) // .onErrorResume(t -> createLock(name, null)); } private Mono createLock(String name, HeadObjectResponse head) { if (head == null) { return this.putObject(Bucket.LOCKS, name, "") // .flatMap(resp -> Mono.just(true)) // .doOnError(t -> logger.warn("Failed to create lock {}, reason: {}", name, t.getMessage())) // .onErrorResume(t -> Mono.just(false)); } else { return Mono.just(false); } } @Override public Mono deleteLock(String name) { return deleteObject(Bucket.LOCKS, name); } @Override public Mono deleteObject(Bucket bucket, String name) { DeleteObjectRequest request = DeleteObjectRequest.builder() // .bucket(bucket(bucket)) // .key(name) // .build(); CompletableFuture future = s3AsynchClient.deleteObject(request); return Mono.fromFuture(future).map(resp -> true); } @Override public Mono readObject(Bucket bucket, String fileName) { return getDataFromS3Object(bucket(bucket), fileName); } public Mono putObject(Bucket bucket, String fileName, String bodyString) { PutObjectRequest request = PutObjectRequest.builder() // .bucket(bucket(bucket)) // .key(fileName) // .build(); AsyncRequestBody body = AsyncRequestBody.fromString(bodyString); CompletableFuture future = s3AsynchClient.putObject(request, body); return Mono.fromFuture(future) // .map(putObjectResponse -> fileName) // .doOnError(t -> logger.error("Failed to store file in S3 {}", t.getMessage())); } @Override public Mono copyFileTo(Path fromFile, String toFile) { return copyFileToS3Bucket(bucket(Bucket.FILES), fromFile, toFile); } public Mono fileExists(Bucket bucket, String key) { return this.getHeadObject(bucket(bucket), key).map(obj -> true) // .onErrorResume(t -> Mono.just(false)); } @Override public Mono create(Bucket bucket) { return createS3Bucket(bucket(bucket)); } private Mono createS3Bucket(String s3Bucket) { CreateBucketRequest request = CreateBucketRequest.builder() // .bucket(s3Bucket) // .build(); CompletableFuture future = s3AsynchClient.createBucket(request); return Mono.fromFuture(future) // .map(f -> s3Bucket) // .doOnError(t -> logger.trace("Could not create S3 bucket: {}", t.getMessage())) .onErrorResume(t -> Mono.just(s3Bucket)); } @Override public Mono deleteBucket(Bucket bucket) { return deleteAllFiles(bucket) // .collectList() // .flatMap(list -> deleteBucketFromS3Storage(bucket)) // .map(resp -> "OK") .doOnError(t -> logger.warn("Could not delete bucket: {}, reason: {}", bucket(bucket), t.getMessage())) .onErrorResume(t -> Mono.just("NOK")); } private Flux deleteAllFiles(Bucket bucket) { return listObjectsInBucket(bucket(bucket), "") // .buffer(500) // .flatMap(list -> deleteObjectsFromS3Storage(bucket, list)) // .doOnError(t -> logger.info("Deleted all files {}", t.getMessage())) // .onErrorStop() // .onErrorResume(t -> Flux.empty()); // } private Mono deleteObjectsFromS3Storage(Bucket bucket, Collection objects) { Collection oids = new ArrayList<>(); for (S3Object o : objects) { ObjectIdentifier oid = ObjectIdentifier.builder() // .key(o.key()) // .build(); oids.add(oid); } Delete delete = Delete.builder() // .objects(oids) // .build(); DeleteObjectsRequest request = DeleteObjectsRequest.builder() // .bucket(bucket(bucket)) // .delete(delete) // .build(); CompletableFuture future = s3AsynchClient.deleteObjects(request); return Mono.fromFuture(future); } private Mono listObjectsRequest(String bucket, String prefix, ListObjectsResponse prevResponse) { ListObjectsRequest.Builder builder = ListObjectsRequest.builder() // .bucket(bucket) // .maxKeys(1000) // .prefix(prefix); if (prevResponse != null) { if (Boolean.TRUE.equals(prevResponse.isTruncated())) { builder.marker(prevResponse.nextMarker()); } else { return Mono.empty(); } } ListObjectsRequest listObjectsRequest = builder.build(); CompletableFuture future = s3AsynchClient.listObjects(listObjectsRequest); return Mono.fromFuture(future); } private Flux listObjectsInBucket(String bucket, String prefix) { return listObjectsRequest(bucket, prefix, null) // .expand(response -> listObjectsRequest(bucket, prefix, response)) // .map(ListObjectsResponse::contents) // .doOnNext(f -> logger.debug("Found objects in {}: {}", bucket, f.size())) // .doOnError(t -> logger.warn("Error fromlist objects: {}", t.getMessage())) // .flatMap(Flux::fromIterable) // .doOnNext(obj -> logger.debug("Found object: {}", obj.key())); } private Mono deleteBucketFromS3Storage(Bucket bucket) { DeleteBucketRequest request = DeleteBucketRequest.builder() // .bucket(bucket(bucket)) // .build(); CompletableFuture future = s3AsynchClient.deleteBucket(request); return Mono.fromFuture(future); } private String bucket(Bucket bucket) { return bucket == Bucket.FILES ? applicationConfig.getS3Bucket() : applicationConfig.getS3LocksBucket(); } private Mono copyFileToS3Bucket(String s3Bucket, Path fileName, String s3Key) { PutObjectRequest request = PutObjectRequest.builder() // .bucket(s3Bucket) // .key(s3Key) // .build(); AsyncRequestBody body = AsyncRequestBody.fromFile(fileName); CompletableFuture future = s3AsynchClient.putObject(request, body); return Mono.fromFuture(future) // .map(f -> s3Key) // .doOnError(t -> logger.error("Failed to store file in S3 {}", t.getMessage())); } private Mono getHeadObject(String bucket, String key) { HeadObjectRequest request = HeadObjectRequest.builder().bucket(bucket).key(key).build(); CompletableFuture future = s3AsynchClient.headObject(request); return Mono.fromFuture(future); } private Mono getDataFromS3Object(String bucket, String key) { GetObjectRequest request = GetObjectRequest.builder() // .bucket(bucket) // .key(key) // .build(); CompletableFuture> future = s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes()); return Mono.fromFuture(future) // .map(BytesWrapper::asByteArray) // .doOnError( t -> logger.error("Failed to get file from S3, key:{}, bucket: {}, {}", key, bucket, t.getMessage())) // .doOnNext(n -> logger.debug("Read file from S3: {} {}", bucket, key)) // .onErrorResume(t -> Mono.empty()); } }