From: PatrikBuhr
Date: Mon, 10 Oct 2022 08:58:11 +0000 (+0200)
Subject: Improved traces.
X-Git-Tag: 1.2.0~11
X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=e9920e5ffcd72333e831da76d255a19a3a7c947c;p=nonrtric%2Fplt%2Fdmaapadapter.git

Improved traces.

Added a testcase for reading historical PM data from file system.
Fixed one bug.

Signed-off-by: PatrikBuhr
Issue-ID: NONRTRIC-773
Change-Id: Ic48dfd8177fb25ed49f6179d270f238988ece3fd
---

diff --git a/src/main/java/org/oran/dmaapadapter/datastore/FileStore.java b/src/main/java/org/oran/dmaapadapter/datastore/FileStore.java
index 3d4cb4d..430b62f 100644
--- a/src/main/java/org/oran/dmaapadapter/datastore/FileStore.java
+++ b/src/main/java/org/oran/dmaapadapter/datastore/FileStore.java
@@ -21,6 +21,8 @@
 package org.oran.dmaapadapter.datastore;
 
 import java.io.File;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
@@ -29,11 +31,15 @@
 import java.util.List;
 import java.util.stream.Stream;
 
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.FileSystemUtils;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class FileStore implements DataStore {
+    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
     ApplicationConfig applicationConfig;
 
@@ -48,6 +54,8 @@ public class FileStore implements DataStore {
             root = root.getParent();
         }
 
+        logger.debug("Listing files in: {}", root);
+
         List<String> result = new ArrayList<>();
         try (Stream<Path> stream = Files.walk(root, Integer.MAX_VALUE)) {
@@ -62,12 +70,18 @@
     private void filterListFiles(Path path, String prefix, List<String> result) {
         if (path.toFile().isFile() && externalName(path).startsWith(prefix)) {
             result.add(externalName(path));
+        } else {
+            logger.debug("Ignoring file {} that does not start with: {}", path, prefix);
         }
     }
 
-    private String externalName(Path f) {
-        String fullName = f.toString();
-        return fullName.substring(applicationConfig.getPmFilesPath().length());
+    private String externalName(Path path) {
+        String fullName = path.toString();
+        String externalName = fullName.substring(applicationConfig.getPmFilesPath().length());
+        if (externalName.startsWith("/")) {
+            externalName = externalName.substring(1);
+        }
+        return externalName;
     }
 
     @Override
@@ -121,4 +135,13 @@ public class FileStore implements DataStore {
         return Path.of(applicationConfig.getPmFilesPath(), name);
     }
 
+    public void deleteFiles() {
+        try {
+            FileSystemUtils.deleteRecursively(Path.of(applicationConfig.getPmFilesPath()));
+        } catch (IOException e) {
+            logger.debug("Could not delete directory: {}, reason; {}", applicationConfig.getPmFilesPath(),
+                    e.getMessage());
+        }
+    }
+
 }
diff --git a/src/main/java/org/oran/dmaapadapter/datastore/S3ObjectStore.java b/src/main/java/org/oran/dmaapadapter/datastore/S3ObjectStore.java
index de9da16..fdbd3e4 100644
--- a/src/main/java/org/oran/dmaapadapter/datastore/S3ObjectStore.java
+++ b/src/main/java/org/oran/dmaapadapter/datastore/S3ObjectStore.java
@@ -81,7 +81,6 @@ public class S3ObjectStore implements DataStore {
                 .credentialsProvider(StaticCredentialsProvider.create( //
                         AwsBasicCredentials.create(applicationConfig.getS3AccessKeyId(), //
                                 applicationConfig.getS3SecretAccessKey())));
-
     }
 
     @Override
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java
index 6d2327d..da28e64 100644
--- a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java
+++ b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java
@@ -95,6 +95,12 @@ public abstract class JobDataDistributor {
         this.job = job;
         this.applConfig = applConfig;
         this.fileStore = applConfig.isS3Enabled() ? new S3ObjectStore(applConfig) : new FileStore(applConfig);
+
+        if (applConfig.isS3Enabled()) {
+            S3ObjectStore fs = new S3ObjectStore(applConfig);
+            fs.createS3Bucket(DataStore.Bucket.FILES).subscribe();
+            fs.createS3Bucket(DataStore.Bucket.LOCKS).subscribe();
+        }
     }
 
     public synchronized void start(Flux input) {
@@ -120,9 +126,14 @@
         if (filter != null && filter.getFilterData().getPmRopStartTime() != null) {
             this.fileStore.createLock(collectHistoricalDataLockName()) //
-                    .flatMap(isLockGranted -> isLockGranted ? Mono.just(isLockGranted)
+                    .flatMap(isLockGranted -> Boolean.TRUE.equals(isLockGranted) ? Mono.just(isLockGranted)
                             : Mono.error(new LockedException(collectHistoricalDataLockName()))) //
-                    .flatMapMany(b -> Flux.fromIterable(filter.getFilterData().getSourceNames()))
+                    .doOnNext(n -> logger.debug("Checking historical PM ROP files, jobId: {}", this.job.getId())) //
+                    .doOnError(t -> logger.debug("Skipping check of historical PM ROP files, already done. jobId: {}",
+                            this.job.getId())) //
+                    .flatMapMany(b -> Flux.fromIterable(filter.getFilterData().getSourceNames())) //
+                    .doOnNext(sourceName -> logger.debug("Checking source name: {}, jobId: {}", sourceName,
+                            this.job.getId())) //
                     .flatMap(sourceName -> fileStore.listFiles(DataStore.Bucket.FILES, sourceName), 1) //
                     .filter(fileName -> filterStartTime(filter.getFilterData().getPmRopStartTime(), fileName)) //
                     .map(this::createFakeEvent) //
@@ -134,14 +145,12 @@
     }
 
     private Mono<String> handleCollectHistoricalDataError(Throwable t) {
-
         if (t instanceof LockedException) {
             logger.debug("Locked exception: {} job: {}", t.getMessage(), job.getId());
             return Mono.empty(); // Ignore
         } else {
             return tryDeleteLockFile() //
                     .map(bool -> "OK");
-
         }
     }
 
@@ -159,15 +168,24 @@
     private boolean filterStartTime(String startTimeStr, String fileName) {
         // A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json
         try {
-            String fileTimePart = fileName.substring(fileName.lastIndexOf("/") + 2);
-            fileTimePart = fileTimePart.substring(0, 18);
+            if (fileName.endsWith(".json") || fileName.endsWith(".json.gz")) {
 
-            DateTimeFormatter formatter = new DateTimeFormatterBuilder().appendPattern("yyyyMMdd.HHmmZ").toFormatter();
+                String fileTimePart = fileName.substring(fileName.lastIndexOf("/") + 2);
+                fileTimePart = fileTimePart.substring(0, 18);
 
-            OffsetDateTime fileStartTime = OffsetDateTime.parse(fileTimePart, formatter);
-            OffsetDateTime startTime = OffsetDateTime.parse(startTimeStr);
+                DateTimeFormatter formatter =
+                        new DateTimeFormatterBuilder().appendPattern("yyyyMMdd.HHmmZ").toFormatter();
+
+                OffsetDateTime fileStartTime = OffsetDateTime.parse(fileTimePart, formatter);
+                OffsetDateTime startTime = OffsetDateTime.parse(startTimeStr);
+                boolean isBefore = startTime.isBefore(fileStartTime);
+                logger.debug("Checking file: {}, fileStartTime: {}, filterStartTime: {}, isBefore: {}", fileName,
+                        fileStartTime, startTime, isBefore);
+                return isBefore;
+            } else {
+                return false;
+            }
 
-            return startTime.isBefore(fileStartTime);
         } catch (Exception e) {
             logger.warn("Time parsing exception: {}", e.getMessage());
             return false;
diff --git a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java
index fe305c5..393cd27 100644
--- a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java
+++ b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java
@@ -93,7 +93,8 @@ import reactor.test.StepVerifier;
         "app.webclient.trust-store=./config/truststore.jks", //
         "app.webclient.trust-store-used=true", //
         "app.configuration-filepath=./src/test/resources/test_application_configuration.json", //
-        "app.pm-files-path=/tmp", "app.s3.endpointOverride="})
+        "app.pm-files-path=/tmp/dmaapadaptor", //
+        "app.s3.endpointOverride="})
 class ApplicationTest {
 
     @Autowired
@@ -250,6 +251,9 @@
         assertThat(this.jobs.size()).isZero();
         assertThat(this.consumerController.testResults.receivedBodies).isEmpty();
         assertThat(this.consumerController.testResults.receivedHeaders).isEmpty();
+
+        FileStore fileStore = new FileStore(applicationConfig);
+        fileStore.deleteFiles();
     }
 
     @AfterEach
@@ -516,6 +520,35 @@
         assertThat(reportsParsed).hasSize(1);
     }
 
+    @Test
+    void testHistoricalData() throws Exception {
+        // test
+        final String JOB_ID = "testHistoricalData";
+
+        // Register producer, Register types
+        waitForRegistration();
+
+        FileStore fileStore = new FileStore(applicationConfig);
+        fileStore.copyFileTo(Path.of("./src/test/resources/pm_report.json"),
+                "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json").block();
+
+        PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
+        filterData.getSourceNames().add("O-DU-1122");
+        filterData.setPmRopStartTime("1999-12-27T10:50:44.000-08:00");
+
+        Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE,
+                new Job.BufferTimeout(123, 456), null, null);
+        String paramJson = gson.toJson(param);
+        ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverRest", "EI_PM_JOB_ID", toJson(paramJson));
+
+        this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
+
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+        ConsumerController.TestResults consumer = this.consumerController.testResults;
+        await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
+    }
+
     @Test
     void testJsltFiltering() throws Exception {
         final String JOB_ID = "ID";
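
Note on the time filter above: filterStartTime() keys off the PM ROP file naming convention
(for example A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json), parsing the start
timestamp embedded in the file name with the pattern yyyyMMdd.HHmmZ and keeping only files
whose ROP started after the job's pmRopStartTime. The standalone sketch below illustrates
that parsing; the class and method names are illustrative only and are not part of the patch.

    import java.time.OffsetDateTime;
    import java.time.format.DateTimeFormatter;
    import java.time.format.DateTimeFormatterBuilder;

    public class RopStartTimeCheck {

        // Parse the ROP start time embedded in a PM file name and compare it
        // with a filter start time, mirroring the logic of filterStartTime().
        static boolean startsAfter(String fileName, String filterStartTime) {
            // Skip the directory part and the leading "A", then take the 18
            // characters that hold "yyyyMMdd.HHmmZ" (e.g. 20000626.2315+0200).
            String fileTimePart = fileName.substring(fileName.lastIndexOf("/") + 2).substring(0, 18);
            DateTimeFormatter formatter =
                    new DateTimeFormatterBuilder().appendPattern("yyyyMMdd.HHmmZ").toFormatter();
            OffsetDateTime fileStartTime = OffsetDateTime.parse(fileTimePart, formatter);
            OffsetDateTime startTime = OffsetDateTime.parse(filterStartTime);
            return startTime.isBefore(fileStartTime);
        }

        public static void main(String[] args) {
            String file = "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json";
            // Prints "true": the file's ROP started after the requested start time,
            // so a job with this pmRopStartTime would include the file.
            System.out.println(startsAfter(file, "1999-12-27T10:50:44.000-08:00"));
        }
    }

This is also why testHistoricalData() uses a pmRopStartTime in 1999 together with a file name
dated 2000: the filter accepts the copied file and exactly one body reaches the consumer.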