Improve Test coverage of DFC
[nonrtric/plt/ranpm.git] / datafilecollector / src / main / java / org / oran / datafile / datastore / S3ObjectStore.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2021-2023 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oran.datafile.datastore;
22
23 import java.net.URI;
24 import java.nio.file.Path;
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.concurrent.CompletableFuture;
28
29 import org.oran.datafile.configuration.AppConfig;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 import reactor.core.publisher.Flux;
34 import reactor.core.publisher.Mono;
35 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
36 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
37 import software.amazon.awssdk.core.BytesWrapper;
38 import software.amazon.awssdk.core.ResponseBytes;
39 import software.amazon.awssdk.core.async.AsyncRequestBody;
40 import software.amazon.awssdk.core.async.AsyncResponseTransformer;
41 import software.amazon.awssdk.regions.Region;
42 import software.amazon.awssdk.services.s3.S3AsyncClient;
43 import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
44 import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
45 import software.amazon.awssdk.services.s3.model.CreateBucketResponse;
46 import software.amazon.awssdk.services.s3.model.Delete;
47 import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
48 import software.amazon.awssdk.services.s3.model.DeleteBucketResponse;
49 import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
50 import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
51 import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
52 import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
53 import software.amazon.awssdk.services.s3.model.GetObjectRequest;
54 import software.amazon.awssdk.services.s3.model.GetObjectResponse;
55 import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
56 import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
57 import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
58 import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
59 import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
60 import software.amazon.awssdk.services.s3.model.PutObjectRequest;
61 import software.amazon.awssdk.services.s3.model.PutObjectResponse;
62 import software.amazon.awssdk.services.s3.model.S3Object;
63
64 public class S3ObjectStore implements DataStore {
65     private static final Logger logger = LoggerFactory.getLogger(S3ObjectStore.class);
66     private final AppConfig applicationConfig;
67
68     private static S3AsyncClient s3AsynchClient;
69
70     public S3ObjectStore(AppConfig applicationConfig) {
71         this.applicationConfig = applicationConfig;
72
73         getS3AsynchClient(applicationConfig);
74     }
75
76     @SuppressWarnings({"java:S3010", "java:S2209"})
77     public S3ObjectStore(AppConfig applicationConfig, S3AsyncClient s3AsynchClient) {
78         this.applicationConfig = applicationConfig;
79         this.s3AsynchClient = s3AsynchClient;
80     }
81
82     private static synchronized S3AsyncClient getS3AsynchClient(AppConfig applicationConfig) {
83         if (applicationConfig.isS3Enabled() && s3AsynchClient == null) {
84             s3AsynchClient = getS3AsyncClientBuilder(applicationConfig).build();
85         }
86         return s3AsynchClient;
87     }
88
89     private static S3AsyncClientBuilder getS3AsyncClientBuilder(AppConfig applicationConfig) {
90         URI uri = URI.create(applicationConfig.getS3EndpointOverride());
91         return S3AsyncClient.builder() //
92             .region(Region.US_EAST_1) //
93             .endpointOverride(uri) //
94             .credentialsProvider(StaticCredentialsProvider.create( //
95                 AwsBasicCredentials.create(applicationConfig.getS3AccessKeyId(), //
96                     applicationConfig.getS3SecretAccessKey())));
97     }
98
99     @Override
100     public Flux<String> listObjects(Bucket bucket, String prefix) {
101         return listObjectsInBucket(bucket(bucket), prefix).map(S3Object::key);
102     }
103
104     @Override
105     public Mono<Boolean> createLock(String name) {
106         return getHeadObject(bucket(Bucket.LOCKS), name).flatMap(head -> createLock(name, head)) //
107             .onErrorResume(t -> createLock(name, null));
108     }
109
110     private Mono<Boolean> createLock(String name, HeadObjectResponse head) {
111         if (head == null) {
112
113             return this.putObject(Bucket.LOCKS, name, "") //
114                 .flatMap(resp -> Mono.just(true)) //
115                 .doOnError(t -> logger.warn("Failed to create lock {}, reason: {}", name, t.getMessage())) //
116                 .onErrorResume(t -> Mono.just(false));
117         } else {
118             return Mono.just(false);
119         }
120     }
121
122     @Override
123     public Mono<Boolean> deleteLock(String name) {
124         return deleteObject(Bucket.LOCKS, name);
125     }
126
127     @Override
128     public Mono<Boolean> deleteObject(Bucket bucket, String name) {
129
130         DeleteObjectRequest request = DeleteObjectRequest.builder() //
131             .bucket(bucket(bucket)) //
132             .key(name) //
133             .build();
134
135         CompletableFuture<DeleteObjectResponse> future = s3AsynchClient.deleteObject(request);
136
137         return Mono.fromFuture(future).map(resp -> true);
138     }
139
140     @Override
141     public Mono<byte[]> readObject(Bucket bucket, String fileName) {
142         return getDataFromS3Object(bucket(bucket), fileName);
143     }
144
145     public Mono<String> putObject(Bucket bucket, String fileName, String bodyString) {
146         PutObjectRequest request = PutObjectRequest.builder() //
147             .bucket(bucket(bucket)) //
148             .key(fileName) //
149             .build();
150
151         AsyncRequestBody body = AsyncRequestBody.fromString(bodyString);
152
153         CompletableFuture<PutObjectResponse> future = s3AsynchClient.putObject(request, body);
154
155         return Mono.fromFuture(future) //
156             .map(putObjectResponse -> fileName) //
157             .doOnError(t -> logger.error("Failed to store file in S3 {}", t.getMessage()));
158     }
159
160     @Override
161     public Mono<String> copyFileTo(Path fromFile, String toFile) {
162         return copyFileToS3Bucket(bucket(Bucket.FILES), fromFile, toFile);
163     }
164
165     public Mono<Boolean> fileExists(Bucket bucket, String key) {
166         return this.getHeadObject(bucket(bucket), key).map(obj -> true) //
167             .onErrorResume(t -> Mono.just(false));
168     }
169
170     @Override
171     public Mono<String> create(Bucket bucket) {
172         return createS3Bucket(bucket(bucket));
173     }
174
175     private Mono<String> createS3Bucket(String s3Bucket) {
176
177         CreateBucketRequest request = CreateBucketRequest.builder() //
178             .bucket(s3Bucket) //
179             .build();
180
181         CompletableFuture<CreateBucketResponse> future = s3AsynchClient.createBucket(request);
182
183         return Mono.fromFuture(future) //
184             .map(f -> s3Bucket) //
185             .doOnError(t -> logger.trace("Could not create S3 bucket: {}", t.getMessage()))
186             .onErrorResume(t -> Mono.just(s3Bucket));
187     }
188
189     @Override
190     public Mono<String> deleteBucket(Bucket bucket) {
191         return deleteAllFiles(bucket) //
192             .collectList() //
193             .flatMap(list -> deleteBucketFromS3Storage(bucket)) //
194             .map(resp -> "OK")
195             .doOnError(t -> logger.warn("Could not delete bucket: {}, reason: {}", bucket(bucket), t.getMessage()))
196             .onErrorResume(t -> Mono.just("NOK"));
197     }
198
199     private Flux<DeleteObjectsResponse> deleteAllFiles(Bucket bucket) {
200         return listObjectsInBucket(bucket(bucket), "") //
201             .buffer(500) //
202             .flatMap(list -> deleteObjectsFromS3Storage(bucket, list)) //
203             .doOnError(t -> logger.info("Deleted all files {}", t.getMessage())) //
204             .onErrorStop() //
205             .onErrorResume(t -> Flux.empty()); //
206
207     }
208
209     private Mono<DeleteObjectsResponse> deleteObjectsFromS3Storage(Bucket bucket, Collection<S3Object> objects) {
210         Collection<ObjectIdentifier> oids = new ArrayList<>();
211         for (S3Object o : objects) {
212             ObjectIdentifier oid = ObjectIdentifier.builder() //
213                 .key(o.key()) //
214                 .build();
215             oids.add(oid);
216         }
217
218         DeleteObjectsRequest request = DeleteObjectsRequest.builder() //
219             .bucket(bucket(bucket)) //
220             .delete(Delete.builder().objects(oids).build()) //NOSONAR
221             .build();
222
223         CompletableFuture<DeleteObjectsResponse> future = s3AsynchClient.deleteObjects(request);
224
225         return Mono.fromFuture(future);
226     }
227
228     private Mono<ListObjectsResponse> listObjectsRequest(String bucket, String prefix,
229         ListObjectsResponse prevResponse) {
230         ListObjectsRequest.Builder builder = ListObjectsRequest.builder() //
231             .bucket(bucket) //
232             .maxKeys(1000) //
233             .prefix(prefix);
234
235         if (prevResponse != null) {
236             if (Boolean.TRUE.equals(prevResponse.isTruncated())) {
237                 builder.marker(prevResponse.nextMarker());
238             } else {
239                 return Mono.empty();
240             }
241         }
242
243         ListObjectsRequest listObjectsRequest = builder.build();
244         CompletableFuture<ListObjectsResponse> future = s3AsynchClient.listObjects(listObjectsRequest);
245         return Mono.fromFuture(future);
246     }
247
248     private Flux<S3Object> listObjectsInBucket(String bucket, String prefix) {
249
250         return listObjectsRequest(bucket, prefix, null) //
251             .expand(response -> listObjectsRequest(bucket, prefix, response)) //
252             .map(ListObjectsResponse::contents) //
253             .doOnNext(f -> logger.debug("Found objects in {}: {}", bucket, f.size())) //
254             .doOnError(t -> logger.warn("Error fromlist objects: {}", t.getMessage())) //
255             .flatMap(Flux::fromIterable) //
256             .doOnNext(obj -> logger.debug("Found object: {}", obj.key()));
257     }
258
259     private Mono<DeleteBucketResponse> deleteBucketFromS3Storage(Bucket bucket) {
260         DeleteBucketRequest request = DeleteBucketRequest.builder() //
261             .bucket(bucket(bucket)) //
262             .build();
263
264         CompletableFuture<DeleteBucketResponse> future = s3AsynchClient.deleteBucket(request);
265
266         return Mono.fromFuture(future);
267     }
268
269     private String bucket(Bucket bucket) {
270         return bucket == Bucket.FILES ? applicationConfig.getS3Bucket() : applicationConfig.getS3LocksBucket();
271     }
272
273     private Mono<String> copyFileToS3Bucket(String s3Bucket, Path fileName, String s3Key) {
274
275         PutObjectRequest request = PutObjectRequest.builder() //
276             .bucket(s3Bucket) //
277             .key(s3Key) //
278             .build();
279
280         AsyncRequestBody body = AsyncRequestBody.fromFile(fileName);
281
282         CompletableFuture<PutObjectResponse> future = s3AsynchClient.putObject(request, body);
283
284         return Mono.fromFuture(future) //
285             .map(f -> s3Key) //
286             .doOnError(t -> logger.error("Failed to store file in S3 {}", t.getMessage()));
287
288     }
289
290     private Mono<HeadObjectResponse> getHeadObject(String bucket, String key) {
291         HeadObjectRequest request = HeadObjectRequest.builder().bucket(bucket).key(key).build();
292
293         CompletableFuture<HeadObjectResponse> future = s3AsynchClient.headObject(request);
294         return Mono.fromFuture(future);
295     }
296
297     private Mono<byte[]> getDataFromS3Object(String bucket, String key) {
298
299         GetObjectRequest request = GetObjectRequest.builder() //
300             .bucket(bucket) //
301             .key(key) //
302             .build();
303
304         CompletableFuture<ResponseBytes<GetObjectResponse>> future =
305             s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes());
306
307         return Mono.fromFuture(future) //
308             .map(BytesWrapper::asByteArray) //
309             .doOnError(
310                 t -> logger.error("Failed to get file from S3, key:{}, bucket: {}, {}", key, bucket, t.getMessage())) //
311             .doOnNext(n -> logger.debug("Read file from S3: {} {}", bucket, key)) //
312             .onErrorResume(t -> Mono.empty());
313     }
314
315 }