Merge "Added support for using oauth token for Kafka"
[nonrtric/plt/ranpm.git] / datafilecollector / src / main / java / org / oran / datafile / datastore / FileStore.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2021 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oran.datafile.datastore;
22
23 import java.io.File;
24 import java.io.IOException;
25 import java.nio.file.Files;
26 import java.nio.file.Path;
27 import java.nio.file.StandardCopyOption;
28 import java.util.ArrayList;
29 import java.util.List;
30 import java.util.stream.Stream;
31
32 import org.oran.datafile.configuration.AppConfig;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35 import org.springframework.util.FileSystemUtils;
36
37 import reactor.core.publisher.Flux;
38 import reactor.core.publisher.Mono;
39
40 public class FileStore implements DataStore {
41     private static final Logger logger = LoggerFactory.getLogger(java.lang.invoke.MethodHandles.lookup().lookupClass());
42
43     AppConfig appConfig;
44
45     public FileStore(AppConfig applicationConfig) {
46         this.appConfig = applicationConfig;
47     }
48
49     @Override
50     public Flux<String> listObjects(Bucket bucket, String prefix) {
51         Path root = Path.of(appConfig.getCollectedFilesPath(), prefix);
52         if (!root.toFile().exists()) {
53             root = root.getParent();
54         }
55
56         logger.debug("Listing files in: {}", root);
57
58         List<String> result = new ArrayList<>();
59         try (Stream<Path> stream = Files.walk(root, Integer.MAX_VALUE)) {
60
61             stream.forEach(path -> filterListFiles(path, prefix, result));
62
63             return Flux.fromIterable(result);
64         } catch (Exception e) {
65             return Flux.error(e);
66         }
67     }
68
69     private void filterListFiles(Path path, String prefix, List<String> result) {
70         if (path.toFile().isFile() && externalName(path).startsWith(prefix)) {
71             result.add(externalName(path));
72         } else {
73             logger.debug("Ignoring file {} that does not start with: {}", path, prefix);
74         }
75     }
76
77     private String externalName(Path path) {
78         String fullName = path.toString();
79         String externalName = fullName.substring(appConfig.getCollectedFilesPath().length());
80         if (externalName.startsWith("/")) {
81             externalName = externalName.substring(1);
82         }
83         return externalName;
84     }
85
86     @Override
87     public Mono<byte[]> readObject(Bucket bucket, String fileName) {
88         try {
89             byte[] contents = Files.readAllBytes(path(fileName));
90             return Mono.just(contents);
91         } catch (Exception e) {
92             return Mono.error(e);
93         }
94     }
95
96     @Override
97     public Mono<Boolean> createLock(String name) {
98         File file = path(name).toFile();
99         try {
100             Files.createDirectories(path(name).getParent());
101             boolean res = file.createNewFile();
102             return Mono.just(res);
103         } catch (Exception e) {
104             logger.warn("Could not create lock file: {}, reason: {}", file.getPath(), e.getMessage());
105             return Mono.just(!file.exists());
106         }
107     }
108
109     @Override
110     public Mono<String> copyFileTo(Path from, String to) {
111         try {
112             Path toPath = path(to);
113             Files.createDirectories(toPath);
114             Files.copy(from, path(to), StandardCopyOption.REPLACE_EXISTING);
115             return Mono.just(to);
116         } catch (Exception e) {
117             return Mono.error(e);
118         }
119     }
120
121     @Override
122     public Mono<Boolean> deleteLock(String name) {
123         return deleteObject(Bucket.LOCKS, name);
124     }
125
126     @Override
127     public Mono<Boolean> deleteObject(Bucket bucket, String name) {
128         try {
129             Files.delete(path(name));
130             return Mono.just(true);
131         } catch (Exception e) {
132             return Mono.just(false);
133         }
134     }
135
136     @Override
137     public Mono<String> create(Bucket bucket) {
138         return Mono.just("OK");
139     }
140
141     private Path path(String name) {
142         return Path.of(appConfig.getCollectedFilesPath(), name);
143     }
144
145     public Mono<Boolean> fileExists(Bucket bucket, String key) {
146         return Mono.just(path(key).toFile().exists());
147     }
148
149     @Override
150     public Mono<String> deleteBucket(Bucket bucket) {
151         try {
152             FileSystemUtils.deleteRecursively(Path.of(appConfig.getCollectedFilesPath()));
153         } catch (IOException e) {
154             logger.debug("Could not delete directory: {}, reason; {}", appConfig.getCollectedFilesPath(),
155                 e.getMessage());
156         }
157         return Mono.just("OK");
158     }
159
160 }