2 * ========================LICENSE_START=================================
5 * Copyright (C) 2021 Nordix Foundation
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ========================LICENSE_END===================================
21 package org.onap.dcaegen2.collectors.datafile.datastore;
24 import java.nio.file.Path;
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.concurrent.CompletableFuture;
29 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
33 import reactor.core.publisher.Flux;
34 import reactor.core.publisher.Mono;
35 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
36 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
37 import software.amazon.awssdk.core.BytesWrapper;
38 import software.amazon.awssdk.core.ResponseBytes;
39 import software.amazon.awssdk.core.async.AsyncRequestBody;
40 import software.amazon.awssdk.core.async.AsyncResponseTransformer;
41 import software.amazon.awssdk.regions.Region;
42 import software.amazon.awssdk.services.s3.S3AsyncClient;
43 import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
44 import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
45 import software.amazon.awssdk.services.s3.model.CreateBucketResponse;
46 import software.amazon.awssdk.services.s3.model.Delete;
47 import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
48 import software.amazon.awssdk.services.s3.model.DeleteBucketResponse;
49 import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
50 import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
51 import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
52 import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
53 import software.amazon.awssdk.services.s3.model.GetObjectRequest;
54 import software.amazon.awssdk.services.s3.model.GetObjectResponse;
55 import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
56 import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
57 import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
58 import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
59 import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
60 import software.amazon.awssdk.services.s3.model.PutObjectRequest;
61 import software.amazon.awssdk.services.s3.model.PutObjectResponse;
62 import software.amazon.awssdk.services.s3.model.S3Object;
/**
 * {@link DataStore} implementation that talks to S3 object storage through the
 * asynchronous AWS SDK client and exposes the results as Reactor {@code Mono}/{@code Flux}.
 */
