From: PatrikBuhr
Date: Fri, 11 Nov 2022 08:44:33 +0000 (+0100)
Subject: Minor changes
X-Git-Tag: 1.2.0~1
X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=3f6a4eed65870011adf57de0f1b96c1bf3a2dfe4;p=nonrtric%2Fplt%2Fdmaapadapter.git

Minor changes

Infinite number of files in S3 handled.

Signed-off-by: PatrikBuhr
Issue-ID: NONRTRIC-773
Change-Id: Ie2fe4c853ce0c9ab4ca49c7fbc8a60ba44231bfe
---

diff --git a/src/main/java/org/oran/dmaapadapter/datastore/FileStore.java b/src/main/java/org/oran/dmaapadapter/datastore/FileStore.java
index 9e7232c..d91adab 100644
--- a/src/main/java/org/oran/dmaapadapter/datastore/FileStore.java
+++ b/src/main/java/org/oran/dmaapadapter/datastore/FileStore.java
@@ -98,10 +98,12 @@ class FileStore implements DataStore {
     public Mono<Boolean> createLock(String name) {
         File file = path(name).toFile();
         try {
+            Files.createDirectories(path(name).getParent());
             boolean res = file.createNewFile();
             return Mono.just(res);
         } catch (Exception e) {
-            return Mono.just(file.exists());
+            logger.warn("Could not create lock file: {}, reason: {}", file.getPath(), e.getMessage());
+            return Mono.just(!file.exists());
         }
     }
 
diff --git a/src/main/java/org/oran/dmaapadapter/datastore/S3ObjectStore.java b/src/main/java/org/oran/dmaapadapter/datastore/S3ObjectStore.java
index 5cf7c86..18b91c8 100644
--- a/src/main/java/org/oran/dmaapadapter/datastore/S3ObjectStore.java
+++ b/src/main/java/org/oran/dmaapadapter/datastore/S3ObjectStore.java
@@ -218,22 +218,35 @@ class S3ObjectStore implements DataStore {
         return Mono.fromFuture(future);
     }
 
-    private Flux<S3Object> listObjectsInBucket(String bucket, String prefix) {
-        ListObjectsRequest listObjectsRequest = ListObjectsRequest.builder() //
+    private Mono<ListObjectsResponse> listObjectsRequest(String bucket, String prefix,
+            ListObjectsResponse prevResponse) {
+        ListObjectsRequest.Builder builder = ListObjectsRequest.builder() //
                 .bucket(bucket) //
-                .maxKeys(1100) //
-                .prefix(prefix) //
-                .build();
+                .maxKeys(1000) //
+                .prefix(prefix);
+
+        if (prevResponse != null) {
+            if (Boolean.TRUE.equals(prevResponse.isTruncated())) {
+                builder.marker(prevResponse.nextMarker());
+            } else {
+                return Mono.empty();
+            }
+        }
+
+        ListObjectsRequest listObjectsRequest = builder.build();
 
         CompletableFuture<ListObjectsResponse> future = s3AsynchClient.listObjects(listObjectsRequest);
+        return Mono.fromFuture(future);
+    }
 
-        return Mono.fromFuture(future) //
+    private Flux<S3Object> listObjectsInBucket(String bucket, String prefix) {
+
+        return listObjectsRequest(bucket, prefix, null) //
+                .expand(response -> listObjectsRequest(bucket, prefix, response)) //
                 .map(ListObjectsResponse::contents) //
                 .doOnNext(f -> logger.debug("Found objects in {}: {}", bucket, f.size())) //
                 .doOnError(t -> logger.warn("Error fromlist objects: {}", t.getMessage())) //
-                .flatMapMany(Flux::fromIterable) //
-                .doOnNext(obj -> logger.debug("Found object: {}", obj.key())) //
-
-        ;
+                .flatMap(Flux::fromIterable) //
+                .doOnNext(obj -> logger.debug("Found object: {}", obj.key()));
     }
 
     private Mono getDataFromS3Object(String bucket, String key) {
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java
index 2e247eb..f9353f1 100644
--- a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java
+++ b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java
@@ -36,7 +36,6 @@ import org.oran.dmaapadapter.filter.PmReportFilter;
 import org.oran.dmaapadapter.repository.Job;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.web.reactive.function.client.WebClientResponseException;
 import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 
@@ -60,28 +59,15 @@ public abstract class JobDataDistributor {
     private final ApplicationConfig applConfig;
 
     private class ErrorStats {
+        @Getter
         private int consumerFaultCounter = 0;
-        private boolean irrecoverableError = false; // eg. overflow
 
         public void handleOkFromConsumer() {
             this.consumerFaultCounter = 0;
         }
 
         public void handleException(Throwable t) {
-            if (t instanceof WebClientResponseException) {
-                ++this.consumerFaultCounter;
-            } else {
-                irrecoverableError = true;
-            }
-        }
-
-        public boolean isItHopeless() {
-            final int STOP_AFTER_ERRORS = 5;
-            return irrecoverableError || consumerFaultCounter > STOP_AFTER_ERRORS;
-        }
-
-        public void resetIrrecoverableErrors() {
-            irrecoverableError = false;
+            ++this.consumerFaultCounter;
         }
     }
 
@@ -91,8 +77,6 @@ public abstract class JobDataDistributor {
         this.dataStore = DataStore.create(applConfig);
         this.dataStore.create(DataStore.Bucket.FILES).subscribe();
         this.dataStore.create(DataStore.Bucket.LOCKS).subscribe();
-
-        this.errorStats.resetIrrecoverableErrors();
     }
 
     public void start(Flux input) {
@@ -168,9 +152,7 @@ public abstract class JobDataDistributor {
     }
 
     private TopicListener.DataFromTopic createFakeEvent(String fileName) {
-
         NewFileEvent ev = new NewFileEvent(fileName);
-
         return new TopicListener.DataFromTopic(null, gson.toJson(ev).getBytes(), false);
     }
 
@@ -284,12 +266,7 @@ public abstract class JobDataDistributor {
     private Mono handleError(Throwable t) {
         logger.warn("exception: {} job: {}", t.getMessage(), job.getId());
         this.errorStats.handleException(t);
-        if (this.errorStats.isItHopeless()) {
-            logger.error("Giving up: {} job: {}", t.getMessage(), job.getId());
-            return Mono.error(t);
-        } else {
-            return Mono.empty(); // Ignore
-        }
+        return Mono.empty(); // Ignore
     }
 
     private void handleSentOk(String data) {
diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
index ab553b6..b6a0fad 100644
--- a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
+++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
@@ -577,8 +577,8 @@ class IntegrationWithKafka {
         filterData.getMeasTypes().add("pmCounterNumber1");
         filterData.getMeasObjClass().add("NRCellCU");
 
-        this.applicationConfig.setZipOutput(true);
-        final int NO_OF_JOBS = 150;
+        this.applicationConfig.setZipOutput(false);
+        final int NO_OF_JOBS = 100;
         ArrayList receivers = new ArrayList<>();
         for (int i = 0; i < NO_OF_JOBS; ++i) {
             final String outputTopic = "manyJobs_" + i;
@@ -591,7 +591,7 @@ class IntegrationWithKafka {
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(NO_OF_JOBS));
         waitForKafkaListener();
 
-        final int NO_OF_OBJECTS = 1000;
+        final int NO_OF_OBJECTS = 10000;
 
         Instant startTime = Instant.now();
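
Note on the listing change in S3ObjectStore: the old fixed maxKeys(1100) request could only ever return the first page of a bucket, since a single S3 ListObjects call returns at most 1000 keys and signals more data through isTruncated/nextMarker. The new listObjectsRequest/expand pair follows that marker until a non-truncated response arrives. The sketch below shows the same pattern in isolation; it is a minimal illustration of the technique, not code from the adapter, and it assumes a default S3AsyncClient plus the hypothetical bucket/prefix values "my-bucket" and "reports/".

    import java.util.concurrent.CompletableFuture;

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    import software.amazon.awssdk.services.s3.S3AsyncClient;
    import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
    import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
    import software.amazon.awssdk.services.s3.model.S3Object;

    public class S3PagingSketch {

        // Assumes region and credentials come from the environment; adjust as needed.
        private static final S3AsyncClient s3 = S3AsyncClient.create();

        // One page: a null previous response means the first request; a truncated
        // previous response means "continue from its nextMarker"; a non-truncated
        // one ends the expansion by returning Mono.empty().
        static Mono<ListObjectsResponse> listPage(String bucket, String prefix, ListObjectsResponse prev) {
            ListObjectsRequest.Builder builder = ListObjectsRequest.builder()
                    .bucket(bucket)
                    .maxKeys(1000)
                    .prefix(prefix);
            if (prev != null) {
                if (Boolean.TRUE.equals(prev.isTruncated())) {
                    builder.marker(prev.nextMarker());
                } else {
                    return Mono.empty();
                }
            }
            CompletableFuture<ListObjectsResponse> future = s3.listObjects(builder.build());
            return Mono.fromFuture(future);
        }

        // All objects under the prefix as one Flux, fetched page by page.
        static Flux<S3Object> listAll(String bucket, String prefix) {
            return listPage(bucket, prefix, null)
                    .expand(response -> listPage(bucket, prefix, response))
                    .map(ListObjectsResponse::contents)
                    .flatMap(Flux::fromIterable);
        }

        public static void main(String[] args) {
            // "my-bucket" and "reports/" are placeholder values for illustration.
            listAll("my-bucket", "reports/")
                    .doOnNext(obj -> System.out.println(obj.key()))
                    .blockLast();
        }
    }

Because each response expands into at most one follow-up request, the pages are fetched sequentially and the stream completes when S3 reports the listing as no longer truncated, so there is no fixed upper bound on the number of objects returned.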