Improved traces. 27/9227/2
authorPatrikBuhr <patrik.buhr@est.tech>
Mon, 10 Oct 2022 08:58:11 +0000 (10:58 +0200)
committerPatrikBuhr <patrik.buhr@est.tech>
Mon, 10 Oct 2022 13:35:21 +0000 (15:35 +0200)
Added a testcase for readding historical PM data from file system.
Fixed one bug.

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: Ic48dfd8177fb25ed49f6179d270f238988ece3fd

src/main/java/org/oran/dmaapadapter/datastore/FileStore.java
src/main/java/org/oran/dmaapadapter/datastore/S3ObjectStore.java
src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java
src/test/java/org/oran/dmaapadapter/ApplicationTest.java

index 3d4cb4d..430b62f 100644 (file)
@@ -21,6 +21,8 @@
 package org.oran.dmaapadapter.datastore;
 
 import java.io.File;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
@@ -29,11 +31,15 @@ import java.util.List;
 import java.util.stream.Stream;
 
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.FileSystemUtils;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class FileStore implements DataStore {
+    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
     ApplicationConfig applicationConfig;
 
@@ -48,6 +54,8 @@ public class FileStore implements DataStore {
             root = root.getParent();
         }
 
+        logger.debug("Listing files in: {}", root);
+
         List<String> result = new ArrayList<>();
         try (Stream<Path> stream = Files.walk(root, Integer.MAX_VALUE)) {
 
@@ -62,12 +70,18 @@ public class FileStore implements DataStore {
     private void filterListFiles(Path path, String prefix, List<String> result) {
         if (path.toFile().isFile() && externalName(path).startsWith(prefix)) {
             result.add(externalName(path));
+        } else {
+            logger.debug("Ignoring file {} that does not start with: {}", path, prefix);
         }
     }
 
-    private String externalName(Path f) {
-        String fullName = f.toString();
-        return fullName.substring(applicationConfig.getPmFilesPath().length());
+    private String externalName(Path path) {
+        String fullName = path.toString();
+        String externalName = fullName.substring(applicationConfig.getPmFilesPath().length());
+        if (externalName.startsWith("/")) {
+            externalName = externalName.substring(1);
+        }
+        return externalName;
     }
 
     @Override
@@ -121,4 +135,13 @@ public class FileStore implements DataStore {
         return Path.of(applicationConfig.getPmFilesPath(), name);
     }
 
+    public void deleteFiles() {
+        try {
+            FileSystemUtils.deleteRecursively(Path.of(applicationConfig.getPmFilesPath()));
+        } catch (IOException e) {
+            logger.debug("Could not delete directory: {}, reason; {}", applicationConfig.getPmFilesPath(),
+                    e.getMessage());
+        }
+    }
+
 }
index de9da16..fdbd3e4 100644 (file)
@@ -81,7 +81,6 @@ public class S3ObjectStore implements DataStore {
                 .credentialsProvider(StaticCredentialsProvider.create( //
                         AwsBasicCredentials.create(applicationConfig.getS3AccessKeyId(), //
                                 applicationConfig.getS3SecretAccessKey())));
-
     }
 
     @Override
index 6d2327d..da28e64 100644 (file)
@@ -95,6 +95,12 @@ public abstract class JobDataDistributor {
         this.job = job;
         this.applConfig = applConfig;
         this.fileStore = applConfig.isS3Enabled() ? new S3ObjectStore(applConfig) : new FileStore(applConfig);
+
+        if (applConfig.isS3Enabled()) {
+            S3ObjectStore fs = new S3ObjectStore(applConfig);
+            fs.createS3Bucket(DataStore.Bucket.FILES).subscribe();
+            fs.createS3Bucket(DataStore.Bucket.LOCKS).subscribe();
+        }
     }
 
     public synchronized void start(Flux<TopicListener.DataFromTopic> input) {
@@ -120,9 +126,14 @@ public abstract class JobDataDistributor {
 
         if (filter != null && filter.getFilterData().getPmRopStartTime() != null) {
             this.fileStore.createLock(collectHistoricalDataLockName()) //
-                    .flatMap(isLockGranted -> isLockGranted ? Mono.just(isLockGranted)
+                    .flatMap(isLockGranted -> Boolean.TRUE.equals(isLockGranted) ? Mono.just(isLockGranted)
                             : Mono.error(new LockedException(collectHistoricalDataLockName()))) //
-                    .flatMapMany(b -> Flux.fromIterable(filter.getFilterData().getSourceNames()))
+                    .doOnNext(n -> logger.debug("Checking historical PM ROP files, jobId: {}", this.job.getId())) //
+                    .doOnError(t -> logger.debug("Skipping check of historical PM ROP files, already done. jobId: {}",
+                            this.job.getId())) //
+                    .flatMapMany(b -> Flux.fromIterable(filter.getFilterData().getSourceNames())) //
+                    .doOnNext(sourceName -> logger.debug("Checking source name: {}, jobId: {}", sourceName,
+                            this.job.getId())) //
                     .flatMap(sourceName -> fileStore.listFiles(DataStore.Bucket.FILES, sourceName), 1) //
                     .filter(fileName -> filterStartTime(filter.getFilterData().getPmRopStartTime(), fileName)) //
                     .map(this::createFakeEvent) //
@@ -134,14 +145,12 @@ public abstract class JobDataDistributor {
     }
 
     private Mono<String> handleCollectHistoricalDataError(Throwable t) {
-
         if (t instanceof LockedException) {
             logger.debug("Locked exception: {} job: {}", t.getMessage(), job.getId());
             return Mono.empty(); // Ignore
         } else {
             return tryDeleteLockFile() //
                     .map(bool -> "OK");
-
         }
     }
 
@@ -159,15 +168,24 @@ public abstract class JobDataDistributor {
     private boolean filterStartTime(String startTimeStr, String fileName) {
         // A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json
         try {
-            String fileTimePart = fileName.substring(fileName.lastIndexOf("/") + 2);
-            fileTimePart = fileTimePart.substring(0, 18);
+            if (fileName.endsWith(".json") || fileName.endsWith(".json.gz")) {
 
-            DateTimeFormatter formatter = new DateTimeFormatterBuilder().appendPattern("yyyyMMdd.HHmmZ").toFormatter();
+                String fileTimePart = fileName.substring(fileName.lastIndexOf("/") + 2);
+                fileTimePart = fileTimePart.substring(0, 18);
 
-            OffsetDateTime fileStartTime = OffsetDateTime.parse(fileTimePart, formatter);
-            OffsetDateTime startTime = OffsetDateTime.parse(startTimeStr);
+                DateTimeFormatter formatter =
+                        new DateTimeFormatterBuilder().appendPattern("yyyyMMdd.HHmmZ").toFormatter();
+
+                OffsetDateTime fileStartTime = OffsetDateTime.parse(fileTimePart, formatter);
+                OffsetDateTime startTime = OffsetDateTime.parse(startTimeStr);
+                boolean isBefore = startTime.isBefore(fileStartTime);
+                logger.debug("Checking file: {}, fileStartTime: {}, filterStartTime: {}, isBefore: {}", fileName,
+                        fileStartTime, startTime, isBefore);
+                return isBefore;
+            } else {
+                return false;
+            }
 
-            return startTime.isBefore(fileStartTime);
         } catch (Exception e) {
             logger.warn("Time parsing exception: {}", e.getMessage());
             return false;
index fe305c5..393cd27 100644 (file)
@@ -93,7 +93,8 @@ import reactor.test.StepVerifier;
         "app.webclient.trust-store=./config/truststore.jks", //
         "app.webclient.trust-store-used=true", //
         "app.configuration-filepath=./src/test/resources/test_application_configuration.json", //
-        "app.pm-files-path=/tmp", "app.s3.endpointOverride="})
+        "app.pm-files-path=/tmp/dmaapadaptor", //
+        "app.s3.endpointOverride="})
 class ApplicationTest {
 
     @Autowired
@@ -250,6 +251,9 @@ class ApplicationTest {
         assertThat(this.jobs.size()).isZero();
         assertThat(this.consumerController.testResults.receivedBodies).isEmpty();
         assertThat(this.consumerController.testResults.receivedHeaders).isEmpty();
+
+        FileStore fileStore = new FileStore(applicationConfig);
+        fileStore.deleteFiles();
     }
 
     @AfterEach
@@ -516,6 +520,35 @@ class ApplicationTest {
         assertThat(reportsParsed).hasSize(1);
     }
 
+    @Test
+    void testHistoricalData() throws Exception {
+        // test
+        final String JOB_ID = "testHistoricalData";
+
+        // Register producer, Register types
+        waitForRegistration();
+
+        FileStore fileStore = new FileStore(applicationConfig);
+        fileStore.copyFileTo(Path.of("./src/test/resources/pm_report.json"),
+                "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json").block();
+
+        PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
+        filterData.getSourceNames().add("O-DU-1122");
+        filterData.setPmRopStartTime("1999-12-27T10:50:44.000-08:00");
+
+        Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE,
+                new Job.BufferTimeout(123, 456), null, null);
+        String paramJson = gson.toJson(param);
+        ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverRest", "EI_PM_JOB_ID", toJson(paramJson));
+
+        this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
+
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+        ConsumerController.TestResults consumer = this.consumerController.testResults;
+        await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
+    }
+
     @Test
     void testJsltFiltering() throws Exception {
         final String JOB_ID = "ID";