NONRTRIC - Using S3 storage for ICS
[nonrtric/plt/informationcoordinatorservice.git] / src / main / java / org / oransc / ics / datastore / S3ObjectStore.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2022 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oransc.ics.datastore;
22
23 import java.net.URI;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.concurrent.CompletableFuture;
27
28 import org.oransc.ics.configuration.ApplicationConfig;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 import reactor.core.publisher.Flux;
33 import reactor.core.publisher.Mono;
34 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
35 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
36 import software.amazon.awssdk.core.BytesWrapper;
37 import software.amazon.awssdk.core.ResponseBytes;
38 import software.amazon.awssdk.core.async.AsyncRequestBody;
39 import software.amazon.awssdk.core.async.AsyncResponseTransformer;
40 import software.amazon.awssdk.regions.Region;
41 import software.amazon.awssdk.services.s3.S3AsyncClient;
42 import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
43 import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
44 import software.amazon.awssdk.services.s3.model.CreateBucketResponse;
45 import software.amazon.awssdk.services.s3.model.Delete;
46 import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
47 import software.amazon.awssdk.services.s3.model.DeleteBucketResponse;
48 import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
49 import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
50 import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
51 import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
52 import software.amazon.awssdk.services.s3.model.GetObjectRequest;
53 import software.amazon.awssdk.services.s3.model.GetObjectResponse;
54 import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
55 import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
56 import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
57 import software.amazon.awssdk.services.s3.model.PutObjectRequest;
58 import software.amazon.awssdk.services.s3.model.PutObjectResponse;
59 import software.amazon.awssdk.services.s3.model.S3Object;
60
61 class S3ObjectStore implements DataStore {
62     private static final Logger logger = LoggerFactory.getLogger(S3ObjectStore.class);
63     private final ApplicationConfig applicationConfig;
64
65     private static S3AsyncClient s3AsynchClient;
66     private final String location;
67
68     public S3ObjectStore(ApplicationConfig applicationConfig, String location) {
69         this.applicationConfig = applicationConfig;
70         this.location = location;
71
72         getS3AsynchClient(applicationConfig);
73     }
74
75     private static synchronized S3AsyncClient getS3AsynchClient(ApplicationConfig applicationConfig) {
76         if (applicationConfig.isS3Enabled() && s3AsynchClient == null) {
77             s3AsynchClient = getS3AsyncClientBuilder(applicationConfig).build();
78         }
79         return s3AsynchClient;
80     }
81
82     private static S3AsyncClientBuilder getS3AsyncClientBuilder(ApplicationConfig applicationConfig) {
83         URI uri = URI.create(applicationConfig.getS3EndpointOverride());
84         return S3AsyncClient.builder() //
85             .region(Region.US_EAST_1) //
86             .endpointOverride(uri) //
87             .credentialsProvider(StaticCredentialsProvider.create( //
88                 AwsBasicCredentials.create(applicationConfig.getS3AccessKeyId(), //
89                     applicationConfig.getS3SecretAccessKey())));
90     }
91
92     @Override
93     public Flux<String> listObjects(String prefix) {
94         return listObjectsInBucket(bucket(), prefix) //
95             .map(S3Object::key) //
96             .map(this::externalName);
97     }
98
99     @Override
100     public Mono<Boolean> deleteObject(String name) {
101         DeleteObjectRequest request = DeleteObjectRequest.builder() //
102             .bucket(bucket()) //
103             .key(key(name)) //
104             .build();
105
106         CompletableFuture<DeleteObjectResponse> future = s3AsynchClient.deleteObject(request);
107
108         return Mono.fromFuture(future).map(resp -> true);
109     }
110
111     @Override
112     public Mono<byte[]> readObject(String fileName) {
113         return getDataFromS3Object(bucket(), fileName);
114     }
115
116     @Override
117     public Mono<byte[]> writeObject(String fileName, byte[] fileData) {
118
119         PutObjectRequest request = PutObjectRequest.builder() //
120             .bucket(bucket()) //
121             .key(key(fileName)) //
122             .build();
123
124         AsyncRequestBody body = AsyncRequestBody.fromBytes(fileData);
125
126         CompletableFuture<PutObjectResponse> future = s3AsynchClient.putObject(request, body);
127
128         return Mono.fromFuture(future) //
129             .map(putObjectResponse -> fileData) //
130             .doOnError(t -> logger.error("Failed to store file in S3 {}", t.getMessage()));
131     }
132
133     @Override
134     public Mono<String> createDataStore() {
135         return createS3Bucket(bucket());
136     }
137
138     private Mono<String> createS3Bucket(String s3Bucket) {
139
140         CreateBucketRequest request = CreateBucketRequest.builder() //
141             .bucket(s3Bucket) //
142             .build();
143
144         CompletableFuture<CreateBucketResponse> future = s3AsynchClient.createBucket(request);
145
146         return Mono.fromFuture(future) //
147             .map(f -> s3Bucket) //
148             .doOnError(t -> logger.debug("Could not create S3 bucket: {}", t.getMessage()))
149             .onErrorResume(t -> Mono.just(s3Bucket));
150     }
151
152     @Override
153     public Flux<String> deleteAllData() {
154         return listObjectsInBucket(bucket(), "") //
155             .buffer(500) //
156             .flatMap(this::deleteObjectsFromS3Storage) //
157             .doOnError(t -> logger.info("Deleted all files {}", t.getMessage())) //
158             .onErrorStop() //
159             .onErrorResume(t -> Flux.empty()).map(resp -> ""); //
160     }
161
162     private Mono<DeleteObjectsResponse> deleteObjectsFromS3Storage(Collection<S3Object> objects) {
163         Collection<ObjectIdentifier> oids = new ArrayList<>();
164         for (S3Object o : objects) {
165             ObjectIdentifier oid = ObjectIdentifier.builder() //
166                 .key(o.key()) //
167                 .build();
168             oids.add(oid);
169         }
170
171         Delete delete = Delete.builder() //
172             .objects(oids) //
173             .build();
174
175         DeleteObjectsRequest request = DeleteObjectsRequest.builder() //
176             .bucket(bucket()) //
177             .delete(delete) //
178             .build();
179
180         CompletableFuture<DeleteObjectsResponse> future = s3AsynchClient.deleteObjects(request);
181
182         return Mono.fromFuture(future);
183     }
184
185     @Override
186     public Mono<String> deleteBucket() {
187         return deleteBucketFromS3Storage()
188             .doOnError(t -> logger.warn("Could not delete: {}, reason: {}", bucket(), t.getMessage()))
189             .map(x -> bucket()).onErrorResume(t -> Mono.just(bucket()));
190     }
191
192     private Mono<DeleteBucketResponse> deleteBucketFromS3Storage() {
193         DeleteBucketRequest request = DeleteBucketRequest.builder() //
194             .bucket(bucket()) //
195             .build();
196
197         CompletableFuture<DeleteBucketResponse> future = s3AsynchClient.deleteBucket(request);
198
199         return Mono.fromFuture(future);
200     }
201
202     private String bucket() {
203         return applicationConfig.getS3Bucket();
204     }
205
206     private Mono<ListObjectsResponse> listObjectsRequest(String bucket, String prefix,
207         ListObjectsResponse prevResponse) {
208         ListObjectsRequest.Builder builder = ListObjectsRequest.builder() //
209             .bucket(bucket) //
210             .maxKeys(1000) //
211             .prefix(prefix);
212
213         if (prevResponse != null) {
214             if (Boolean.TRUE.equals(prevResponse.isTruncated())) {
215                 builder.marker(prevResponse.nextMarker());
216             } else {
217                 return Mono.empty();
218             }
219         }
220
221         ListObjectsRequest listObjectsRequest = builder.build();
222         CompletableFuture<ListObjectsResponse> future = s3AsynchClient.listObjects(listObjectsRequest);
223         return Mono.fromFuture(future);
224     }
225
226     private Flux<S3Object> listObjectsInBucket(String bucket, String prefix) {
227         String pre = location + "/" + prefix;
228         return listObjectsRequest(bucket, pre, null) //
229             .expand(response -> listObjectsRequest(bucket, prefix, response)) //
230             .map(ListObjectsResponse::contents) //
231             .doOnNext(f -> logger.debug("Found objects in {}: {}", bucket, f.size())) //
232             .doOnError(t -> logger.warn("Error fromlist objects: {}", t.getMessage())) //
233             .flatMap(Flux::fromIterable) //
234             .doOnNext(obj -> logger.debug("Found object: {}", obj.key()));
235     }
236
237     private Mono<byte[]> getDataFromS3Object(String bucket, String fileName) {
238
239         GetObjectRequest request = GetObjectRequest.builder() //
240             .bucket(bucket) //
241             .key(key(fileName)) //
242             .build();
243
244         CompletableFuture<ResponseBytes<GetObjectResponse>> future =
245             s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes());
246
247         return Mono.fromFuture(future) //
248             .map(BytesWrapper::asByteArray) //
249             .doOnError(t -> logger.error("Failed to get file from S3, key:{}, bucket: {}, {}", key(fileName), bucket,
250                 t.getMessage())) //
251             .doOnEach(n -> logger.debug("Read file from S3: {} {}", bucket, key(fileName))) //
252             .onErrorResume(t -> Mono.empty());
253     }
254
255     private String key(String fileName) {
256         return location + "/" + fileName;
257     }
258
259     private String externalName(String internalName) {
260         return internalName.substring(key("").length());
261     }
262
263 }