Added a testcase for readding historical PM data from file system.
Fixed one bug.
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: Ic48dfd8177fb25ed49f6179d270f238988ece3fd
package org.oran.dmaapadapter.datastore;
import java.io.File;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.stream.Stream;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.FileSystemUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class FileStore implements DataStore {
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
ApplicationConfig applicationConfig;
root = root.getParent();
}
+ logger.debug("Listing files in: {}", root);
+
List<String> result = new ArrayList<>();
try (Stream<Path> stream = Files.walk(root, Integer.MAX_VALUE)) {
private void filterListFiles(Path path, String prefix, List<String> result) {
if (path.toFile().isFile() && externalName(path).startsWith(prefix)) {
result.add(externalName(path));
+ } else {
+ logger.debug("Ignoring file {} that does not start with: {}", path, prefix);
}
}
- private String externalName(Path f) {
- String fullName = f.toString();
- return fullName.substring(applicationConfig.getPmFilesPath().length());
+ private String externalName(Path path) {
+ String fullName = path.toString();
+ String externalName = fullName.substring(applicationConfig.getPmFilesPath().length());
+ if (externalName.startsWith("/")) {
+ externalName = externalName.substring(1);
+ }
+ return externalName;
}
@Override
return Path.of(applicationConfig.getPmFilesPath(), name);
}
+ public void deleteFiles() {
+ try {
+ FileSystemUtils.deleteRecursively(Path.of(applicationConfig.getPmFilesPath()));
+ } catch (IOException e) {
+ logger.debug("Could not delete directory: {}, reason; {}", applicationConfig.getPmFilesPath(),
+ e.getMessage());
+ }
+ }
+
}
.credentialsProvider(StaticCredentialsProvider.create( //
AwsBasicCredentials.create(applicationConfig.getS3AccessKeyId(), //
applicationConfig.getS3SecretAccessKey())));
-
}
@Override
this.job = job;
this.applConfig = applConfig;
this.fileStore = applConfig.isS3Enabled() ? new S3ObjectStore(applConfig) : new FileStore(applConfig);
+
+ if (applConfig.isS3Enabled()) {
+ S3ObjectStore fs = new S3ObjectStore(applConfig);
+ fs.createS3Bucket(DataStore.Bucket.FILES).subscribe();
+ fs.createS3Bucket(DataStore.Bucket.LOCKS).subscribe();
+ }
}
public synchronized void start(Flux<TopicListener.DataFromTopic> input) {
if (filter != null && filter.getFilterData().getPmRopStartTime() != null) {
this.fileStore.createLock(collectHistoricalDataLockName()) //
- .flatMap(isLockGranted -> isLockGranted ? Mono.just(isLockGranted)
+ .flatMap(isLockGranted -> Boolean.TRUE.equals(isLockGranted) ? Mono.just(isLockGranted)
: Mono.error(new LockedException(collectHistoricalDataLockName()))) //
- .flatMapMany(b -> Flux.fromIterable(filter.getFilterData().getSourceNames()))
+ .doOnNext(n -> logger.debug("Checking historical PM ROP files, jobId: {}", this.job.getId())) //
+ .doOnError(t -> logger.debug("Skipping check of historical PM ROP files, already done. jobId: {}",
+ this.job.getId())) //
+ .flatMapMany(b -> Flux.fromIterable(filter.getFilterData().getSourceNames())) //
+ .doOnNext(sourceName -> logger.debug("Checking source name: {}, jobId: {}", sourceName,
+ this.job.getId())) //
.flatMap(sourceName -> fileStore.listFiles(DataStore.Bucket.FILES, sourceName), 1) //
.filter(fileName -> filterStartTime(filter.getFilterData().getPmRopStartTime(), fileName)) //
.map(this::createFakeEvent) //
}
private Mono<String> handleCollectHistoricalDataError(Throwable t) {
-
if (t instanceof LockedException) {
logger.debug("Locked exception: {} job: {}", t.getMessage(), job.getId());
return Mono.empty(); // Ignore
} else {
return tryDeleteLockFile() //
.map(bool -> "OK");
-
}
}
private boolean filterStartTime(String startTimeStr, String fileName) {
// A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json
try {
- String fileTimePart = fileName.substring(fileName.lastIndexOf("/") + 2);
- fileTimePart = fileTimePart.substring(0, 18);
+ if (fileName.endsWith(".json") || fileName.endsWith(".json.gz")) {
- DateTimeFormatter formatter = new DateTimeFormatterBuilder().appendPattern("yyyyMMdd.HHmmZ").toFormatter();
+ String fileTimePart = fileName.substring(fileName.lastIndexOf("/") + 2);
+ fileTimePart = fileTimePart.substring(0, 18);
- OffsetDateTime fileStartTime = OffsetDateTime.parse(fileTimePart, formatter);
- OffsetDateTime startTime = OffsetDateTime.parse(startTimeStr);
+ DateTimeFormatter formatter =
+ new DateTimeFormatterBuilder().appendPattern("yyyyMMdd.HHmmZ").toFormatter();
+
+ OffsetDateTime fileStartTime = OffsetDateTime.parse(fileTimePart, formatter);
+ OffsetDateTime startTime = OffsetDateTime.parse(startTimeStr);
+ boolean isBefore = startTime.isBefore(fileStartTime);
+ logger.debug("Checking file: {}, fileStartTime: {}, filterStartTime: {}, isBefore: {}", fileName,
+ fileStartTime, startTime, isBefore);
+ return isBefore;
+ } else {
+ return false;
+ }
- return startTime.isBefore(fileStartTime);
} catch (Exception e) {
logger.warn("Time parsing exception: {}", e.getMessage());
return false;
"app.webclient.trust-store=./config/truststore.jks", //
"app.webclient.trust-store-used=true", //
"app.configuration-filepath=./src/test/resources/test_application_configuration.json", //
- "app.pm-files-path=/tmp", "app.s3.endpointOverride="})
+ "app.pm-files-path=/tmp/dmaapadaptor", //
+ "app.s3.endpointOverride="})
class ApplicationTest {
@Autowired
assertThat(this.jobs.size()).isZero();
assertThat(this.consumerController.testResults.receivedBodies).isEmpty();
assertThat(this.consumerController.testResults.receivedHeaders).isEmpty();
+
+ FileStore fileStore = new FileStore(applicationConfig);
+ fileStore.deleteFiles();
}
@AfterEach
assertThat(reportsParsed).hasSize(1);
}
+ @Test
+ void testHistoricalData() throws Exception {
+ // test
+ final String JOB_ID = "testHistoricalData";
+
+ // Register producer, Register types
+ waitForRegistration();
+
+ FileStore fileStore = new FileStore(applicationConfig);
+ fileStore.copyFileTo(Path.of("./src/test/resources/pm_report.json"),
+ "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json").block();
+
+ PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
+ filterData.getSourceNames().add("O-DU-1122");
+ filterData.setPmRopStartTime("1999-12-27T10:50:44.000-08:00");
+
+ Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE,
+ new Job.BufferTimeout(123, 456), null, null);
+ String paramJson = gson.toJson(param);
+ ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverRest", "EI_PM_JOB_ID", toJson(paramJson));
+
+ this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
+
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+ ConsumerController.TestResults consumer = this.consumerController.testResults;
+ await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
+ }
+
@Test
void testJsltFiltering() throws Exception {
final String JOB_ID = "ID";