64 public class S3ObjectStore implements DataStore {
65 private static final Logger logger = LoggerFactory.getLogger(S3ObjectStore.class);
// Source of the bucket names, the S3 endpoint and the credentials used by this store.
66 private final AppConfig applicationConfig;
// Static, hence shared by every instance; assigned in getS3AsynchClient only while it is
// still null and S3 is enabled in the configuration, so it can remain null otherwise.
68 private static S3AsyncClient s3AsynchClient;
/**
 * Creates a store that reads its bucket names and S3 settings from the given configuration.
 *
 * @param applicationConfig configuration kept by this instance and used to set up the shared client
 */
public S3ObjectStore(AppConfig applicationConfig) {
    this.applicationConfig = applicationConfig;

    // Invoked for its side effect only: sets up the shared static client if it is still unset.
    S3ObjectStore.getS3AsynchClient(applicationConfig);
}
76 private static synchronized S3AsyncClient getS3AsynchClient(AppConfig applicationConfig) {
77 if (applicationConfig.isS3Enabled() && s3AsynchClient == null) {
78 s3AsynchClient = getS3AsyncClientBuilder(applicationConfig).build();
80 return s3AsynchClient;
83 private static S3AsyncClientBuilder getS3AsyncClientBuilder(AppConfig applicationConfig) {
84 URI uri = URI.create(applicationConfig.getS3EndpointOverride());
85 return S3AsyncClient.builder() //
86 .region(Region.US_EAST_1) //
87 .endpointOverride(uri) //
88 .credentialsProvider(StaticCredentialsProvider.create( //
89 AwsBasicCredentials.create(applicationConfig.getS3AccessKeyId(), //
90 applicationConfig.getS3SecretAccessKey())));
94 public Flux<String> listObjects(Bucket bucket, String prefix) {
95 return listObjectsInBucket(bucket(bucket), prefix).map(S3Object::key);
99 public Mono<Boolean> createLock(String name) {
100 return getHeadObject(bucket(Bucket.LOCKS), name).flatMap(head -> createLock(name, head)) //
101 .onErrorResume(t -> createLock(name, null));
/**
 * Second step of lock creation, called with the head response of the lock object
 * (or {@code null} when the caller's lookup failed).
 *
 * NOTE(review): the condition selecting between the two return statements below is not
 * visible in this view; presumably the write happens only when {@code head} is null —
 * confirm against the complete file.
 *
 * @param name name of the lock object
 * @param head head response for an existing lock object, or {@code null}
 * @return {@code true} when the lock object was written, {@code false} otherwise
 */
104 private Mono<Boolean> createLock(String name, HeadObjectResponse head) {
// Writes an object with empty content, named after the lock, into the LOCKS bucket.
107 return this.putObject(Bucket.LOCKS, name, "") //
108 .flatMap(resp -> Mono.just(true)) //
109 .doOnError(t -> logger.warn("Failed to create lock {}, reason: {}", name, t.getMessage())) //
// A failed write is reported as false rather than propagated as an error signal.
110 .onErrorResume(t -> Mono.just(false));
112 return Mono.just(false);
117 public Mono<Boolean> deleteLock(String name) {
118 return deleteObject(Bucket.LOCKS, name);
/**
 * Deletes an object through the shared asynchronous client.
 *
 * NOTE(review): the request builder chain looks truncated in this view (no key or
 * build call is visible) — confirm against the complete file.
 *
 * @param bucket logical bucket, translated to the configured bucket name
 * @param name name of the object — presumably used as the object key; verify
 * @return {@code true} once the delete call completes; an error signal if it fails
 */
122 public Mono<Boolean> deleteObject(Bucket bucket, String name) {
124 DeleteObjectRequest request = DeleteObjectRequest.builder() //
125 .bucket(bucket(bucket)) //
// The SDK call is made right here; the returned Mono only wraps the resulting future.
129 CompletableFuture<DeleteObjectResponse> future = s3AsynchClient.deleteObject(request);
131 return Mono.fromFuture(future).map(resp -> true);
135 public Mono<byte[]> readObject(Bucket bucket, String fileName) {
136 return getDataFromS3Object(bucket(bucket), fileName);
/**
 * Stores a string as the content of an object through the shared asynchronous client.
 *
 * NOTE(review): the request builder chain looks truncated in this view (no key or
 * build call is visible) — confirm against the complete file.
 *
 * @param bucket logical bucket, translated to the configured bucket name
 * @param fileName value emitted on success — presumably also the object key; verify
 * @param bodyString text used as the request body
 * @return {@code fileName} once the put call completes; an error signal (after logging) if it fails
 */
139 public Mono<String> putObject(Bucket bucket, String fileName, String bodyString) {
140 PutObjectRequest request = PutObjectRequest.builder() //
141 .bucket(bucket(bucket)) //
145 AsyncRequestBody body = AsyncRequestBody.fromString(bodyString);
// The SDK call is made right here; the returned Mono only wraps the resulting future.
147 CompletableFuture<PutObjectResponse> future = s3AsynchClient.putObject(request, body);
149 return Mono.fromFuture(future) //
150 .map(putObjectResponse -> fileName) //
// The error is logged but not swallowed: it still reaches the subscriber.
151 .doOnError(t -> logger.error("Failed to store file in S3 {}", t.getMessage()));
155 public Mono<String> copyFileTo(Path fromFile, String toFile) {
156 return copyFileToS3Bucket(bucket(Bucket.FILES), fromFile, toFile);
159 public Mono<Boolean> fileExists(Bucket bucket, String key) {
160 return this.getHeadObject(bucket(bucket), key).map(obj -> true) //
161 .onErrorResume(t -> Mono.just(false));
165 public Mono<String> create(Bucket bucket) {
166 return createS3Bucket(bucket(bucket));
/**
 * Issues a create-bucket call and always answers with the bucket name.
 *
 * NOTE(review): the request builder chain looks truncated in this view (no bucket or
 * build call is visible) — confirm against the complete file.
 *
 * @param s3Bucket name emitted by the returned Mono — presumably also the bucket to create; verify
 * @return {@code s3Bucket}, both when the call succeeds and when it fails
 */
169 private Mono<String> createS3Bucket(String s3Bucket) {
171 CreateBucketRequest request = CreateBucketRequest.builder() //
// The SDK call is made right here; the returned Mono only wraps the resulting future.
175 CompletableFuture<CreateBucketResponse> future = s3AsynchClient.createBucket(request);
177 return Mono.fromFuture(future) //
178 .map(f -> s3Bucket) //
// Failures are logged at trace level only and then replaced by the bucket name below.
179 .doOnError(t -> logger.trace("Could not create S3 bucket: {}", t.getMessage()))
180 .onErrorResume(t -> Mono.just(s3Bucket));
/**
 * Empties the given bucket and then deletes the bucket itself.
 *
 * NOTE(review): some operators of this chain are not visible in this view (the steps
 * between deleteAllFiles and flatMap, and the mapping to the success value) — confirm
 * against the complete file.
 *
 * @param bucket logical bucket to remove
 * @return {@code "NOK"} when any step fails; the success value is not visible here
 */
184 public Mono<String> deleteBucket(Bucket bucket) {
185 return deleteAllFiles(bucket) //
187 .flatMap(list -> deleteBucketFromS3Storage(bucket)) //
// Failures are logged and converted into the "NOK" value instead of an error signal.
189 .doOnError(t -> logger.warn("Could not delete bucket: {}, reason: {}", bucket(bucket), t.getMessage()))
190 .onErrorResume(t -> Mono.just("NOK"));
/**
 * Lists the objects of the given bucket (empty prefix) and deletes them in groups.
 *
 * NOTE(review): the operator that groups the listed objects into the collection handed
 * to deleteObjectsFromS3Storage is not visible in this view — confirm against the
 * complete file.
 *
 * @param bucket logical bucket whose objects are deleted
 * @return the responses of the delete calls; completes empty if an error occurs
 */
193 private Flux<DeleteObjectsResponse> deleteAllFiles(Bucket bucket) {
194 return listObjectsInBucket(bucket(bucket), "") //
196 .flatMap(list -> deleteObjectsFromS3Storage(bucket, list)) //
// NOTE(review): this callback runs on failure, although the message reads like a success report.
197 .doOnError(t -> logger.info("Deleted all files {}", t.getMessage())) //
// Errors are swallowed: the stream simply ends without emitting further responses.
199 .onErrorResume(t -> Flux.empty()); //
/**
 * Deletes a collection of objects from the given bucket with a single delete-objects call.
 *
 * NOTE(review): several lines are not visible in this view (the loop body that fills
 * {@code oids} and the remaining builder calls) — confirm against the complete file.
 *
 * @param bucket logical bucket, translated to the configured bucket name
 * @param objects objects to delete
 * @return the response of the delete-objects call; an error signal if it fails
 */
203 private Mono<DeleteObjectsResponse> deleteObjectsFromS3Storage(Bucket bucket, Collection<S3Object> objects) {
// Collects one identifier per object — presumably built from each object's key; verify.
204 Collection<ObjectIdentifier> oids = new ArrayList<>();
205 for (S3Object o : objects) {
206 ObjectIdentifier oid = ObjectIdentifier.builder() //
212 Delete delete = Delete.builder() //
216 DeleteObjectsRequest request = DeleteObjectsRequest.builder() //
217 .bucket(bucket(bucket)) //
// The SDK call is made right here; the returned Mono only wraps the resulting future.
221 CompletableFuture<DeleteObjectsResponse> future = s3AsynchClient.deleteObjects(request);
223 return Mono.fromFuture(future);
/**
 * Requests one page of an object listing.
 *
 * NOTE(review): the builder calls and the remaining branches are not visible in this
 * view — in particular what happens when {@code prevResponse} is present but not
 * truncated. Confirm against the complete file.
 *
 * @param bucket name of the S3 bucket — presumably set on the request builder; verify
 * @param prefix key prefix — presumably set on the request builder; verify
 * @param prevResponse previous page, or {@code null} for the first request
 * @return the response of the list-objects call; an error signal if it fails
 */
226 private Mono<ListObjectsResponse> listObjectsRequest(String bucket, String prefix,
227 ListObjectsResponse prevResponse) {
228 ListObjectsRequest.Builder builder = ListObjectsRequest.builder() //
233 if (prevResponse != null) {
// Null-safe check of the boxed flag: only an explicit TRUE counts as truncated.
234 if (Boolean.TRUE.equals(prevResponse.isTruncated())) {
// Continue the listing from the marker reported by the previous page.
235 builder.marker(prevResponse.nextMarker());
241 ListObjectsRequest listObjectsRequest = builder.build();
242 CompletableFuture<ListObjectsResponse> future = s3AsynchClient.listObjects(listObjectsRequest);
243 return Mono.fromFuture(future);
246 private Flux<S3Object> listObjectsInBucket(String bucket, String prefix) {
248 return listObjectsRequest(bucket, prefix, null) //
249 .expand(response -> listObjectsRequest(bucket, prefix, response)) //
250 .map(ListObjectsResponse::contents) //
251 .doOnNext(f -> logger.debug("Found objects in {}: {}", bucket, f.size())) //
252 .doOnError(t -> logger.warn("Error fromlist objects: {}", t.getMessage())) //
253 .flatMap(Flux::fromIterable) //
254 .doOnNext(obj -> logger.debug("Found object: {}", obj.key()));
/**
 * Issues a delete-bucket call for the given bucket.
 *
 * NOTE(review): the end of the request builder chain (the build call) is not visible
 * in this view — confirm against the complete file.
 *
 * @param bucket logical bucket, translated to the configured bucket name
 * @return the response of the delete-bucket call; an error signal if it fails
 */
257 private Mono<DeleteBucketResponse> deleteBucketFromS3Storage(Bucket bucket) {
258 DeleteBucketRequest request = DeleteBucketRequest.builder() //
259 .bucket(bucket(bucket)) //
// The SDK call is made right here; the returned Mono only wraps the resulting future.
262 CompletableFuture<DeleteBucketResponse> future = s3AsynchClient.deleteBucket(request);
264 return Mono.fromFuture(future);
267 private String bucket(Bucket bucket) {
268 return bucket == Bucket.FILES ? applicationConfig.getS3Bucket() : applicationConfig.getS3LocksBucket();
/**
 * Uploads the content of a local file through the shared asynchronous client.
 *
 * NOTE(review): the request builder calls and one operator of the returned chain (the
 * step producing the emitted String) are not visible in this view — confirm against
 * the complete file.
 *
 * @param s3Bucket name of the target bucket — presumably set on the request builder; verify
 * @param fileName path of the local file whose content forms the request body
 * @param s3Key key of the target object — presumably set on the request builder; verify
 * @return a String on success (value not visible here); an error signal (after logging) if the upload fails
 */
271 private Mono<String> copyFileToS3Bucket(String s3Bucket, Path fileName, String s3Key) {
273 PutObjectRequest request = PutObjectRequest.builder() //
278 AsyncRequestBody body = AsyncRequestBody.fromFile(fileName);
// The SDK call is made right here; the returned Mono only wraps the resulting future.
280 CompletableFuture<PutObjectResponse> future = s3AsynchClient.putObject(request, body);
282 return Mono.fromFuture(future) //
// The error is logged but not swallowed: it still reaches the subscriber.
284 .doOnError(t -> logger.error("Failed to store file in S3 {}", t.getMessage()));
288 private Mono<HeadObjectResponse> getHeadObject(String bucket, String key) {
289 HeadObjectRequest request = HeadObjectRequest.builder().bucket(bucket).key(key).build();
291 CompletableFuture<HeadObjectResponse> future = s3AsynchClient.headObject(request);
292 return Mono.fromFuture(future);
/**
 * Downloads an object and delivers its content as a byte array.
 *
 * NOTE(review): the request builder calls and the opening of the error-logging operator
 * are not visible in this view — confirm against the complete file.
 *
 * @param bucket name of the S3 bucket (also used in the log messages)
 * @param key key of the object (also used in the log messages)
 * @return the object's bytes; completes empty, after logging, if the download fails
 */
295 private Mono<byte[]> getDataFromS3Object(String bucket, String key) {
297 GetObjectRequest request = GetObjectRequest.builder() //
// The whole response body is collected in memory by the toBytes() transformer.
302 CompletableFuture<ResponseBytes<GetObjectResponse>> future =
303 s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes());
305 return Mono.fromFuture(future) //
306 .map(BytesWrapper::asByteArray) //
308 t -> logger.error("Failed to get file from S3, key:{}, bucket: {}, {}", key, bucket, t.getMessage())) //
309 .doOnNext(n -> logger.debug("Read file from S3: {} {}", bucket, key)) //
// Errors are swallowed: the subscriber sees an empty completion, not an error signal.
310 .onErrorResume(t -> Mono.empty());