Creating PM-producer
[nonrtric/plt/ranpm.git] / pmproducer / src / main / java / org / oran / pmproducer / datastore / S3ObjectStore.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2023 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oran.pmproducer.datastore;
22
23 import java.net.URI;
24 import java.nio.file.Path;
25 import java.util.concurrent.CompletableFuture;
26
27 import org.oran.pmproducer.configuration.ApplicationConfig;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 import reactor.core.publisher.Flux;
32 import reactor.core.publisher.Mono;
33 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
34 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
35 import software.amazon.awssdk.core.BytesWrapper;
36 import software.amazon.awssdk.core.ResponseBytes;
37 import software.amazon.awssdk.core.async.AsyncRequestBody;
38 import software.amazon.awssdk.core.async.AsyncResponseTransformer;
39 import software.amazon.awssdk.regions.Region;
40 import software.amazon.awssdk.services.s3.S3AsyncClient;
41 import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
42 import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
43 import software.amazon.awssdk.services.s3.model.CreateBucketResponse;
44 import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
45 import software.amazon.awssdk.services.s3.model.DeleteBucketResponse;
46 import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
47 import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
48 import software.amazon.awssdk.services.s3.model.GetObjectRequest;
49 import software.amazon.awssdk.services.s3.model.GetObjectResponse;
50 import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
51 import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
52 import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
53 import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
54 import software.amazon.awssdk.services.s3.model.PutObjectRequest;
55 import software.amazon.awssdk.services.s3.model.PutObjectResponse;
56 import software.amazon.awssdk.services.s3.model.S3Object;
57
58 class S3ObjectStore implements DataStore {
59     private static final Logger logger = LoggerFactory.getLogger(S3ObjectStore.class);
60     private final ApplicationConfig applicationConfig;
61
62     private static S3AsyncClient s3AsynchClient;
63
64     public S3ObjectStore(ApplicationConfig applicationConfig) {
65         this.applicationConfig = applicationConfig;
66
67         getS3AsynchClient(applicationConfig);
68     }
69
70     private static synchronized S3AsyncClient getS3AsynchClient(ApplicationConfig applicationConfig) {
71         if (applicationConfig.isS3Enabled() && s3AsynchClient == null) {
72             s3AsynchClient = getS3AsyncClientBuilder(applicationConfig).build();
73         }
74         return s3AsynchClient;
75     }
76
77     private static S3AsyncClientBuilder getS3AsyncClientBuilder(ApplicationConfig applicationConfig) {
78         URI uri = URI.create(applicationConfig.getS3EndpointOverride());
79         return S3AsyncClient.builder() //
80                 .region(Region.US_EAST_1) //
81                 .endpointOverride(uri) //
82                 .credentialsProvider(StaticCredentialsProvider.create( //
83                         AwsBasicCredentials.create(applicationConfig.getS3AccessKeyId(), //
84                                 applicationConfig.getS3SecretAccessKey())));
85     }
86
87     @Override
88     public Flux<String> listObjects(Bucket bucket, String prefix) {
89         return listObjectsInBucket(bucket(bucket), prefix).map(S3Object::key);
90     }
91
92     @Override
93     public Mono<Boolean> createLock(String name) {
94         return getHeadObject(bucket(Bucket.LOCKS), name).flatMap(head -> createLock(name, head)) //
95                 .onErrorResume(t -> createLock(name, null));
96     }
97
98     private Mono<Boolean> createLock(String name, HeadObjectResponse head) {
99         if (head == null) {
100
101             return this.putObject(Bucket.LOCKS, name, "") //
102                     .flatMap(resp -> Mono.just(true)) //
103                     .doOnError(t -> logger.warn("Failed to create lock {}, reason: {}", name, t.getMessage())) //
104                     .onErrorResume(t -> Mono.just(false));
105         } else {
106             return Mono.just(false);
107         }
108     }
109
110     @Override
111     public Mono<Boolean> deleteLock(String name) {
112         return deleteObject(Bucket.LOCKS, name);
113     }
114
115     @Override
116     public Mono<Boolean> deleteObject(Bucket bucket, String name) {
117
118         DeleteObjectRequest request = DeleteObjectRequest.builder() //
119                 .bucket(bucket(bucket)) //
120                 .key(name) //
121                 .build();
122
123         CompletableFuture<DeleteObjectResponse> future = s3AsynchClient.deleteObject(request);
124
125         return Mono.fromFuture(future).map(resp -> true);
126     }
127
128     @Override
129     public Mono<byte[]> readObject(Bucket bucket, String fileName) {
130         return getDataFromS3Object(bucket(bucket), fileName);
131     }
132
133     public Mono<String> putObject(Bucket bucket, String fileName, String bodyString) {
134         PutObjectRequest request = PutObjectRequest.builder() //
135                 .bucket(bucket(bucket)) //
136                 .key(fileName) //
137                 .build();
138
139         AsyncRequestBody body = AsyncRequestBody.fromString(bodyString);
140
141         CompletableFuture<PutObjectResponse> future = s3AsynchClient.putObject(request, body);
142
143         return Mono.fromFuture(future) //
144                 .map(putObjectResponse -> fileName) //
145                 .doOnError(t -> logger.error("Failed to store file in S3 {}", t.getMessage()));
146     }
147
148     @Override
149     public Mono<String> copyFileTo(Path fromFile, String toFile) {
150         return copyFileToS3Bucket(bucket(Bucket.FILES), fromFile, toFile);
151     }
152
153     @Override
154     public Mono<String> create(Bucket bucket) {
155         return createS3Bucket(bucket(bucket));
156     }
157
158     private Mono<String> createS3Bucket(String s3Bucket) {
159
160         CreateBucketRequest request = CreateBucketRequest.builder() //
161                 .bucket(s3Bucket) //
162                 .build();
163
164         CompletableFuture<CreateBucketResponse> future = s3AsynchClient.createBucket(request);
165
166         return Mono.fromFuture(future) //
167                 .map(f -> s3Bucket) //
168                 .doOnError(t -> logger.trace("Could not create S3 bucket: {}", t.getMessage()))
169                 .onErrorResume(t -> Mono.just(s3Bucket));
170     }
171
172     @Override
173     public Mono<String> deleteBucket(Bucket bucket) {
174         return listObjects(bucket, "") //
175                 .flatMap(key -> deleteObject(bucket, key)) //
176                 .collectList() //
177                 .flatMap(list -> deleteBucketFromS3Storage(bucket)) //
178                 .map(resp -> "OK")
179                 .doOnError(t -> logger.warn("Could not delete bucket: {}, reason: {}", bucket(bucket), t.getMessage()))
180                 .onErrorResume(t -> Mono.just("NOK"));
181     }
182
183     private Mono<DeleteBucketResponse> deleteBucketFromS3Storage(Bucket bucket) {
184         DeleteBucketRequest request = DeleteBucketRequest.builder() //
185                 .bucket(bucket(bucket)) //
186                 .build();
187
188         CompletableFuture<DeleteBucketResponse> future = s3AsynchClient.deleteBucket(request);
189
190         return Mono.fromFuture(future);
191     }
192
193     private String bucket(Bucket bucket) {
194         return bucket == Bucket.FILES ? applicationConfig.getS3Bucket() : applicationConfig.getS3LocksBucket();
195     }
196
197     private Mono<String> copyFileToS3Bucket(String s3Bucket, Path fileName, String s3Key) {
198
199         PutObjectRequest request = PutObjectRequest.builder() //
200                 .bucket(s3Bucket) //
201                 .key(s3Key) //
202                 .build();
203
204         AsyncRequestBody body = AsyncRequestBody.fromFile(fileName);
205
206         CompletableFuture<PutObjectResponse> future = s3AsynchClient.putObject(request, body);
207
208         return Mono.fromFuture(future) //
209                 .map(f -> s3Key) //
210                 .doOnError(t -> logger.error("Failed to store file in S3 {}", t.getMessage()));
211
212     }
213
214     private Mono<HeadObjectResponse> getHeadObject(String bucket, String key) {
215         HeadObjectRequest request = HeadObjectRequest.builder().bucket(bucket).key(key).build();
216
217         CompletableFuture<HeadObjectResponse> future = s3AsynchClient.headObject(request);
218         return Mono.fromFuture(future);
219     }
220
221     private Mono<ListObjectsResponse> listObjectsRequest(String bucket, String prefix,
222             ListObjectsResponse prevResponse) {
223         ListObjectsRequest.Builder builder = ListObjectsRequest.builder() //
224                 .bucket(bucket) //
225                 .maxKeys(1000) //
226                 .prefix(prefix);
227
228         if (prevResponse != null) {
229             if (Boolean.TRUE.equals(prevResponse.isTruncated())) {
230                 builder.marker(prevResponse.nextMarker());
231             } else {
232                 return Mono.empty();
233             }
234         }
235
236         ListObjectsRequest listObjectsRequest = builder.build();
237         CompletableFuture<ListObjectsResponse> future = s3AsynchClient.listObjects(listObjectsRequest);
238         return Mono.fromFuture(future);
239     }
240
241     private Flux<S3Object> listObjectsInBucket(String bucket, String prefix) {
242
243         return listObjectsRequest(bucket, prefix, null) //
244                 .expand(response -> listObjectsRequest(bucket, prefix, response)) //
245                 .map(ListObjectsResponse::contents) //
246                 .doOnNext(f -> logger.debug("Found objects in {}: {}", bucket, f.size())) //
247                 .doOnError(t -> logger.warn("Error fromlist objects: {}", t.getMessage())) //
248                 .flatMap(Flux::fromIterable) //
249                 .doOnNext(obj -> logger.debug("Found object: {}", obj.key()));
250     }
251
252     private Mono<byte[]> getDataFromS3Object(String bucket, String key) {
253
254         GetObjectRequest request = GetObjectRequest.builder() //
255                 .bucket(bucket) //
256                 .key(key) //
257                 .build();
258
259         CompletableFuture<ResponseBytes<GetObjectResponse>> future =
260                 s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes());
261
262         return Mono.fromFuture(future) //
263                 .map(BytesWrapper::asByteArray) //
264                 .doOnError(t -> logger.error("Failed to get file from S3, key:{}, bucket: {}, {}", key, bucket,
265                         t.getMessage())) //
266                 .doOnNext(n -> logger.debug("Read file from S3: {} {}", bucket, key)) //
267                 .onErrorResume(t -> Mono.empty());
268     }
269
270 }