2 * ========================LICENSE_START=================================
5 * Copyright (C) 2022 Nordix Foundation
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ========================LICENSE_END===================================
21 package org.oransc.ics.datastore;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.concurrent.CompletableFuture;
28 import org.oransc.ics.configuration.ApplicationConfig;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
32 import reactor.core.publisher.Flux;
33 import reactor.core.publisher.Mono;
34 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
35 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
36 import software.amazon.awssdk.core.BytesWrapper;
37 import software.amazon.awssdk.core.ResponseBytes;
38 import software.amazon.awssdk.core.async.AsyncRequestBody;
39 import software.amazon.awssdk.core.async.AsyncResponseTransformer;
40 import software.amazon.awssdk.regions.Region;
41 import software.amazon.awssdk.services.s3.S3AsyncClient;
42 import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
43 import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
44 import software.amazon.awssdk.services.s3.model.CreateBucketResponse;
45 import software.amazon.awssdk.services.s3.model.Delete;
46 import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
47 import software.amazon.awssdk.services.s3.model.DeleteBucketResponse;
48 import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
49 import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
50 import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
51 import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
52 import software.amazon.awssdk.services.s3.model.GetObjectRequest;
53 import software.amazon.awssdk.services.s3.model.GetObjectResponse;
54 import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
55 import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
56 import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
57 import software.amazon.awssdk.services.s3.model.PutObjectRequest;
58 import software.amazon.awssdk.services.s3.model.PutObjectResponse;
59 import software.amazon.awssdk.services.s3.model.S3Object;
61 class S3ObjectStore implements DataStore {
62 private static final Logger logger = LoggerFactory.getLogger(S3ObjectStore.class);
63 private final ApplicationConfig applicationConfig;
65 private static S3AsyncClient s3AsynchClient;
66 private final String location;
/**
 * Creates an object store whose object keys are all placed under the given location.
 *
 * @param applicationConfig configuration consulted for the S3 settings
 * @param location first key segment; it is joined with "/" in front of every object name
 */
public S3ObjectStore(ApplicationConfig applicationConfig, String location) {
    this.location = location;
    this.applicationConfig = applicationConfig;

    // Invoked for its side effect only: the shared client is created here if it does not exist yet.
    getS3AsynchClient(this.applicationConfig);
}
75 private static synchronized S3AsyncClient getS3AsynchClient(ApplicationConfig applicationConfig) {
76 if (applicationConfig.isS3Enabled() && s3AsynchClient == null) {
77 s3AsynchClient = getS3AsyncClientBuilder(applicationConfig).build();
79 return s3AsynchClient;
82 private static S3AsyncClientBuilder getS3AsyncClientBuilder(ApplicationConfig applicationConfig) {
83 URI uri = URI.create(applicationConfig.getS3EndpointOverride());
84 return S3AsyncClient.builder() //
85 .region(Region.US_EAST_1) //
86 .endpointOverride(uri) //
87 .credentialsProvider(StaticCredentialsProvider.create( //
88 AwsBasicCredentials.create(applicationConfig.getS3AccessKeyId(), //
89 applicationConfig.getS3SecretAccessKey())));
93 public Flux<String> listObjects(String prefix) {
94 return listObjectsInBucket(bucket(), prefix) //
95 .map(S3Object::key) //
96 .map(this::externalName);
100 public Mono<Boolean> deleteObject(String name) {
101 DeleteObjectRequest request = DeleteObjectRequest.builder() //
106 CompletableFuture<DeleteObjectResponse> future = s3AsynchClient.deleteObject(request);
108 return Mono.fromFuture(future).map(resp -> true);
112 public Mono<byte[]> readObject(String fileName) {
113 return getDataFromS3Object(bucket(), fileName);
117 public Mono<byte[]> writeObject(String fileName, byte[] fileData) {
119 PutObjectRequest request = PutObjectRequest.builder() //
121 .key(key(fileName)) //
124 AsyncRequestBody body = AsyncRequestBody.fromBytes(fileData);
126 CompletableFuture<PutObjectResponse> future = s3AsynchClient.putObject(request, body);
128 return Mono.fromFuture(future) //
129 .map(putObjectResponse -> fileData) //
130 .doOnError(t -> logger.error("Failed to store file in S3 {}", t.getMessage()));
134 public Mono<String> createDataStore() {
135 return createS3Bucket(bucket());
138 private Mono<String> createS3Bucket(String s3Bucket) {
140 CreateBucketRequest request = CreateBucketRequest.builder() //
144 CompletableFuture<CreateBucketResponse> future = s3AsynchClient.createBucket(request);
146 return Mono.fromFuture(future) //
147 .map(f -> s3Bucket) //
148 .doOnError(t -> logger.debug("Could not create S3 bucket: {}", t.getMessage()))
149 .onErrorResume(t -> Mono.just(s3Bucket));
153 public Flux<String> deleteAllData() {
154 return listObjectsInBucket(bucket(), "") //
156 .flatMap(this::deleteObjectsFromS3Storage) //
157 .doOnError(t -> logger.info("Deleted all files {}", t.getMessage())) //
159 .onErrorResume(t -> Flux.empty()).map(resp -> ""); //
162 private Mono<DeleteObjectsResponse> deleteObjectsFromS3Storage(Collection<S3Object> objects) {
163 Collection<ObjectIdentifier> oids = new ArrayList<>();
164 for (S3Object o : objects) {
165 ObjectIdentifier oid = ObjectIdentifier.builder() //
171 Delete delete = Delete.builder() //
175 DeleteObjectsRequest request = DeleteObjectsRequest.builder() //
180 CompletableFuture<DeleteObjectsResponse> future = s3AsynchClient.deleteObjects(request);
182 return Mono.fromFuture(future);
186 public Mono<String> deleteBucket() {
187 return deleteBucketFromS3Storage()
188 .doOnError(t -> logger.warn("Could not delete: {}, reason: {}", bucket(), t.getMessage()))
189 .map(x -> bucket()).onErrorResume(t -> Mono.just(bucket()));
192 private Mono<DeleteBucketResponse> deleteBucketFromS3Storage() {
193 DeleteBucketRequest request = DeleteBucketRequest.builder() //
197 CompletableFuture<DeleteBucketResponse> future = s3AsynchClient.deleteBucket(request);
199 return Mono.fromFuture(future);
202 private String bucket() {
203 return applicationConfig.getS3Bucket();
206 private Mono<ListObjectsResponse> listObjectsRequest(String bucket, String prefix,
207 ListObjectsResponse prevResponse) {
208 ListObjectsRequest.Builder builder = ListObjectsRequest.builder() //
213 if (prevResponse != null) {
214 if (Boolean.TRUE.equals(prevResponse.isTruncated())) {
215 builder.marker(prevResponse.nextMarker());
221 ListObjectsRequest listObjectsRequest = builder.build();
222 CompletableFuture<ListObjectsResponse> future = s3AsynchClient.listObjects(listObjectsRequest);
223 return Mono.fromFuture(future);
226 private Flux<S3Object> listObjectsInBucket(String bucket, String prefix) {
227 String pre = location + "/" + prefix;
228 return listObjectsRequest(bucket, pre, null) //
229 .expand(response -> listObjectsRequest(bucket, prefix, response)) //
230 .map(ListObjectsResponse::contents) //
231 .doOnNext(f -> logger.debug("Found objects in {}: {}", bucket, f.size())) //
232 .doOnError(t -> logger.warn("Error fromlist objects: {}", t.getMessage())) //
233 .flatMap(Flux::fromIterable) //
234 .doOnNext(obj -> logger.debug("Found object: {}", obj.key()));
237 private Mono<byte[]> getDataFromS3Object(String bucket, String fileName) {
239 GetObjectRequest request = GetObjectRequest.builder() //
241 .key(key(fileName)) //
244 CompletableFuture<ResponseBytes<GetObjectResponse>> future =
245 s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes());
247 return Mono.fromFuture(future) //
248 .map(BytesWrapper::asByteArray) //
249 .doOnError(t -> logger.error("Failed to get file from S3, key:{}, bucket: {}, {}", key(fileName), bucket,
251 .doOnEach(n -> logger.debug("Read file from S3: {} {}", bucket, key(fileName))) //
252 .onErrorResume(t -> Mono.empty());
255 private String key(String fileName) {
256 return location + "/" + fileName;
259 private String externalName(String internalName) {
260 return internalName.substring(key("").length());