Minor changes 94/9594/1
author PatrikBuhr <patrik.buhr@est.tech>
Fri, 11 Nov 2022 08:44:33 +0000 (09:44 +0100)
committer PatrikBuhr <patrik.buhr@est.tech>
Fri, 11 Nov 2022 08:44:33 +0000 (09:44 +0100)
Infinite number of files in S3 handled: the object listing is now paginated instead of capped at a fixed number of keys.

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: Ie2fe4c853ce0c9ab4ca49c7fbc8a60ba44231bfe

src/main/java/org/oran/dmaapadapter/datastore/FileStore.java
src/main/java/org/oran/dmaapadapter/datastore/S3ObjectStore.java
src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java
src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java

diff --git a/src/main/java/org/oran/dmaapadapter/datastore/FileStore.java b/src/main/java/org/oran/dmaapadapter/datastore/FileStore.java
index 9e7232c..d91adab 100644
@@ -98,10 +98,12 @@ class FileStore implements DataStore {
     public Mono<Boolean> createLock(String name) {
         File file = path(name).toFile();
         try {
+            Files.createDirectories(path(name).getParent());
             boolean res = file.createNewFile();
             return Mono.just(res);
         } catch (Exception e) {
-            return Mono.just(file.exists());
+            logger.warn("Could not create lock file: {}, reason: {}", file.getPath(), e.getMessage());
+            return Mono.just(!file.exists());
         }
     }
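After this change, createLock() first ensures that the parent directory of the lock file exists, and an I/O failure is logged instead of being silently mapped to file.exists(), whose inverted result previously reported the lock as acquired exactly when another party already held it. A sketch of the method as it reads after the patch, assuming path() and logger are the existing FileStore members:

    public Mono<Boolean> createLock(String name) {
        File file = path(name).toFile();
        try {
            // New: create missing parent directories before the lock file itself.
            Files.createDirectories(path(name).getParent());
            boolean res = file.createNewFile(); // true only if no lock existed yet
            return Mono.just(res);
        } catch (Exception e) {
            logger.warn("Could not create lock file: {}, reason: {}", file.getPath(), e.getMessage());
            // New: on failure, report the lock as acquired only if no lock file exists.
            return Mono.just(!file.exists());
        }
    }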
 
diff --git a/src/main/java/org/oran/dmaapadapter/datastore/S3ObjectStore.java b/src/main/java/org/oran/dmaapadapter/datastore/S3ObjectStore.java
index 5cf7c86..18b91c8 100644
@@ -218,22 +218,35 @@ class S3ObjectStore implements DataStore {
         return Mono.fromFuture(future);
     }
 
-    private Flux<S3Object> listObjectsInBucket(String bucket, String prefix) {
-        ListObjectsRequest listObjectsRequest = ListObjectsRequest.builder() //
+    private Mono<ListObjectsResponse> listObjectsRequest(String bucket, String prefix,
+            ListObjectsResponse prevResponse) {
+        ListObjectsRequest.Builder builder = ListObjectsRequest.builder() //
                 .bucket(bucket) //
-                .maxKeys(1100) //
-                .prefix(prefix) //
-                .build();
+                .maxKeys(1000) //
+                .prefix(prefix);
+
+        if (prevResponse != null) {
+            if (Boolean.TRUE.equals(prevResponse.isTruncated())) {
+                builder.marker(prevResponse.nextMarker());
+            } else {
+                return Mono.empty();
+            }
+        }
+
+        ListObjectsRequest listObjectsRequest = builder.build();
         CompletableFuture<ListObjectsResponse> future = s3AsynchClient.listObjects(listObjectsRequest);
+        return Mono.fromFuture(future);
+    }
 
-        return Mono.fromFuture(future) //
+    private Flux<S3Object> listObjectsInBucket(String bucket, String prefix) {
+
+        return listObjectsRequest(bucket, prefix, null) //
+                .expand(response -> listObjectsRequest(bucket, prefix, response)) //
                 .map(ListObjectsResponse::contents) //
                 .doOnNext(f -> logger.debug("Found objects in {}: {}", bucket, f.size())) //
                .doOnError(t -> logger.warn("Error from list objects: {}", t.getMessage())) //
-                .flatMapMany(Flux::fromIterable) //
-                .doOnNext(obj -> logger.debug("Found object: {}", obj.key())) //
-
-        ;
+                .flatMap(Flux::fromIterable) //
+                .doOnNext(obj -> logger.debug("Found object: {}", obj.key()));
     }
 
     private Mono<byte[]> getDataFromS3Object(String bucket, String key) {
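This hunk is the core of the change: object listing is now paginated. Each truncated ListObjectsResponse yields a follow-up request anchored at its nextMarker(), and Reactor's expand() keeps feeding responses back into the request function until it returns an empty Mono, so buckets with any number of objects are covered (the old single request was capped at maxKeys(1100)). A self-contained sketch of the pattern using the AWS SDK v2 async client; class and method names here are illustrative, not the project's:

    import java.util.concurrent.CompletableFuture;

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    import software.amazon.awssdk.services.s3.S3AsyncClient;
    import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
    import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
    import software.amazon.awssdk.services.s3.model.S3Object;

    class PaginatedListing {
        private final S3AsyncClient client = S3AsyncClient.create();

        // Fetch one page. prev == null requests the first page; a truncated
        // previous response requests the page after its marker; otherwise the
        // empty Mono tells expand() that the stream of pages is finished.
        private Mono<ListObjectsResponse> page(String bucket, String prefix, ListObjectsResponse prev) {
            ListObjectsRequest.Builder builder = ListObjectsRequest.builder()
                    .bucket(bucket)
                    .maxKeys(1000) // S3 serves at most 1000 keys per response
                    .prefix(prefix);
            if (prev != null) {
                if (Boolean.TRUE.equals(prev.isTruncated())) {
                    builder.marker(prev.nextMarker());
                } else {
                    return Mono.empty();
                }
            }
            CompletableFuture<ListObjectsResponse> future = client.listObjects(builder.build());
            return Mono.fromFuture(future);
        }

        // expand() feeds each emitted page back into page() until it completes
        // empty; flatMap then unrolls every page into individual S3Objects.
        Flux<S3Object> listAll(String bucket, String prefix) {
            return page(bucket, prefix, null)
                    .expand(response -> page(bucket, prefix, response))
                    .map(ListObjectsResponse::contents)
                    .flatMap(Flux::fromIterable);
        }
    }

A design note: ListObjectsV2 with continuationToken()/nextContinuationToken() is the newer equivalent of this marker loop; the patch keeps the V1 ListObjects API the code already used.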
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java
index 2e247eb..f9353f1 100644
@@ -36,7 +36,6 @@ import org.oran.dmaapadapter.filter.PmReportFilter;
 import org.oran.dmaapadapter.repository.Job;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.web.reactive.function.client.WebClientResponseException;
 
 import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
@@ -60,28 +59,15 @@ public abstract class JobDataDistributor {
     private final ApplicationConfig applConfig;
 
     private class ErrorStats {
+        @Getter
         private int consumerFaultCounter = 0;
-        private boolean irrecoverableError = false; // eg. overflow
 
         public void handleOkFromConsumer() {
             this.consumerFaultCounter = 0;
         }
 
         public void handleException(Throwable t) {
-            if (t instanceof WebClientResponseException) {
-                ++this.consumerFaultCounter;
-            } else {
-                irrecoverableError = true;
-            }
-        }
-
-        public boolean isItHopeless() {
-            final int STOP_AFTER_ERRORS = 5;
-            return irrecoverableError || consumerFaultCounter > STOP_AFTER_ERRORS;
-        }
-
-        public void resetIrrecoverableErrors() {
-            irrecoverableError = false;
+            ++this.consumerFaultCounter;
         }
     }
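With the irrecoverable-error flag gone, ErrorStats shrinks to a plain consumer-fault counter, and Lombok's @Getter generates a getConsumerFaultCounter() accessor so the count remains observable. A hypothetical call site (the accessor comes from Lombok; the variable name is illustrative):

    // Hypothetical: @Getter generates getConsumerFaultCounter().
    int faults = errorStats.getConsumerFaultCounter();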
 
@@ -91,8 +77,6 @@ public abstract class JobDataDistributor {
         this.dataStore = DataStore.create(applConfig);
         this.dataStore.create(DataStore.Bucket.FILES).subscribe();
         this.dataStore.create(DataStore.Bucket.LOCKS).subscribe();
-
-        this.errorStats.resetIrrecoverableErrors();
     }
 
     public void start(Flux<TopicListener.DataFromTopic> input) {
@@ -168,9 +152,7 @@ public abstract class JobDataDistributor {
     }
 
     private TopicListener.DataFromTopic createFakeEvent(String fileName) {
-
         NewFileEvent ev = new NewFileEvent(fileName);
-
         return new TopicListener.DataFromTopic(null, gson.toJson(ev).getBytes(), false);
     }
 
@@ -284,12 +266,7 @@ public abstract class JobDataDistributor {
     private Mono<String> handleError(Throwable t) {
         logger.warn("exception: {} job: {}", t.getMessage(), job.getId());
         this.errorStats.handleException(t);
-        if (this.errorStats.isItHopeless()) {
-            logger.error("Giving up: {} job: {}", t.getMessage(), job.getId());
-            return Mono.error(t);
-        } else {
-            return Mono.empty(); // Ignore
-        }
+        return Mono.empty(); // Ignore
     }
 
     private void handleSentOk(String data) {
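Errors are now always logged, counted and swallowed, so a failing consumer no longer terminates the distribution stream. A sketch of how such a handler typically sits in a Reactor pipeline; only handleError() itself appears in the hunk, so the operator placement and sendToConsumer() are assumptions:

    // Hypothetical wiring: handleError() returns Mono.empty(), so a failed item
    // is logged, counted and dropped while the stream keeps distributing.
    Flux<String> distribute(Flux<String> input) {
        return input.flatMap(item -> sendToConsumer(item) // assumed delivery call, Mono<String>
                .doOnNext(this::handleSentOk)
                .onErrorResume(this::handleError));
    }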
diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
index ab553b6..b6a0fad 100644
@@ -577,8 +577,8 @@ class IntegrationWithKafka {
         filterData.getMeasTypes().add("pmCounterNumber1");
         filterData.getMeasObjClass().add("NRCellCU");
 
-        this.applicationConfig.setZipOutput(true);
-        final int NO_OF_JOBS = 150;
+        this.applicationConfig.setZipOutput(false);
+        final int NO_OF_JOBS = 100;
         ArrayList<KafkaReceiver> receivers = new ArrayList<>();
         for (int i = 0; i < NO_OF_JOBS; ++i) {
             final String outputTopic = "manyJobs_" + i;
@@ -591,7 +591,7 @@ class IntegrationWithKafka {
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(NO_OF_JOBS));
         waitForKafkaListener();
 
-        final int NO_OF_OBJECTS = 1000;
+        final int NO_OF_OBJECTS = 10000;
 
         Instant startTime = Instant.now();
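Note: with NO_OF_OBJECTS raised to 10000, the stored objects span multiple 1000-key listing pages, which presumably exercises the paginated S3 listing introduced above.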