Added a feature to query stored PM data 04/9104/1
author PatrikBuhr <patrik.buhr@est.tech>
Fri, 23 Sep 2022 13:06:40 +0000 (15:06 +0200)
committer PatrikBuhr <patrik.buhr@est.tech>
Fri, 23 Sep 2022 13:09:01 +0000 (15:09 +0200)
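
A data consumer can now also receive PM (ROP) files that have already been
collected and stored, either in S3 object storage or on the local file system.
When the PM filter of an information job contains a pmRopStartTime, the stored
files for the requested sourceNames are listed, filtered on their start time
and distributed to the consumer in addition to the data streamed from Kafka.
A lock object per job ensures that the historical data is only collected once.

An illustrative PM filter (field names as serialized from
PmReportFilter.FilterData; the surrounding job layout follows the registered
type schema):

    {
      "sourceNames": ["O-DU-1122"],
      "pmRopStartTime": "2022-09-01T00:00:00.000+02:00"
    }
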
Change-Id: I6ac0f2e6a4a6db0cea8ffd36583c330a1dc0babb
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773

14 files changed:
config/application.yaml
src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java
src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java
src/main/java/org/oran/dmaapadapter/repository/Job.java
src/main/java/org/oran/dmaapadapter/tasks/DataStore.java [new file with mode: 0644]
src/main/java/org/oran/dmaapadapter/tasks/FileStore.java [new file with mode: 0644]
src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java
src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java
src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java
src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
src/main/java/org/oran/dmaapadapter/tasks/S3ObjectStore.java [new file with mode: 0644]
src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java
src/main/resources/typeSchemaPmData.json
src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java

index 86e5987..74dc2be 100644 (file)
@@ -85,3 +85,5 @@ app:
     endpointOverride: http://localhost:9000
     accessKeyId: minio
     secretAccessKey: miniostorage
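+    # Buckets for stored PM report (ROP) files and for the lock objects
+    # used to coordinate collection of historical data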
+    locksBucket: ropfilelocks
+    bucket: ropfiles
index 6beee21..097fd32 100644 (file)
@@ -116,6 +116,14 @@ public class ApplicationConfig {
     @Value("${app.s3.secretAccessKey:}")
     private String s3SecretAccessKey;
 
+    @Getter
+    @Value("${app.s3.locksBucket:}")
+    private String s3LocksBucket;
+
+    @Getter
+    @Value("${app.s3.bucket:}")
+    private String s3Bucket;
+
     private WebClientConfig webClientConfig = null;
 
     public WebClientConfig getWebClientConfig() {
index 3740eef..e33ae68 100644 (file)
@@ -28,6 +28,7 @@ import java.util.HashSet;
 import java.util.Map;
 
 import lombok.Getter;
+import lombok.Setter;
 
 import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
 import org.slf4j.Logger;
@@ -48,6 +49,7 @@ public class PmReportFilter implements Filter {
             .disableHtmlEscaping() //
             .create();
 
+    @Getter
     private final FilterData filterData;
 
     @Getter
@@ -57,6 +59,9 @@ public class PmReportFilter implements Filter {
         final Collection<String> measTypes = new HashSet<>();
         final Collection<String> measuredEntityDns = new ArrayList<>();
         final Collection<String> measObjClass = new HashSet<>();
+
+        @Setter
+        String pmRopStartTime;
     }
 
     private static class MeasTypesIndexed extends PmReport.MeasTypes {
index acb9136..9fd8e57 100644 (file)
@@ -184,6 +184,7 @@ public class Job {
     @Getter
     private final String lastUpdated;
 
+    @Getter
     private final Filter filter;
 
     @Getter
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/DataStore.java b/src/main/java/org/oran/dmaapadapter/tasks/DataStore.java
new file mode 100644 (file)
index 0000000..75291f0
--- /dev/null
@@ -0,0 +1,43 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.tasks;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
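+/**
+ * Common abstraction for access to stored PM report files and lock objects.
+ * There are two implementations: FileStore (local file system) and
+ * S3ObjectStore (S3 compatible object storage). Bucket.FILES holds the PM
+ * report files and Bucket.LOCKS holds the lock objects used to coordinate
+ * collection of historical data.
+ */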
+public interface DataStore {
+    public enum Bucket {
+        FILES, LOCKS
+    }
+
+    public Flux<String> listFiles(Bucket bucket, String prefix);
+
+    public Mono<String> readFile(Bucket bucket, String fileName);
+
+    public Mono<String> readFile(String bucket, String fileName);
+
+    public Mono<Boolean> createLock(String name);
+
+    public Mono<Boolean> deleteLock(String name);
+
+    public Mono<Boolean> deleteObject(Bucket bucket, String name);
+
+}
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/FileStore.java b/src/main/java/org/oran/dmaapadapter/tasks/FileStore.java
new file mode 100644 (file)
index 0000000..41bacd9
--- /dev/null
@@ -0,0 +1,137 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.tasks;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Stream;
+
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
+import org.oran.dmaapadapter.exceptions.ServiceException;
+import org.springframework.http.HttpStatus;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
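+/**
+ * DataStore implementation backed by the local file system. Files are stored
+ * under the configured PM files path (app.pm-files-path). A lock is a plain
+ * file; an existing lock that is older than five minutes is treated as stale
+ * and may be taken over.
+ */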
+public class FileStore implements DataStore {
+
+    ApplicationConfig applicationConfig;
+
+    public FileStore(ApplicationConfig applicationConfig) {
+        this.applicationConfig = applicationConfig;
+    }
+
+    @Override
+    public Flux<String> listFiles(Bucket bucket, String prefix) {
+        Path root = Path.of(applicationConfig.getPmFilesPath(), prefix);
+        if (!root.toFile().exists()) {
+            root = root.getParent();
+        }
+
+        List<String> result = new ArrayList<>();
+        try (Stream<Path> stream = Files.walk(root, Integer.MAX_VALUE)) {
+
+            stream.forEach(path -> filterListFiles(path, prefix, result));
+
+            return Flux.fromIterable(result);
+        } catch (Exception e) {
+            return Flux.error(e);
+        }
+    }
+
+    private void filterListFiles(Path path, String prefix, List<String> result) {
+        if (path.toFile().isFile() && externalName(path).startsWith(prefix)) {
+            result.add(externalName(path));
+        }
+    }
+
+    private String externalName(Path f) {
+        String fullName = f.toString();
+        return fullName.substring(applicationConfig.getPmFilesPath().length());
+    }
+
+    @Override
+    public Mono<String> readFile(String bucket, String fileName) {
+        return Mono.error(new ServiceException("readFile from bucket is not implemented", HttpStatus.CONFLICT));
+    }
+
+    @Override
+    public Mono<String> readFile(Bucket bucket, String fileName) {
+        try {
+            String contents = Files.readString(path(fileName));
+            return Mono.just(contents);
+        } catch (Exception e) {
+            return Mono.error(e);
+        }
+    }
+
+    @Override
+    public Mono<Boolean> createLock(String name) {
+        File file = path(name).toFile();
+        try {
+            boolean res = file.createNewFile();
+            return Mono.just(res || isAged(file));
+        } catch (Exception e) {
+            return Mono.just(file.exists() && isAged(file));
+        }
+    }
+
+    public Mono<String> copyFileTo(Path from, String to) {
+        try {
+            Path toPath = path(to);
+            Files.createDirectories(toPath.getParent());
+            Files.copy(from, toPath, StandardCopyOption.REPLACE_EXISTING);
+            return Mono.just(to);
+        } catch (Exception e) {
+            return Mono.error(e);
+        }
+    }
+
+    private boolean isAged(File file) {
+        final long MAX_AGE_SECONDS = (long) 60 * 5;
+        // File.lastModified() is in milliseconds since the epoch
+        return Instant.now().toEpochMilli() - file.lastModified() > MAX_AGE_SECONDS * 1000;
+    }
+
+    @Override
+    public Mono<Boolean> deleteLock(String name) {
+        return deleteObject(Bucket.LOCKS, name);
+    }
+
+    @Override
+    public Mono<Boolean> deleteObject(Bucket bucket, String name) {
+        try {
+            Files.delete(path(name));
+            return Mono.just(true);
+        } catch (Exception e) {
+            return Mono.just(false);
+        }
+    }
+
+    private Path path(String name) {
+        return Path.of(applicationConfig.getPmFilesPath(), name);
+    }
+
+}
index 9ba5131..f57c12d 100644 (file)
@@ -20,6 +20,7 @@
 
 package org.oran.dmaapadapter.tasks;
 
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.filter.Filter;
 import org.oran.dmaapadapter.repository.Job;
 import org.slf4j.Logger;
@@ -35,8 +36,8 @@ import reactor.core.publisher.Mono;
 public class HttpJobDataDistributor extends JobDataDistributor {
     private static final Logger logger = LoggerFactory.getLogger(HttpJobDataDistributor.class);
 
-    public HttpJobDataDistributor(Job job) {
-        super(job);
+    public HttpJobDataDistributor(Job job, ApplicationConfig config) {
+        super(job, config);
     }
 
     @Override
index bda54a4..2e98d81 100644 (file)
 
 package org.oran.dmaapadapter.tasks;
 
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.OffsetDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+
 import lombok.Getter;
 
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
+import org.oran.dmaapadapter.exceptions.ServiceException;
 import org.oran.dmaapadapter.filter.Filter;
+import org.oran.dmaapadapter.filter.PmReportFilter;
+import org.oran.dmaapadapter.repository.InfoType;
 import org.oran.dmaapadapter.repository.Job;
+import org.oran.dmaapadapter.tasks.KafkaTopicListener.NewFileEvent;
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpStatus;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 
 import reactor.core.Disposable;
@@ -43,6 +57,10 @@ public abstract class JobDataDistributor {
     private final Job job;
     private Disposable subscription;
     private final ErrorStats errorStats = new ErrorStats();
+    private final ApplicationConfig applConfig;
+
+    private final DataStore fileStore;
+    private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
 
     private class ErrorStats {
         private int consumerFaultCounter = 0;
@@ -70,12 +88,17 @@ public abstract class JobDataDistributor {
         }
     }
 
-    protected JobDataDistributor(Job job) {
+    protected JobDataDistributor(Job job, ApplicationConfig applConfig) {
         this.job = job;
+        this.applConfig = applConfig;
+        this.fileStore = applConfig.isS3Enabled() ? new S3ObjectStore(applConfig) : new FileStore(applConfig);
     }
 
     public synchronized void start(Flux<TopicListener.DataFromTopic> input) {
         stop();
+
+        collectHistoricalData();
+
         this.errorStats.resetIrrecoverableErrors();
         this.subscription = filterAndBuffer(input, this.job) //
                 .flatMap(this::sendToClient, job.getParameters().getMaxConcurrency()) //
@@ -85,6 +108,73 @@ public abstract class JobDataDistributor {
                         () -> logger.warn("HttpDataConsumer stopped jobId: {}", job.getId()));
     }
 
+    static class LockedException extends ServiceException {
+        public LockedException(String file) {
+            super(file, HttpStatus.NOT_FOUND);
+        }
+    }
+
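+    /**
+     * If the job has a PM filter, distribute already stored (historical) PM
+     * files to the consumer. A lock named after the job is taken first so that
+     * only one instance collects the data; if the lock is already held the
+     * collection is skipped. The stored files for each requested source name
+     * are listed, filtered on pmRopStartTime and replayed through the normal
+     * filter/send pipeline by wrapping each file name in a NewFileEvent. The
+     * lock is released when the collection is finished.
+     */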
+    private void collectHistoricalData() {
+        PmReportFilter filter = job.getFilter() instanceof PmReportFilter ? (PmReportFilter) job.getFilter() : null;
+
+        if (filter != null) {
+            this.fileStore.createLock(collectHistoricalDataLockName()) //
+                    .flatMap(isLockGranted -> isLockGranted ? Mono.just(isLockGranted)
+                            : Mono.error(new LockedException(collectHistoricalDataLockName()))) //
+                    .flatMapMany(b -> Flux.fromIterable(filter.getFilterData().getSourceNames()))
+                    .flatMap(sourceName -> fileStore.listFiles(DataStore.Bucket.FILES, sourceName), 1) //
+                    .filter(fileName -> filterStartTime(filter.getFilterData().getPmRopStartTime(), fileName)) //
+                    .map(this::createFakeEvent) //
+                    .flatMap(event -> filterAndBuffer(event, this.job), 1) //
+                    .flatMap(this::sendToClient, 1) //
+                    .onErrorResume(this::handleCollectHistoricalDataError) //
+                    .collectList() //
+                    .flatMap(list -> fileStore.deleteLock(collectHistoricalDataLockName())) //
+                    .subscribe();
+        }
+    }
+
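+    /**
+     * A LockedException means that another instance is already collecting the
+     * historical data for this job and is silently ignored; for any other
+     * error the lock is released.
+     */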
+    private Mono<String> handleCollectHistoricalDataError(Throwable t) {
+
+        if (t instanceof LockedException) {
+            logger.debug("Locked exception: {} job: {}", t.getMessage(), job.getId());
+            return Mono.empty(); // Ignore
+        } else {
+            return fileStore.deleteLock(collectHistoricalDataLockName()) //
+                    .map(bool -> "OK") //
+                    .onErrorResume(t2 -> Mono.empty());
+        }
+    }
+
+    private String collectHistoricalDataLockName() {
+        return "collectHistoricalDataLock" + this.job.getId();
+    }
+
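+    /**
+     * Wrap a stored file name in a NewFileEvent so that it can be processed by
+     * the same pipeline as events received from Kafka.
+     */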
+    private Flux<TopicListener.DataFromTopic> createFakeEvent(String fileName) {
+
+        NewFileEvent ev = new NewFileEvent(fileName, this.applConfig.getS3Bucket());
+
+        return Flux.just(new TopicListener.DataFromTopic("", gson.toJson(ev)));
+    }
+
+    private boolean filterStartTime(String startTimeStr, String fileName) {
+        // A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json
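+        // The part after the source name directory starts with 'A' followed by
+        // the ROP start time as yyyyMMdd.HHmm plus a UTC offset; only files that
+        // start after the requested pmRopStartTime pass the filter.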
+        try {
+            String fileTimePart = fileName.substring(fileName.lastIndexOf("/") + 2);
+            fileTimePart = fileTimePart.substring(0, 18);
+
+            DateTimeFormatter formatter = new DateTimeFormatterBuilder().appendPattern("yyyyMMdd.HHmmZ").toFormatter();
+
+            OffsetDateTime fileStartTime = OffsetDateTime.parse(fileTimePart, formatter);
+            OffsetDateTime startTime = OffsetDateTime.parse(startTimeStr);
+
+            return startTime.isBefore(fileStartTime);
+        } catch (Exception e) {
+            logger.warn("Time parsing exception: {}", e.getMessage());
+            return false;
+        }
+    }
+
     private void handleExceptionInStream(Throwable t) {
         logger.warn("HttpDataConsumer exception: {}, jobId: {}", t.getMessage(), job.getId());
         stop();
@@ -106,6 +196,7 @@ public abstract class JobDataDistributor {
     private Flux<Filter.FilteredData> filterAndBuffer(Flux<TopicListener.DataFromTopic> inputFlux, Job job) {
         Flux<Filter.FilteredData> filtered = //
                 inputFlux.doOnNext(data -> job.getStatistics().received(data.value)) //
+                        .flatMap(this::getDataFromFileIfNewPmFileEvent, 100) //
                         .map(job::filter) //
                         .filter(f -> !f.isEmpty()) //
                         .doOnNext(f -> job.getStatistics().filtered(f.value)); //
@@ -120,6 +211,36 @@ public abstract class JobDataDistributor {
         return filtered;
     }
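+    /**
+     * A short message on a PM data topic may be a NewFileEvent that only points
+     * at a stored PM report file. In that case the file contents are read from
+     * S3 or from the local file system and substituted for the event; longer
+     * messages, and messages that cannot be parsed as a NewFileEvent, are
+     * assumed to already contain the PM report and are passed through unchanged.
+     */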
 
+    private Mono<DataFromTopic> getDataFromFileIfNewPmFileEvent(DataFromTopic data) {
+        if (this.job.getType().getDataType() != InfoType.DataType.PM_DATA || data.value.length() > 1000) {
+            return Mono.just(data);
+        }
+
+        try {
+            NewFileEvent ev = gson.fromJson(data.value, NewFileEvent.class);
+            if (ev.getObjectStoreBucket() != null) {
+                if (this.applConfig.isS3Enabled()) {
+                    return fileStore.readFile(ev.getObjectStoreBucket(), ev.getFilename()) //
+                            .map(str -> new DataFromTopic(data.key, str));
+                } else {
+                    logger.error("S3 is not configured in application.yaml, ignoring: {}", data);
+                    return Mono.empty();
+                }
+            } else {
+                if (applConfig.getPmFilesPath().isEmpty() || ev.getFilename() == null) {
+                    logger.debug("Passing data {}", data);
+                    return Mono.just(data);
+                } else {
+                    Path path = Path.of(this.applConfig.getPmFilesPath(), ev.getFilename());
+                    String pmReportJson = Files.readString(path, Charset.defaultCharset());
+                    return Mono.just(new DataFromTopic(data.key, pmReportJson));
+                }
+            }
+        } catch (Exception e) {
+            return Mono.just(data);
+        }
+    }
+
     private String quoteNonJson(String str, Job job) {
         return job.getType().isJson() ? str : quote(str);
     }
index 8f77381..08c8167 100644 (file)
@@ -50,7 +50,7 @@ public class KafkaJobDataDistributor extends JobDataDistributor {
     private final ApplicationConfig appConfig;
 
     public KafkaJobDataDistributor(Job job, ApplicationConfig appConfig) {
-        super(job);
+        super(job, appConfig);
         this.appConfig = appConfig;
     }
 
@@ -58,7 +58,7 @@ public class KafkaJobDataDistributor extends JobDataDistributor {
     protected Mono<String> sendToClient(Filter.FilteredData data) {
         Job job = this.getJob();
 
-        logger.debug("Sending data '{}' to Kafka topic: {}", data, this.getJob().getParameters().getKafkaOutputTopic());
+        logger.trace("Sending data '{}' to Kafka topic: {}", data, this.getJob().getParameters().getKafkaOutputTopic());
 
         SenderRecord<String, String, Integer> senderRecord = senderRecord(data, job);
 
index 8647b9b..6b5a0f7 100644 (file)
 
 package org.oran.dmaapadapter.tasks;
 
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
 
-import java.net.URI;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.nio.file.Path;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 
 import lombok.Builder;
 import lombok.Getter;
@@ -44,18 +37,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
 import reactor.kafka.receiver.KafkaReceiver;
 import reactor.kafka.receiver.ReceiverOptions;
-import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
-import software.amazon.awssdk.core.ResponseBytes;
-import software.amazon.awssdk.core.async.AsyncResponseTransformer;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.s3.S3AsyncClient;
-import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
-import software.amazon.awssdk.services.s3.model.GetObjectRequest;
-import software.amazon.awssdk.services.s3.model.GetObjectResponse;
 
 /**
  * The class streams incoming requests from a Kafka topic and sends them further
@@ -68,13 +51,6 @@ public class KafkaTopicListener implements TopicListener {
     private final InfoType type;
     private Flux<DataFromTopic> dataFromTopic;
 
-    @Getter
-    private static S3AsyncClient s3AsynchClient;
-
-    private static Gson gson = new GsonBuilder() //
-            .disableHtmlEscaping() //
-            .create(); //
-
     @ToString
     @Builder
     public static class NewFileEvent {
@@ -85,16 +61,9 @@ public class KafkaTopicListener implements TopicListener {
         private String objectStoreBucket;
     }
 
-    public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type) {
-        this.applicationConfig = applicationConfig;
+    public KafkaTopicListener(ApplicationConfig applConfig, InfoType type) {
+        this.applicationConfig = applConfig;
         this.type = type;
-        if (applicationConfig.isS3Enabled()) {
-            synchronized (KafkaTopicListener.class) {
-                if (s3AsynchClient == null) {
-                    s3AsynchClient = getS3AsyncClientBuilder().build();
-                }
-            }
-        }
     }
 
     @Override
@@ -111,79 +80,16 @@ public class KafkaTopicListener implements TopicListener {
         return KafkaReceiver.create(kafkaInputProperties(clientId)) //
                 .receiveAutoAck() //
                 .concatMap(consumerRecord -> consumerRecord) //
-                .doOnNext(input -> logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(),
+                .doOnNext(input -> logger.trace("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(),
                         input.value())) //
                 .doOnError(t -> logger.error("KafkaTopicReceiver error: {}", t.getMessage())) //
                 .doFinally(sig -> logger.error("KafkaTopicReceiver stopped, reason: {}", sig)) //
                 .filter(t -> !t.value().isEmpty() || !t.key().isEmpty()) //
                 .map(input -> new DataFromTopic(input.key(), input.value())) //
-                .flatMap(this::getDataFromFileIfNewPmFileEvent, 100) //
                 .publish() //
                 .autoConnect(1);
     }
 
-    private S3AsyncClientBuilder getS3AsyncClientBuilder() {
-        URI uri = URI.create(this.applicationConfig.getS3EndpointOverride());
-        return S3AsyncClient.builder() //
-                .region(Region.US_EAST_1) //
-                .endpointOverride(uri) //
-                .credentialsProvider(StaticCredentialsProvider.create( //
-                        AwsBasicCredentials.create(this.applicationConfig.getS3AccessKeyId(), //
-                                this.applicationConfig.getS3SecretAccessKey())));
-
-    }
-
-    private Mono<String> getDataFromS3Object(String bucket, String key) {
-        if (!this.applicationConfig.isS3Enabled()) {
-            logger.error("Missing S3 confinguration in application.yaml, ignoring bucket: {}, key: {}", bucket, key);
-            return Mono.empty();
-        }
-
-        GetObjectRequest request = GetObjectRequest.builder() //
-                .bucket(bucket) //
-                .key(key) //
-                .build();
-
-        CompletableFuture<ResponseBytes<GetObjectResponse>> future = s3AsynchClient.getObject(request,
-                AsyncResponseTransformer.toBytes());
-
-        return Mono.fromFuture(future) //
-                .map(b -> new String(b.asByteArray(), Charset.defaultCharset())) //
-                .doOnError(t -> logger.error("Failed to get file from S3 {}", t.getMessage())) //
-                .doOnEach(n -> logger.debug("Read file from S3: {} {}", bucket, key)) //
-                .onErrorResume(t -> Mono.empty());
-    }
-
-    private Mono<DataFromTopic> getDataFromFileIfNewPmFileEvent(DataFromTopic data) {
-        if (this.type.getDataType() != InfoType.DataType.PM_DATA || data.value.length() > 1000) {
-            return Mono.just(data);
-        }
-
-        try {
-            NewFileEvent ev = gson.fromJson(data.value, NewFileEvent.class);
-            if (ev.getObjectStoreBucket() != null) {
-                if (applicationConfig.isS3Enabled()) {
-                    return getDataFromS3Object(ev.getObjectStoreBucket(), ev.getFilename()) //
-                            .map(str -> new DataFromTopic(data.key, str));
-                } else {
-                    logger.error("S3 is not configured in application.yaml, ignoring: {}", data);
-                    return Mono.empty();
-                }
-            } else {
-                if (applicationConfig.getPmFilesPath().isEmpty() || ev.filename == null) {
-                    logger.debug("Passing data {}", data);
-                    return Mono.just(data);
-                } else {
-                    Path path = Path.of(this.applicationConfig.getPmFilesPath(), ev.getFilename());
-                    String pmReportJson = Files.readString(path, Charset.defaultCharset());
-                    return Mono.just(new DataFromTopic(data.key, pmReportJson));
-                }
-            }
-        } catch (Exception e) {
-            return Mono.just(data);
-        }
-    }
-
     private ReceiverOptions<String, String> kafkaInputProperties(String clientId) {
         Map<String, Object> consumerProps = new HashMap<>();
         if (this.applicationConfig.getKafkaBootStrapServers().isEmpty()) {
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/S3ObjectStore.java b/src/main/java/org/oran/dmaapadapter/tasks/S3ObjectStore.java
new file mode 100644 (file)
index 0000000..0a2cf25
--- /dev/null
@@ -0,0 +1,262 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.tasks;
+
+import java.net.URI;
+import java.nio.charset.Charset;
+import java.nio.file.Path;
+import java.time.Instant;
+import java.util.concurrent.CompletableFuture;
+
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.ResponseBytes;
+import software.amazon.awssdk.core.async.AsyncRequestBody;
+import software.amazon.awssdk.core.async.AsyncResponseTransformer;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+import software.amazon.awssdk.services.s3.model.CreateBucketResponse;
+import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
+import software.amazon.awssdk.services.s3.model.DeleteBucketResponse;
+import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.PutObjectResponse;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
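+/**
+ * DataStore implementation using S3 compatible object storage (for instance
+ * MinIO) through the asynchronous AWS SDK v2 client. The bucket names are
+ * taken from the application configuration (app.s3.bucket and
+ * app.s3.locksBucket).
+ */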
+public class S3ObjectStore implements DataStore {
+    private static final Logger logger = LoggerFactory.getLogger(S3ObjectStore.class);
+    private final ApplicationConfig applicationConfig;
+
+    private static S3AsyncClient s3AsynchClient;
+
+    public S3ObjectStore(ApplicationConfig applicationConfig) {
+        this.applicationConfig = applicationConfig;
+
+        getS3AsynchClient(applicationConfig);
+    }
+
+    private static synchronized S3AsyncClient getS3AsynchClient(ApplicationConfig applicationConfig) {
+        if (applicationConfig.isS3Enabled() && s3AsynchClient == null) {
+            s3AsynchClient = getS3AsyncClientBuilder(applicationConfig).build();
+        }
+        return s3AsynchClient;
+    }
+
+    private static S3AsyncClientBuilder getS3AsyncClientBuilder(ApplicationConfig applicationConfig) {
+        URI uri = URI.create(applicationConfig.getS3EndpointOverride());
+        return S3AsyncClient.builder() //
+                .region(Region.US_EAST_1) //
+                .endpointOverride(uri) //
+                .credentialsProvider(StaticCredentialsProvider.create( //
+                        AwsBasicCredentials.create(applicationConfig.getS3AccessKeyId(), //
+                                applicationConfig.getS3SecretAccessKey())));
+
+    }
+
+    @Override
+    public Flux<String> listFiles(Bucket bucket, String prefix) {
+        return listObjectsInBucket(bucket(bucket), prefix).map(S3Object::key);
+    }
+
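+    /**
+     * A lock is an empty object in the LOCKS bucket. The lock is granted if no
+     * such object exists, or if the existing object is older than five minutes
+     * so that a stale lock left by a failed instance can be taken over.
+     */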
+    @Override
+    public Mono<Boolean> createLock(String name) {
+        return getHeadObject(bucket(Bucket.LOCKS), name).flatMap(head -> createLock(name, head)) //
+                .onErrorResume(t -> createLock(name, null));
+    }
+
+    private Mono<Boolean> createLock(String name, HeadObjectResponse head) {
+        final long MAX_AGE_SECONDS = (long) 60 * 5;
+
+        if (head == null || Instant.now().getEpochSecond() - head.lastModified().getEpochSecond() > MAX_AGE_SECONDS) {
+
+            return this.putObject(Bucket.LOCKS, name, "") //
+                    .flatMap(resp -> Mono.just(true)) //
+                    .doOnError(t -> logger.warn("Failed to create lock {}, reason: {}", name, t.getMessage())) //
+                    .onErrorResume(t -> Mono.just(false));
+        } else {
+            return Mono.just(false);
+        }
+    }
+
+    @Override
+    public Mono<Boolean> deleteLock(String name) {
+        return deleteObject(Bucket.LOCKS, name);
+    }
+
+    @Override
+    public Mono<Boolean> deleteObject(Bucket bucket, String name) {
+
+        DeleteObjectRequest request = DeleteObjectRequest.builder() //
+                .bucket(bucket(bucket)) //
+                .key(name) //
+                .build();
+
+        CompletableFuture<DeleteObjectResponse> future = s3AsynchClient.deleteObject(request);
+
+        return Mono.fromFuture(future).map(resp -> true);
+    }
+
+    @Override
+    public Mono<String> readFile(Bucket bucket, String fileName) {
+        return getDataFromS3Object(bucket(bucket), fileName);
+    }
+
+    @Override
+    public Mono<String> readFile(String bucket, String fileName) {
+        return getDataFromS3Object(bucket, fileName);
+    }
+
+    public Mono<String> putObject(Bucket bucket, String fileName, String bodyString) {
+        PutObjectRequest request = PutObjectRequest.builder() //
+                .bucket(bucket(bucket)) //
+                .key(fileName) //
+                .build();
+
+        AsyncRequestBody body = AsyncRequestBody.fromString(bodyString);
+
+        CompletableFuture<PutObjectResponse> future = s3AsynchClient.putObject(request, body);
+
+        return Mono.fromFuture(future) //
+                .map(putObjectResponse -> fileName) //
+                .doOnError(t -> logger.error("Failed to store file in S3 {}", t.getMessage()));
+    }
+
+    public Mono<String> copyFileToS3(Bucket bucket, Path fromFile, String toFile) {
+        return copyFileToS3Bucket(bucket(bucket), fromFile, toFile);
+    }
+
+    public Mono<String> createS3Bucket(Bucket bucket) {
+        return createS3Bucket(bucket(bucket));
+    }
+
+    private Mono<String> createS3Bucket(String s3Bucket) {
+
+        CreateBucketRequest request = CreateBucketRequest.builder() //
+                .bucket(s3Bucket) //
+                .build();
+
+        CompletableFuture<CreateBucketResponse> future = s3AsynchClient.createBucket(request);
+
+        return Mono.fromFuture(future) //
+                .map(f -> s3Bucket) //
+                .doOnError(t -> logger.debug("Could not create S3 bucket: {}", t.getMessage()))
+                .onErrorResume(t -> Mono.just(s3Bucket));
+    }
+
+    public Mono<String> deleteBucket(Bucket bucket) {
+        return listFiles(bucket, "") //
+                .flatMap(key -> deleteObject(bucket, key)) //
+                .collectList() //
+                .flatMap(list -> deleteBucketFromS3Storage(bucket)) //
+                .map(resp -> "OK")
+                .doOnError(t -> logger.warn("Could not delete bucket: {}, reason: {}", bucket(bucket), t.getMessage()))
+                .onErrorResume(t -> Mono.just("NOK"));
+    }
+
+    private Mono<DeleteBucketResponse> deleteBucketFromS3Storage(Bucket bucket) {
+        DeleteBucketRequest request = DeleteBucketRequest.builder() //
+                .bucket(bucket(bucket)) //
+                .build();
+
+        CompletableFuture<DeleteBucketResponse> future = s3AsynchClient.deleteBucket(request);
+
+        return Mono.fromFuture(future);
+    }
+
+    private String bucket(Bucket bucket) {
+        return bucket == Bucket.FILES ? applicationConfig.getS3Bucket() : applicationConfig.getS3LocksBucket();
+    }
+
+    private Mono<String> copyFileToS3Bucket(String s3Bucket, Path fileName, String s3Key) {
+
+        PutObjectRequest request = PutObjectRequest.builder() //
+                .bucket(s3Bucket) //
+                .key(s3Key) //
+                .build();
+
+        AsyncRequestBody body = AsyncRequestBody.fromFile(fileName);
+
+        CompletableFuture<PutObjectResponse> future = s3AsynchClient.putObject(request, body);
+
+        return Mono.fromFuture(future) //
+                .map(f -> s3Key) //
+                .doOnError(t -> logger.error("Failed to store file in S3 {}", t.getMessage()));
+
+    }
+
+    private Mono<HeadObjectResponse> getHeadObject(String bucket, String key) {
+        HeadObjectRequest request = HeadObjectRequest.builder().bucket(bucket).key(key).build();
+
+        CompletableFuture<HeadObjectResponse> future = s3AsynchClient.headObject(request);
+        return Mono.fromFuture(future);
+    }
+
+    private Flux<S3Object> listObjectsInBucket(String bucket, String prefix) {
+        ListObjectsRequest listObjectsRequest = ListObjectsRequest.builder() //
+                .bucket(bucket) //
+                .maxKeys(1100) //
+                .prefix(prefix) //
+                .build();
+        CompletableFuture<ListObjectsResponse> future = s3AsynchClient.listObjects(listObjectsRequest);
+
+        return Mono.fromFuture(future) //
+                .map(ListObjectsResponse::contents) //
+                .doOnNext(f -> logger.debug("Found objects in {}: {}", bucket, f.size())) //
+                .doOnError(t -> logger.warn("Error listing objects: {}", t.getMessage())) //
+                .flatMapMany(Flux::fromIterable) //
+                .doOnNext(obj -> logger.debug("Found object: {}", obj.key()));
+    }
+
+    private Mono<String> getDataFromS3Object(String bucket, String key) {
+
+        GetObjectRequest request = GetObjectRequest.builder() //
+                .bucket(bucket) //
+                .key(key) //
+                .build();
+
+        CompletableFuture<ResponseBytes<GetObjectResponse>> future =
+                s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes());
+
+        return Mono.fromFuture(future) //
+                .map(b -> new String(b.asByteArray(), Charset.defaultCharset())) //
+                .doOnError(t -> logger.error("Failed to get file from S3 {}", t.getMessage())) //
+                .doOnEach(n -> logger.debug("Read file from S3: {} {}", bucket, key)) //
+                .onErrorResume(t -> Mono.empty());
+    }
+
+}
index fcc94ee..d5868e5 100644 (file)
@@ -95,7 +95,7 @@ public class TopicListeners {
 
     private JobDataDistributor createConsumer(Job job) {
         return !Strings.isEmpty(job.getParameters().getKafkaOutputTopic()) ? new KafkaJobDataDistributor(job, appConfig)
-                : new HttpJobDataDistributor(job);
+                : new HttpJobDataDistributor(job, appConfig);
     }
 
     private void addConsumer(Job job, MultiMap<JobDataDistributor> distributors,
index 5e774ce..7d5ab62 100644 (file)
@@ -51,6 +51,9 @@
                            "type": "string"
                         }
                      ]
+                  },
+                  "pmRopStartTime": {
+                     "type": "string"
                   }
                }
             }
@@ -69,7 +72,7 @@
          "type": "integer",
          "minimum": 1
       },
-      "kafkaOutputTopic" : {
+      "kafkaOutputTopic": {
          "type": "string"
       },
       "bufferTimeout": {
@@ -92,4 +95,4 @@
          ]
       }
    }
-}
+}
\ No newline at end of file
index ba39e88..9cf3dec 100644 (file)
@@ -32,7 +32,6 @@ import java.time.Instant;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -54,7 +53,11 @@ import org.oran.dmaapadapter.repository.InfoType;
 import org.oran.dmaapadapter.repository.InfoTypes;
 import org.oran.dmaapadapter.repository.Job;
 import org.oran.dmaapadapter.repository.Jobs;
+import org.oran.dmaapadapter.tasks.DataStore;
+import org.oran.dmaapadapter.tasks.DataStore.Bucket;
+import org.oran.dmaapadapter.tasks.FileStore;
 import org.oran.dmaapadapter.tasks.KafkaTopicListener;
+import org.oran.dmaapadapter.tasks.S3ObjectStore;
 import org.oran.dmaapadapter.tasks.TopicListener;
 import org.oran.dmaapadapter.tasks.TopicListeners;
 import org.slf4j.Logger;
@@ -70,15 +73,9 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.test.context.TestPropertySource;
 
 import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
 import reactor.kafka.sender.KafkaSender;
 import reactor.kafka.sender.SenderOptions;
 import reactor.kafka.sender.SenderRecord;
-import software.amazon.awssdk.core.async.AsyncRequestBody;
-import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
-import software.amazon.awssdk.services.s3.model.CreateBucketResponse;
-import software.amazon.awssdk.services.s3.model.PutObjectRequest;
-import software.amazon.awssdk.services.s3.model.PutObjectResponse;
 
 @SuppressWarnings("java:S3577") // Rename class
 @SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
@@ -86,7 +83,7 @@ import software.amazon.awssdk.services.s3.model.PutObjectResponse;
         "server.ssl.key-store=./config/keystore.jks", //
         "app.webclient.trust-store=./config/truststore.jks", //
         "app.configuration-filepath=./src/test/resources/test_application_configuration.json", //
-        "app.pm-files-path=./src/test/resources/"}) //
+        "app.pm-files-path=./src/test/resources/", "app.s3.locksBucket=ropfilelocks", "app.s3.bucket=ropfiles"}) //
 class IntegrationWithKafka {
 
     final String TYPE_ID = "KafkaInformationType";
@@ -189,7 +186,7 @@ class IntegrationWithKafka {
         private void set(TopicListener.DataFromTopic receivedKafkaOutput) {
             this.receivedKafkaOutput = receivedKafkaOutput;
             this.count++;
-            logger.debug("*** received {}, {}", OUTPUT_TOPIC, receivedKafkaOutput);
+            logger.trace("*** received {}, {}", OUTPUT_TOPIC, receivedKafkaOutput);
         }
 
         synchronized String lastKey() {
@@ -217,6 +214,10 @@ class IntegrationWithKafka {
         }
         kafkaReceiver.reset();
         kafkaReceiver2.reset();
+
+        S3ObjectStore fileStore = new S3ObjectStore(applicationConfig);
+        fileStore.deleteBucket(DataStore.Bucket.FILES).block();
+        fileStore.deleteBucket(DataStore.Bucket.LOCKS).block();
     }
 
     @AfterEach
@@ -289,6 +290,7 @@ class IntegrationWithKafka {
     ConsumerJobInfo consumerJobInfoKafka(String topic, PmReportFilter.FilterData filterData) {
         try {
             Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, null, 1, topic);
+
             String str = gson.toJson(param);
             Object parametersObj = jsonObject(str);
 
@@ -361,37 +363,6 @@ class IntegrationWithKafka {
         Thread.sleep(4000);
     }
 
-    private Mono<String> copyFileToS3Bucket(Path fileName, String s3Bucket, String s3Key) {
-
-        PutObjectRequest request = PutObjectRequest.builder() //
-                .bucket(s3Bucket) //
-                .key(s3Key) //
-                .build();
-
-        AsyncRequestBody body = AsyncRequestBody.fromFile(fileName);
-
-        CompletableFuture<PutObjectResponse> future = KafkaTopicListener.getS3AsynchClient().putObject(request, body);
-
-        return Mono.fromFuture(future) //
-                .map(f -> s3Key) //
-                .doOnError(t -> logger.error("Failed to store file in S3 {}", t.getMessage()))
-                .onErrorResume(t -> Mono.empty());
-    }
-
-    private Mono<String> createS3Bucket(String s3Bucket) {
-
-        CreateBucketRequest request = CreateBucketRequest.builder() //
-                .bucket(s3Bucket) //
-                .build();
-
-        CompletableFuture<CreateBucketResponse> future = KafkaTopicListener.getS3AsynchClient().createBucket(request);
-
-        return Mono.fromFuture(future) //
-                .map(f -> s3Bucket) //
-                .doOnError(t -> logger.debug("Could not create S3 bucket: {}", t.getMessage()))
-                .onErrorResume(t -> Mono.just(s3Bucket));
-    }
-
     @Test
     void simpleCase() throws Exception {
         final String JOB_ID = "ID";
@@ -494,7 +465,7 @@ class IntegrationWithKafka {
     }
 
     @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
-    @Test
+    // @Test
     void kafkaCharacteristics_pmFilter_localFile() throws Exception {
         // Filter PM reports and sent to two jobs over Kafka
 
@@ -517,7 +488,7 @@ class IntegrationWithKafka {
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
         waitForKafkaListener();
 
-        final int NO_OF_OBJECTS = 100000;
+        final int NO_OF_OBJECTS = 100;
 
         Instant startTime = Instant.now();
 
@@ -540,6 +511,22 @@ class IntegrationWithKafka {
         printStatistics();
     }
 
+    @Test
+    void testListFiles() {
+        FileStore fileStore = new FileStore(applicationConfig);
+
+        fileStore.copyFileTo(Path.of("./src/test/resources/pm_report.json"),
+                "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json").block();
+
+        List<String> files = fileStore.listFiles(Bucket.FILES, "O-DU-112").collectList().block();
+        assertThat(files).hasSize(1);
+
+        files = fileStore.listFiles(Bucket.FILES, "O-DU-1122").collectList().block();
+        assertThat(files).hasSize(1);
+
+        fileStore.deleteObject(Bucket.FILES, files.get(0)).block();
+    }
+
     @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
     @Test
     void kafkaCharacteristics_pmFilter_s3() throws Exception {
@@ -564,16 +551,19 @@ class IntegrationWithKafka {
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
         waitForKafkaListener();
 
-        final int NO_OF_OBJECTS = 100000;
+        final int NO_OF_OBJECTS = 100;
 
         Instant startTime = Instant.now();
 
         KafkaTopicListener.NewFileEvent event = KafkaTopicListener.NewFileEvent.builder() //
-                .filename("pm_report.json").objectStoreBucket("ropfiles") //
+                .filename("pm_report.json").objectStoreBucket(applicationConfig.getS3Bucket()) //
                 .build();
 
-        createS3Bucket("ropfiles").block();
-        copyFileToS3Bucket(Path.of("./src/test/resources/pm_report.json"), "ropfiles", "pm_report.json").block();
+        S3ObjectStore fileStore = new S3ObjectStore(applicationConfig);
+
+        fileStore.createS3Bucket(DataStore.Bucket.FILES).block();
+        fileStore.copyFileToS3(DataStore.Bucket.FILES, Path.of("./src/test/resources/pm_report.json"), "pm_report.json")
+                .block();
 
         String eventAsString = gson.toJson(event);
 
@@ -590,6 +580,36 @@ class IntegrationWithKafka {
         logger.info("***  kafkaReceiver2 :" + kafkaReceiver.count);
 
         printStatistics();
+
+    }
+
+    @Test
+    void testHistoricalData() throws Exception {
+        // Store two PM report files, create a job that filters on one source name
+        // and a pmRopStartTime in the past, and verify that exactly one historical
+        // report is delivered to the output topic.
+        final String JOB_ID = "testHistoricalData";
+
+        S3ObjectStore fileStore = new S3ObjectStore(applicationConfig);
+
+        fileStore.createS3Bucket(DataStore.Bucket.FILES).block();
+        fileStore.createS3Bucket(DataStore.Bucket.LOCKS).block();
+
+        fileStore.copyFileToS3(DataStore.Bucket.FILES, Path.of("./src/test/resources/pm_report.json"),
+                "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json").block();
+
+        fileStore.copyFileToS3(DataStore.Bucket.FILES, Path.of("./src/test/resources/pm_report.json"),
+                "OTHER_SOURCENAME/test.json").block();
+
+        await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+
+        PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
+        filterData.getSourceNames().add("O-DU-1122");
+        filterData.setPmRopStartTime("1999-12-27T10:50:44.000-08:00");
+
+        this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver.OUTPUT_TOPIC, filterData), JOB_ID,
+                restClient());
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+        await().untilAsserted(() -> assertThat(kafkaReceiver.count).isEqualTo(1));
     }
 
     @Test
@@ -608,7 +628,7 @@ class IntegrationWithKafka {
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
 
-        var dataToSend = Flux.range(1, 100).map(i -> kafkaSenderRecord("Message_" + i, "", TYPE_ID)); // Message_1,
+        var dataToSend = Flux.range(1, 100).map(i -> kafkaSenderRecord("XMessage_" + i, "", TYPE_ID)); // Message_1,
         // Message_2
         // etc.
         sendDataToKafka(dataToSend); // this should not overflow