From 65bb65d41a972dc6734b9db102ce8f4ca75fcd9d Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Mon, 17 Oct 2022 08:20:53 +0200 Subject: [PATCH] NONRTRIC - Using S3 storage for ICS Change-Id: I7498b18f414d328094ad2cf1da093e74220e4bbb Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-810 --- config/application.yaml | 6 + pom.xml | 14 +- src/main/java/org/oransc/ics/BeanFactory.java | 16 +- .../ics/configuration/ApplicationConfig.java | 20 ++ .../oransc/ics/controllers/a1e/A1eCallbacks.java | 4 +- .../java/org/oransc/ics/datastore/DataStore.java | 48 +++++ .../java/org/oransc/ics/datastore/FileStore.java | 158 ++++++++++++++ .../org/oransc/ics/datastore/S3ObjectStore.java | 232 +++++++++++++++++++++ .../java/org/oransc/ics/repository/InfoJobs.java | 113 +++++----- .../org/oransc/ics/repository/InfoProducers.java | 3 +- .../ics/repository/InfoTypeSubscriptions.java | 81 +++---- .../java/org/oransc/ics/repository/InfoTypes.java | 80 +++---- src/test/java/org/oransc/ics/ApplicationTest.java | 41 +++- 13 files changed, 622 insertions(+), 194 deletions(-) create mode 100644 src/main/java/org/oransc/ics/datastore/DataStore.java create mode 100644 src/main/java/org/oransc/ics/datastore/FileStore.java create mode 100644 src/main/java/org/oransc/ics/datastore/S3ObjectStore.java diff --git a/config/application.yaml b/config/application.yaml index fcfc672..ce6b1ef 100644 --- a/config/application.yaml +++ b/config/application.yaml @@ -67,3 +67,9 @@ app: vardata-directory: /var/information-coordinator-service # If the file name is empty, no authorization token is used auth-token-file: + # S3 object store usage is enabled by defining the bucket to use. This will override the vardata-directory parameter. + s3: + endpointOverride: http://localhost:9000 + accessKeyId: minio + secretAccessKey: miniostorage + bucket: \ No newline at end of file diff --git a/pom.xml b/pom.xml index 7451773..e89058a 100644 --- a/pom.xml +++ b/pom.xml @@ -60,7 +60,7 @@ 0.8.5 true - + org.springframework.boot spring-boot-starter-web @@ -135,6 +135,16 @@ spring-boot-configuration-processor true + + software.amazon.awssdk + s3 + 2.17.292 + + + com.amazonaws + aws-java-sdk + 1.12.321 + org.springdoc @@ -346,4 +356,4 @@ JIRA https://jira.o-ran-sc.org/ - + \ No newline at end of file diff --git a/src/main/java/org/oransc/ics/BeanFactory.java b/src/main/java/org/oransc/ics/BeanFactory.java index ac6190f..233be90 100644 --- a/src/main/java/org/oransc/ics/BeanFactory.java +++ b/src/main/java/org/oransc/ics/BeanFactory.java @@ -29,6 +29,7 @@ import org.oransc.ics.clients.SecurityContext; import org.oransc.ics.configuration.ApplicationConfig; import org.oransc.ics.controllers.r1producer.ProducerCallbacks; import org.oransc.ics.repository.InfoJobs; +import org.oransc.ics.repository.InfoTypeSubscriptions; import org.oransc.ics.repository.InfoTypes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,11 +70,7 @@ class BeanFactory { public InfoJobs infoJobs(SecurityContext securityContext, InfoTypes types) { if (infoJobs == null) { infoJobs = new InfoJobs(getApplicationConfig(), types, producerCallbacks(securityContext)); - try { - infoJobs.restoreJobsFromDatabase(); - } catch (Exception e) { - logger.error("Could not restore jobs from database: {}", e.getMessage()); - } + infoJobs.restoreJobsFromDatabase().subscribe(); } return infoJobs; } @@ -83,7 +80,7 @@ class BeanFactory { if (this.infoTypes == null) { infoTypes = new InfoTypes(getApplicationConfig()); try { - infoTypes.restoreTypesFromDatabase(); + infoTypes.restoreTypesFromDatabase().blockLast(); } catch (Exception e) { logger.error("Could not restore Information Types from database: {}", e.getMessage()); } @@ -91,6 +88,13 @@ class BeanFactory { return infoTypes; } + @Bean + public InfoTypeSubscriptions infoTypeSubscriptions() { + InfoTypeSubscriptions s = new InfoTypeSubscriptions(getApplicationConfig()); + s.restoreFromDatabase().subscribe(); + return s; + } + @Bean public ProducerCallbacks producerCallbacks(SecurityContext securityContext) { if (this.producerCallbacks == null) { diff --git a/src/main/java/org/oransc/ics/configuration/ApplicationConfig.java b/src/main/java/org/oransc/ics/configuration/ApplicationConfig.java index 50c6daa..6f48cd3 100644 --- a/src/main/java/org/oransc/ics/configuration/ApplicationConfig.java +++ b/src/main/java/org/oransc/ics/configuration/ApplicationConfig.java @@ -66,6 +66,22 @@ public class ApplicationConfig { @Value("${app.webclient.http.proxy-port:0}") private int httpProxyPort = 0; + @Getter + @Value("${app.s3.endpointOverride:}") + private String s3EndpointOverride; + + @Getter + @Value("${app.s3.accessKeyId:}") + private String s3AccessKeyId; + + @Getter + @Value("${app.s3.secretAccessKey:}") + private String s3SecretAccessKey; + + @Getter + @Value("${app.s3.bucket:}") + private String s3Bucket; + private WebClientConfig webClientConfig = null; public WebClientConfig getWebClientConfig() { @@ -93,4 +109,8 @@ public class ApplicationConfig { return this.webClientConfig; } + public boolean isS3Enabled() { + return !(s3EndpointOverride.isBlank() || s3Bucket.isBlank()); + } + } diff --git a/src/main/java/org/oransc/ics/controllers/a1e/A1eCallbacks.java b/src/main/java/org/oransc/ics/controllers/a1e/A1eCallbacks.java index 15394ab..82b4c37 100644 --- a/src/main/java/org/oransc/ics/controllers/a1e/A1eCallbacks.java +++ b/src/main/java/org/oransc/ics/controllers/a1e/A1eCallbacks.java @@ -73,9 +73,9 @@ public class A1eCallbacks { private Mono noifyStatusToJobOwner(InfoJob job, InfoProducers eiProducers) { boolean isJobEnabled = eiProducers.isJobEnabled(job); - A1eEiJobStatus status = isJobEnabled ? new A1eEiJobStatus(A1eEiJobStatus.EiJobStatusValues.ENABLED) + A1eEiJobStatus jobStatus = isJobEnabled ? new A1eEiJobStatus(A1eEiJobStatus.EiJobStatusValues.ENABLED) : new A1eEiJobStatus(A1eEiJobStatus.EiJobStatusValues.DISABLED); - String body = gson.toJson(status); + String body = gson.toJson(jobStatus); return this.restClient.post(job.getJobStatusUrl(), body) // .doOnNext(response -> logger.debug("Consumer notified OK {}", job.getId())) // .doOnNext(response -> job.setLastReportedStatus(isJobEnabled)) // diff --git a/src/main/java/org/oransc/ics/datastore/DataStore.java b/src/main/java/org/oransc/ics/datastore/DataStore.java new file mode 100644 index 0000000..78d5d13 --- /dev/null +++ b/src/main/java/org/oransc/ics/datastore/DataStore.java @@ -0,0 +1,48 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2022 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oransc.ics.datastore; + +import org.oransc.ics.configuration.ApplicationConfig; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public interface DataStore { + + public Flux listObjects(String prefix); + + public Mono readObject(String fileName); + + public Mono writeObject(String fileName, byte[] fileData); + + public Mono deleteObject(String name); + + public Mono createDataStore(); + + public Mono deleteAllData(); + + public Mono deleteBucket(); + + static DataStore create(ApplicationConfig config, String location) { + return config.isS3Enabled() ? new S3ObjectStore(config, location) : new FileStore(config, location); + } + +} diff --git a/src/main/java/org/oransc/ics/datastore/FileStore.java b/src/main/java/org/oransc/ics/datastore/FileStore.java new file mode 100644 index 0000000..bd9c91c --- /dev/null +++ b/src/main/java/org/oransc/ics/datastore/FileStore.java @@ -0,0 +1,158 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2022 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oransc.ics.datastore; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Stream; + +import org.oransc.ics.configuration.ApplicationConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.FileSystemUtils; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +class FileStore implements DataStore { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + ApplicationConfig applicationConfig; + private final String location; + + public FileStore(ApplicationConfig applicationConfig, String location) { + this.applicationConfig = applicationConfig; + this.location = location; + } + + @Override + public Flux listObjects(String prefix) { + Path root = Path.of(path().toString(), prefix); + if (!root.toFile().exists()) { + root = root.getParent(); + } + + logger.debug("Listing files in: {}", root); + + List result = new ArrayList<>(); + try (Stream stream = Files.walk(root, Integer.MAX_VALUE)) { + + stream.forEach(path -> filterListFiles(path, prefix, result)); + + return Flux.fromIterable(result); + } catch (Exception e) { + return Flux.error(e); + } + } + + private void filterListFiles(Path path, String prefix, List result) { + if (path.toFile().isFile() && externalName(path).startsWith(prefix)) { + result.add(externalName(path)); + } else { + logger.debug("Ignoring file/directory {}, prefix: {}", path, prefix); + } + } + + private String externalName(Path path) { + String fullName = path.toString(); + String externalName = fullName.substring(path().toString().length()); + if (externalName.startsWith("/")) { + externalName = externalName.substring(1); + } + return externalName; + } + + @Override + public Mono readObject(String fileName) { + try { + byte[] contents = Files.readAllBytes(path(fileName)); + return Mono.just(contents); + } catch (Exception e) { + return Mono.error(e); + } + } + + @Override + public Mono deleteObject(String name) { + try { + Files.delete(path(name)); + return Mono.just(true); + } catch (Exception e) { + return Mono.just(false); + } + } + + @Override + public Mono createDataStore() { + try { + Files.createDirectories(path()); + } catch (IOException e) { + logger.error("Could not create directory: {}, reason: {}", path(), e.getMessage()); + } + return Mono.just("OK"); + } + + private Path path(String name) { + return Path.of(path().toString(), name); + } + + private Path path() { + return Path.of(applicationConfig.getVardataDirectory(), "database", this.location); + } + + @Override + public Mono deleteAllData() { + return listObjects("") // + .flatMap(this::deleteObject) // + .collectList() // + .map(o -> "OK"); + } + + @Override + public Mono writeObject(String fileName, byte[] fileData) { + try { + File outputFile = path(fileName).toFile(); + try (FileOutputStream outputStream = new FileOutputStream(outputFile)) { + outputStream.write(fileData); + } + } catch (IOException e) { + logger.debug("Could not write file: {}, reason; {}", path(fileName), e.getMessage()); + } + return Mono.just(fileData); + } + + @Override + public Mono deleteBucket() { + try { + FileSystemUtils.deleteRecursively(path("")); + } catch (IOException e) { + logger.warn("Could not delete: {}, reason: {}", path(""), e.getMessage()); + } + return Mono.just(path("").toString()); + } + +} diff --git a/src/main/java/org/oransc/ics/datastore/S3ObjectStore.java b/src/main/java/org/oransc/ics/datastore/S3ObjectStore.java new file mode 100644 index 0000000..3434b67 --- /dev/null +++ b/src/main/java/org/oransc/ics/datastore/S3ObjectStore.java @@ -0,0 +1,232 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2022 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oransc.ics.datastore; + +import java.net.URI; +import java.util.concurrent.CompletableFuture; + +import org.oransc.ics.configuration.ApplicationConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.ResponseBytes; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; +import software.amazon.awssdk.services.s3.model.CreateBucketRequest; +import software.amazon.awssdk.services.s3.model.CreateBucketResponse; +import software.amazon.awssdk.services.s3.model.DeleteBucketRequest; +import software.amazon.awssdk.services.s3.model.DeleteBucketResponse; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectResponse; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.ListObjectsRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsResponse; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectResponse; +import software.amazon.awssdk.services.s3.model.S3Object; + +class S3ObjectStore implements DataStore { + private static final Logger logger = LoggerFactory.getLogger(S3ObjectStore.class); + private final ApplicationConfig applicationConfig; + + private static S3AsyncClient s3AsynchClient; + private final String location; + + public S3ObjectStore(ApplicationConfig applicationConfig, String location) { + this.applicationConfig = applicationConfig; + this.location = location; + + getS3AsynchClient(applicationConfig); + } + + private static synchronized S3AsyncClient getS3AsynchClient(ApplicationConfig applicationConfig) { + if (applicationConfig.isS3Enabled() && s3AsynchClient == null) { + s3AsynchClient = getS3AsyncClientBuilder(applicationConfig).build(); + } + return s3AsynchClient; + } + + private static S3AsyncClientBuilder getS3AsyncClientBuilder(ApplicationConfig applicationConfig) { + URI uri = URI.create(applicationConfig.getS3EndpointOverride()); + return S3AsyncClient.builder() // + .region(Region.US_EAST_1) // + .endpointOverride(uri) // + .credentialsProvider(StaticCredentialsProvider.create( // + AwsBasicCredentials.create(applicationConfig.getS3AccessKeyId(), // + applicationConfig.getS3SecretAccessKey()))); + } + + @Override + public Flux listObjects(String prefix) { + return listObjectsInBucket(bucket(), location + "/" + prefix).map(S3Object::key) // + .map(this::externalName); + } + + @Override + public Mono deleteObject(String name) { + DeleteObjectRequest request = DeleteObjectRequest.builder() // + .bucket(bucket()) // + .key(key(name)) // + .build(); + + CompletableFuture future = s3AsynchClient.deleteObject(request); + + return Mono.fromFuture(future).map(resp -> true); + } + + @Override + public Mono readObject(String fileName) { + return getDataFromS3Object(bucket(), fileName); + } + + @Override + public Mono writeObject(String fileName, byte[] fileData) { + + PutObjectRequest request = PutObjectRequest.builder() // + .bucket(bucket()) // + .key(key(fileName)) // + .build(); + + AsyncRequestBody body = AsyncRequestBody.fromBytes(fileData); + + CompletableFuture future = s3AsynchClient.putObject(request, body); + + return Mono.fromFuture(future) // + .map(putObjectResponse -> fileData) // + .doOnError(t -> logger.error("Failed to store file in S3 {}", t.getMessage())); + } + + @Override + public Mono createDataStore() { + return createS3Bucket(bucket()); + } + + private Mono createS3Bucket(String s3Bucket) { + + CreateBucketRequest request = CreateBucketRequest.builder() // + .bucket(s3Bucket) // + .build(); + + CompletableFuture future = s3AsynchClient.createBucket(request); + + return Mono.fromFuture(future) // + .map(f -> s3Bucket) // + .doOnError(t -> logger.debug("Could not create S3 bucket: {}", t.getMessage())) + .onErrorResume(t -> Mono.just(s3Bucket)); + } + + @Override + public Mono deleteAllData() { + return listObjects("") // + .flatMap(key -> deleteObject(key)) // + .collectList() // + .map(resp -> "OK") + .doOnError(t -> logger.warn("Could not delete bucket: {}, reason: {}", bucket(), t.getMessage())) + .onErrorResume(t -> Mono.just("NOK")); + } + + @Override + public Mono deleteBucket() { + return deleteBucketFromS3Storage() + .doOnError(t -> logger.warn("Could not delete: {}, reason: {}", bucket(), t.getMessage())) + .map(x -> bucket()).onErrorResume(t -> Mono.just(bucket())); + } + + private Mono deleteBucketFromS3Storage() { + DeleteBucketRequest request = DeleteBucketRequest.builder() // + .bucket(bucket()) // + .build(); + + CompletableFuture future = s3AsynchClient.deleteBucket(request); + + return Mono.fromFuture(future); + } + + private String bucket() { + return applicationConfig.getS3Bucket(); + } + + private Mono listObjectsRequest(String bucket, String prefix, + ListObjectsResponse prevResponse) { + ListObjectsRequest.Builder builder = ListObjectsRequest.builder() // + .bucket(bucket) // + .maxKeys(1000) // + .prefix(prefix); + + if (prevResponse != null) { + if (Boolean.TRUE.equals(prevResponse.isTruncated())) { + builder.marker(prevResponse.nextMarker()); + } else { + return Mono.empty(); + } + } + + ListObjectsRequest listObjectsRequest = builder.build(); + CompletableFuture future = s3AsynchClient.listObjects(listObjectsRequest); + return Mono.fromFuture(future); + } + + private Flux listObjectsInBucket(String bucket, String prefix) { + + return listObjectsRequest(bucket, prefix, null) // + .expand(response -> listObjectsRequest(bucket, prefix, response)) // + .map(ListObjectsResponse::contents) // + .doOnNext(f -> logger.debug("Found objects in {}: {}", bucket, f.size())) // + .doOnError(t -> logger.warn("Error fromlist objects: {}", t.getMessage())) // + .flatMap(Flux::fromIterable) // + .doOnNext(obj -> logger.debug("Found object: {}", obj.key())); + } + + private Mono getDataFromS3Object(String bucket, String fileName) { + + GetObjectRequest request = GetObjectRequest.builder() // + .bucket(bucket) // + .key(key(fileName)) // + .build(); + + CompletableFuture> future = + s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes()); + + return Mono.fromFuture(future) // + .map(b -> b.asByteArray()) // + .doOnError(t -> logger.error("Failed to get file from S3, key:{}, bucket: {}, {}", key(fileName), bucket, + t.getMessage())) // + .doOnEach(n -> logger.debug("Read file from S3: {} {}", bucket, key(fileName))) // + .onErrorResume(t -> Mono.empty()); + } + + private String key(String fileName) { + return location + "/" + fileName; + } + + private String externalName(String internalName) { + return internalName.substring(key("").length()); + } + +} diff --git a/src/main/java/org/oransc/ics/repository/InfoJobs.java b/src/main/java/org/oransc/ics/repository/InfoJobs.java index 8f8e0e9..8085573 100644 --- a/src/main/java/org/oransc/ics/repository/InfoJobs.java +++ b/src/main/java/org/oransc/ics/repository/InfoJobs.java @@ -24,27 +24,22 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.TypeAdapterFactory; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.PrintStream; import java.lang.invoke.MethodHandles; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.ServiceLoader; import java.util.Vector; import org.oransc.ics.configuration.ApplicationConfig; import org.oransc.ics.controllers.r1producer.ProducerCallbacks; +import org.oransc.ics.datastore.DataStore; import org.oransc.ics.exceptions.ServiceException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.HttpStatus; -import org.springframework.util.FileSystemUtils; +import reactor.core.publisher.Flux; /** * Dynamic representation of all existing Information Jobs. @@ -62,6 +57,8 @@ public class InfoJobs { private final ProducerCallbacks producerCallbacks; + private final DataStore dataStore; + public InfoJobs(ApplicationConfig config, InfoTypes infoTypes, ProducerCallbacks producerCallbacks) { this.config = config; GsonBuilder gsonBuilder = new GsonBuilder(); @@ -69,40 +66,47 @@ public class InfoJobs { this.gson = gsonBuilder.create(); this.producerCallbacks = producerCallbacks; this.infoTypes = infoTypes; + this.dataStore = DataStore.create(config, "infojobs"); + this.dataStore.createDataStore().subscribe(); } - public synchronized void restoreJobsFromDatabase() throws IOException { - Files.createDirectories(Paths.get(getDatabaseDirectory())); - File dbDir = new File(getDatabaseDirectory()); + public synchronized Flux restoreJobsFromDatabase() { + return dataStore.listObjects("") // + .flatMap(dataStore::readObject) // + .map(this::toPersistentData) // + .map(this::toInfoJob) // + .filter(Objects::nonNull) // + .doOnNext(this::doPut) // + .doOnError(t -> logger.error("Could not restore jobs from datastore, reason: {}", t.getMessage())); + } - for (File file : dbDir.listFiles()) { - String json = Files.readString(file.toPath()); - InfoJob.PersistentData data = gson.fromJson(json, InfoJob.PersistentData.class); - try { - InfoJob job = toInfoJob(data); - this.doPut(job); - } catch (ServiceException e) { - logger.warn("Could not restore job:{},reason: {}", data.getId(), e.getMessage()); - } - } + private InfoJob.PersistentData toPersistentData(byte[] bytes) { + String json = new String(bytes); + return gson.fromJson(json, InfoJob.PersistentData.class); } - private InfoJob toInfoJob(InfoJob.PersistentData data) throws ServiceException { - InfoType type = infoTypes.getType(data.getTypeId()); - return InfoJob.builder() // - .id(data.getId()) // - .type(type) // - .owner(data.getOwner()) // - .jobData(data.getJobData()) // - .targetUrl(data.getTargetUrl()) // - .jobStatusUrl(data.getJobStatusUrl()) // - .lastUpdated(data.getLastUpdated()) // - .build(); + private InfoJob toInfoJob(InfoJob.PersistentData data) { + InfoType type; + try { + type = infoTypes.getType(data.getTypeId()); + return InfoJob.builder() // + .id(data.getId()) // + .type(type) // + .owner(data.getOwner()) // + .jobData(data.getJobData()) // + .targetUrl(data.getTargetUrl()) // + .jobStatusUrl(data.getJobStatusUrl()) // + .lastUpdated(data.getLastUpdated()) // + .build(); + } catch (ServiceException e) { + logger.error("Error restoring info job: {}, reason: {}", data.getId(), e.getMessage()); + } + return null; } public synchronized void put(InfoJob job) { this.doPut(job); - storeJobInFile(job); + storeJob(job); } public synchronized Collection getJobs() { @@ -146,11 +150,8 @@ public class InfoJobs { jobsByType.remove(job.getType().getId(), job); jobsByOwner.remove(job.getOwner(), job); - try { - Files.delete(getPath(job)); - } catch (IOException e) { - logger.warn("Could not remove file: {}", e.getMessage()); - } + this.dataStore.deleteObject(getPath(job)).subscribe(); + this.producerCallbacks.stopInfoJob(job, infoProducers); } @@ -163,16 +164,8 @@ public class InfoJobs { this.allEiJobs.clear(); this.jobsByType.clear(); jobsByOwner.clear(); - clearDatabase(); - } - private void clearDatabase() { - try { - FileSystemUtils.deleteRecursively(Path.of(getDatabaseDirectory())); - Files.createDirectories(Paths.get(getDatabaseDirectory())); - } catch (IOException e) { - logger.warn("Could not delete database : {}", e.getMessage()); - } + dataStore.deleteAllData().flatMap(s -> dataStore.createDataStore()).block(); } private void doPut(InfoJob job) { @@ -181,26 +174,16 @@ public class InfoJobs { jobsByOwner.put(job.getOwner(), job); } - private void storeJobInFile(InfoJob job) { - try { - try (PrintStream out = new PrintStream(new FileOutputStream(getFile(job)))) { - out.print(gson.toJson(job.getPersistentData())); - } - } catch (Exception e) { - logger.warn("Could not store job: {} {}", job.getId(), e.getMessage()); - } - } - - private File getFile(InfoJob job) { - return getPath(job).toFile(); - } - - private Path getPath(InfoJob job) { - return Path.of(getDatabaseDirectory(), job.getId()); + private void storeJob(InfoJob job) { + String json = gson.toJson(job.getPersistentData()); + byte[] bytes = json.getBytes(); + this.dataStore.writeObject(this.getPath(job), bytes) // + .doOnError(t -> logger.error("Could not store job in datastore, reason: {}", t.getMessage())) // + .subscribe(); } - private String getDatabaseDirectory() { - return config.getVardataDirectory() + "/database/eijobs"; + private String getPath(InfoJob job) { + return job.getId(); } } diff --git a/src/main/java/org/oransc/ics/repository/InfoProducers.java b/src/main/java/org/oransc/ics/repository/InfoProducers.java index 4ea881f..e8e206a 100644 --- a/src/main/java/org/oransc/ics/repository/InfoProducers.java +++ b/src/main/java/org/oransc/ics/repository/InfoProducers.java @@ -135,9 +135,8 @@ public class InfoProducers { } public synchronized boolean isJobEnabled(InfoJob job) { - InfoType type; try { - type = this.infoTypes.getType(job.getType().getId()); + InfoType type = this.infoTypes.getType(job.getType().getId()); for (InfoProducer producer : this.getProducersSupportingType(type)) { if (producer.isJobEnabled(job)) { diff --git a/src/main/java/org/oransc/ics/repository/InfoTypeSubscriptions.java b/src/main/java/org/oransc/ics/repository/InfoTypeSubscriptions.java index 2557f3c..97fa487 100644 --- a/src/main/java/org/oransc/ics/repository/InfoTypeSubscriptions.java +++ b/src/main/java/org/oransc/ics/repository/InfoTypeSubscriptions.java @@ -23,18 +23,12 @@ package org.oransc.ics.repository; import com.google.gson.Gson; import com.google.gson.GsonBuilder; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.PrintStream; import java.lang.invoke.MethodHandles; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; import java.time.Duration; import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.Vector; import java.util.function.Function; @@ -42,13 +36,13 @@ import lombok.Builder; import lombok.Getter; import org.oransc.ics.configuration.ApplicationConfig; +import org.oransc.ics.datastore.DataStore; import org.oransc.ics.exceptions.ServiceException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import org.springframework.http.HttpStatus; -import org.springframework.util.FileSystemUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -66,6 +60,7 @@ public class InfoTypeSubscriptions { private final Gson gson = new GsonBuilder().create(); private final ApplicationConfig config; private final Map callbackHandlers = new HashMap<>(); + private final DataStore dataStore; public interface ConsumerCallbackHandler { Mono notifyTypeRegistered(InfoType type, SubscriptionInfo subscriptionInfo); @@ -96,17 +91,12 @@ public class InfoTypeSubscriptions { } return this.id.equals(o); } - } public InfoTypeSubscriptions(@Autowired ApplicationConfig config) { this.config = config; - - try { - this.restoreFromDatabase(); - } catch (IOException e) { - logger.error("Could not restore info type subscriptions from database {}", this.getDatabaseDirectory()); - } + this.dataStore = DataStore.create(config, "infotypesubscriptions"); + this.dataStore.createDataStore().subscribe(); } public void registerCallbackhandler(ConsumerCallbackHandler handler, String apiVersion) { @@ -162,12 +152,7 @@ public class InfoTypeSubscriptions { public void remove(SubscriptionInfo subscription) { allSubscriptions.remove(subscription.getId()); subscriptionsByOwner.remove(subscription.owner, subscription); - - try { - Files.delete(getPath(subscription)); - } catch (Exception e) { - logger.debug("Could not delete subscription from database: {}", e.getMessage()); - } + dataStore.deleteObject(getPath(subscription)).subscribe(); logger.debug("Removed type status subscription {}", subscription.id); } @@ -247,34 +232,28 @@ public class InfoTypeSubscriptions { } private void clearDatabase() { - try { - FileSystemUtils.deleteRecursively(Path.of(getDatabaseDirectory())); - Files.createDirectories(Paths.get(getDatabaseDirectory())); - } catch (IOException e) { - logger.warn("Could not delete database : {}", e.getMessage()); - } + this.dataStore.deleteAllData().block(); } private void storeInFile(SubscriptionInfo subscription) { - try { - try (PrintStream out = new PrintStream(new FileOutputStream(getFile(subscription)))) { - String json = gson.toJson(subscription); - out.print(json); - } - } catch (Exception e) { - logger.warn("Could not save subscription: {} {}", subscription.getId(), e.getMessage()); - } + String json = gson.toJson(subscription); + byte[] bytes = json.getBytes(); + this.dataStore.writeObject(this.getPath(subscription), bytes) + .doOnError(t -> logger.error("Could not store infotype subscription, reason: {}", t.getMessage())) // + .subscribe(); } - public synchronized void restoreFromDatabase() throws IOException { - Files.createDirectories(Paths.get(getDatabaseDirectory())); - File dbDir = new File(getDatabaseDirectory()); + public synchronized Flux restoreFromDatabase() { + return dataStore.listObjects("") // + .flatMap(dataStore::readObject) // + .map(this::toSubscriptionInfo) // + .filter(Objects::nonNull) // + .doOnNext(this::doPut);// + } - for (File file : dbDir.listFiles()) { - String json = Files.readString(file.toPath()); - SubscriptionInfo subscription = gson.fromJson(json, SubscriptionInfo.class); - doPut(subscription); - } + private SubscriptionInfo toSubscriptionInfo(byte[] bytes) { + String json = new String(bytes); + return gson.fromJson(json, SubscriptionInfo.class); } private void doPut(SubscriptionInfo subscription) { @@ -282,20 +261,8 @@ public class InfoTypeSubscriptions { subscriptionsByOwner.put(subscription.owner, subscription); } - private File getFile(SubscriptionInfo subscription) { - return getPath(subscription).toFile(); - } - - private Path getPath(SubscriptionInfo subscription) { - return getPath(subscription.getId()); - } - - private Path getPath(String subscriptionId) { - return Path.of(getDatabaseDirectory(), subscriptionId); - } - - private String getDatabaseDirectory() { - return config.getVardataDirectory() + "/database/infotypesubscriptions"; + private String getPath(SubscriptionInfo subscription) { + return subscription.getId(); } } diff --git a/src/main/java/org/oransc/ics/repository/InfoTypes.java b/src/main/java/org/oransc/ics/repository/InfoTypes.java index 110bca2..98188ff 100644 --- a/src/main/java/org/oransc/ics/repository/InfoTypes.java +++ b/src/main/java/org/oransc/ics/repository/InfoTypes.java @@ -24,26 +24,21 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.TypeAdapterFactory; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.PrintStream; import java.lang.invoke.MethodHandles; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.ServiceLoader; import java.util.Vector; import org.oransc.ics.configuration.ApplicationConfig; +import org.oransc.ics.datastore.DataStore; import org.oransc.ics.exceptions.ServiceException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.HttpStatus; -import org.springframework.util.FileSystemUtils; +import reactor.core.publisher.Flux; /** * Dynamic representation of all Information Types in the system. @@ -54,24 +49,30 @@ public class InfoTypes { private final Map allInfoTypes = new HashMap<>(); private final ApplicationConfig config; private final Gson gson; + private final DataStore dataStore; public InfoTypes(ApplicationConfig config) { this.config = config; GsonBuilder gsonBuilder = new GsonBuilder(); ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); this.gson = gsonBuilder.create(); + + this.dataStore = DataStore.create(config, "infotypes"); + this.dataStore.createDataStore().subscribe(); } - public synchronized void restoreTypesFromDatabase() throws IOException { - Files.createDirectories(Paths.get(getDatabaseDirectory())); - File dbDir = new File(getDatabaseDirectory()); + public synchronized Flux restoreTypesFromDatabase() { + return dataStore.listObjects("") // + .flatMap(dataStore::readObject) // + .map(this::toInfoType) // + .filter(Objects::nonNull) // + .doOnNext(type -> allInfoTypes.put(type.getId(), type)) // + .doOnError(t -> logger.error("Could not restore types from datastore, reason: {}", t.getMessage())); + } - for (File file : dbDir.listFiles()) { - String json = Files.readString(file.toPath()); - InfoType.PersistentInfo storedData = gson.fromJson(json, InfoType.PersistentInfo.class); - InfoType type = new InfoType(storedData); - allInfoTypes.put(type.getId(), type); - } + private InfoType toInfoType(byte[] bytes) { + String json = new String(bytes); + return gson.fromJson(json, InfoType.class); } public synchronized void put(InfoType type) { @@ -97,11 +98,7 @@ public class InfoTypes { public synchronized void remove(InfoType type) { allInfoTypes.remove(type.getId()); - try { - Files.delete(getPath(type)); - } catch (IOException e) { - logger.warn("Could not remove file: {} {}", type.getId(), e.getMessage()); - } + dataStore.deleteObject(getPath(type)).block(); } public synchronized int size() { @@ -110,7 +107,7 @@ public class InfoTypes { public synchronized void clear() { this.allInfoTypes.clear(); - clearDatabase(); + dataStore.deleteAllData().flatMap(s -> dataStore.createDataStore()).block(); } public synchronized InfoType getCompatibleType(String typeId) throws ServiceException { @@ -127,38 +124,15 @@ public class InfoTypes { return compatibleTypes.iterator().next(); } - private void clearDatabase() { - try { - FileSystemUtils.deleteRecursively(Path.of(getDatabaseDirectory())); - Files.createDirectories(Paths.get(getDatabaseDirectory())); - } catch (IOException e) { - logger.warn("Could not delete database : {}", e.getMessage()); - } - } - private void storeInFile(InfoType type) { - try { - try (PrintStream out = new PrintStream(new FileOutputStream(getFile(type)))) { - out.print(gson.toJson(type.getPersistentInfo())); - } - } catch (Exception e) { - logger.warn("Could not save type: {} {}", type.getId(), e.getMessage()); - } - } - - private File getFile(InfoType type) { - return getPath(type).toFile(); - } - - private Path getPath(InfoType type) { - return getPath(type.getId()); - } - - private Path getPath(String typeId) { - return Path.of(getDatabaseDirectory(), typeId); + String json = gson.toJson(type); + byte[] bytes = json.getBytes(); + this.dataStore.writeObject(this.getPath(type), bytes) + .doOnError(t -> logger.error("Could not store infotype in datastore, reason: {}", t.getMessage())) // + .subscribe(); } - private String getDatabaseDirectory() { - return config.getVardataDirectory() + "/database/eitypes"; + private String getPath(InfoType type) { + return type.getId(); } } diff --git a/src/test/java/org/oransc/ics/ApplicationTest.java b/src/test/java/org/oransc/ics/ApplicationTest.java index 77382b7..2f7c1bf 100644 --- a/src/test/java/org/oransc/ics/ApplicationTest.java +++ b/src/test/java/org/oransc/ics/ApplicationTest.java @@ -37,6 +37,7 @@ import java.lang.invoke.MethodHandles; import java.nio.file.Files; import java.nio.file.Path; import java.util.Arrays; +import java.util.List; import java.util.Map; import org.json.JSONObject; @@ -68,6 +69,7 @@ import org.oransc.ics.controllers.r1producer.ProducerInfoTypeInfo; import org.oransc.ics.controllers.r1producer.ProducerJobInfo; import org.oransc.ics.controllers.r1producer.ProducerRegistrationInfo; import org.oransc.ics.controllers.r1producer.ProducerStatusInfo; +import org.oransc.ics.datastore.DataStore; import org.oransc.ics.exceptions.ServiceException; import org.oransc.ics.repository.InfoJob; import org.oransc.ics.repository.InfoJobs; @@ -103,7 +105,9 @@ import reactor.test.StepVerifier; "server.ssl.key-store=./config/keystore.jks", // "app.webclient.trust-store=./config/truststore.jks", // "app.webclient.trust-store-used=true", // - "app.vardata-directory=./target"}) + "app.vardata-directory=/tmp/ics", // + "app.s3.bucket=" // If this is set, S3 will be used to store data. + }) class ApplicationTest { private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -982,6 +986,26 @@ class ApplicationTest { assertThat(resp.getBody()).contains("hunky dory"); } + @Test + void testDb() { + DataStore db = DataStore.create(this.applicationConfig, "test"); + db.createDataStore().block(); + final int NO_OF_OBJS = 1200; + for (int i = 0; i < NO_OF_OBJS; ++i) { + String data = "data"; + db.writeObject("Obj_" + i, data.getBytes()).block(); + } + + List entries = db.listObjects("").collectList().block(); + assertThat(entries).hasSize(NO_OF_OBJS); + + db.listObjects("").doOnNext(name -> logger.debug("deleted {}", name)).flatMap(name -> db.deleteObject(name)) + .blockLast(); + + db.createDataStore().block(); + + } + @Test void testJobDatabasePersistence() throws Exception { putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID); @@ -989,12 +1013,11 @@ class ApplicationTest { putInfoJob(TYPE_ID, "jobId2"); assertThat(this.infoJobs.size()).isEqualTo(2); - { InfoJob savedJob = this.infoJobs.getJob("jobId1"); // Restore the jobs InfoJobs jobs = new InfoJobs(this.applicationConfig, this.infoTypes, this.producerCallbacks); - jobs.restoreJobsFromDatabase(); + jobs.restoreJobsFromDatabase().blockLast(); assertThat(jobs.size()).isEqualTo(2); InfoJob restoredJob = jobs.getJob("jobId1"); assertThat(restoredJob.getPersistentData()).isEqualTo(savedJob.getPersistentData()); @@ -1007,7 +1030,7 @@ class ApplicationTest { { // Restore the jobs, no jobs in database InfoJobs jobs = new InfoJobs(this.applicationConfig, this.infoTypes, this.producerCallbacks); - jobs.restoreJobsFromDatabase(); + jobs.restoreJobsFromDatabase().blockLast(); assertThat(jobs.size()).isZero(); } logger.warn("Test removing a job when the db file is gone"); @@ -1028,7 +1051,7 @@ class ApplicationTest { { // Restore the types InfoTypes restoredTypes = new InfoTypes(this.applicationConfig); - restoredTypes.restoreTypesFromDatabase(); + restoredTypes.restoreTypesFromDatabase().blockLast(); InfoType restoredType = restoredTypes.getType(TYPE_ID); assertThat(restoredType.getPersistentInfo()).isEqualTo(savedType.getPersistentInfo()); assertThat(restoredTypes.size()).isEqualTo(1); @@ -1037,7 +1060,7 @@ class ApplicationTest { // Restore the jobs, no jobs in database InfoTypes restoredTypes = new InfoTypes(this.applicationConfig); restoredTypes.clear(); - restoredTypes.restoreTypesFromDatabase(); + restoredTypes.restoreTypesFromDatabase().blockLast(); assertThat(restoredTypes.size()).isZero(); } logger.warn("Test removing a job when the db file is gone"); @@ -1046,7 +1069,7 @@ class ApplicationTest { } @Test - void testConsumerTypeSubscriptionDatabase() { + void testConsumerTypeSubscriptionDatabase() throws Exception { final String callbackUrl = baseUrl() + ConsumerSimulatorController.getTypeStatusCallbackUrl(); final ConsumerTypeSubscriptionInfo info = new ConsumerTypeSubscriptionInfo(callbackUrl, "owner"); @@ -1055,7 +1078,11 @@ class ApplicationTest { restClient().putForEntity(typeSubscriptionUrl() + "/subscriptionId", body).block(); assertThat(this.infoTypeSubscriptions.size()).isEqualTo(1); + if (this.applicationConfig.isS3Enabled()) { + Thread.sleep(1000); // Storing in S3 is asynch, so it can take some millis + } InfoTypeSubscriptions restoredSubscriptions = new InfoTypeSubscriptions(this.applicationConfig); + restoredSubscriptions.restoreFromDatabase().blockLast(); assertThat(restoredSubscriptions.size()).isEqualTo(1); assertThat(restoredSubscriptions.getSubscriptionsForOwner("owner")).hasSize(1); -- 2.16.6