Making concurrency configurable 28/10728/2
author PatrikBuhr <patrik.buhr@est.tech>
Mon, 13 Mar 2023 08:59:59 +0000 (09:59 +0100)
committer PatrikBuhr <patrik.buhr@est.tech>
Mon, 13 Mar 2023 11:39:02 +0000 (12:39 +0100)
The concurrency is made configurable via a parameter in application.yaml.
The retry behavior when storing a file in S3 fails is made less aggressive (exponential backoff with fewer attempts instead of a fixed one-second delay).

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-853
Change-Id: I818ca633adbc08e7a74b1092327af5191200cc44

datafilecollector/config/application.yaml
datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/CollectAndReportFiles.java

index 71f3172..5b00a74 100644
@@ -28,7 +28,9 @@ logging:
 app:
   filepath: config/datafile_endpoints_test.json
   collected-files-path: "/tmp/onap_datafile/"
-  # KAFKA bootstrap servers. This is only needed if there are Information Types that use a kafkaInputTopic
+  # Number of worker threads. A higher number may increase throughput, but requires more execution resources.
+  number-of-worker-threads: 200
+  # KAFKA bootstrap servers.
   # Several redundant bootstrap servers can be specified, separated by a comma ','.
   kafka:
     bootstrap-servers: localhost:9092
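
The new setting follows standard Spring Boot property binding, so it can also be overridden at startup (e.g. --app.number-of-worker-threads=64 on the command line). A minimal, self-contained sketch of such a binding; the class name is illustrative, the real binding lives in AppConfig (next hunk):

    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;

    // Sketch only: resolves app.number-of-worker-threads from application.yaml,
    // falling back to 200 when the key is absent.
    @Component
    public class WorkerSettings {

        @Value("${app.number-of-worker-threads:200}")
        private int numberOfWorkerThreads;

        public int getNumberOfWorkerThreads() {
            return numberOfWorkerThreads;
        }
    }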
index f8be04d..f25d6fd 100644
@@ -95,6 +95,10 @@ public class AppConfig {
     @Value("${app.s3.locksBucket:}")
     private String s3LocksBucket;
 
+    @Value("${app.number-of-worker-treads:200}")
+    @Getter
+    private int noOfWorkerThreads;
+
     public String getS3LocksBucket() {
         return s3LocksBucket.isEmpty() ? s3Bucket : s3LocksBucket;
     }
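
A possible way to consume the new Lombok-generated getter when building the Reactor worker scheduler; the Math.max clamp is an illustrative safeguard and not part of this commit:

    import reactor.core.scheduler.Scheduler;
    import reactor.core.scheduler.Schedulers;

    // Sketch: build the worker scheduler from the configured value,
    // clamping to at least one thread (clamp not in the commit).
    public class SchedulerFactory {

        static Scheduler newWorkerScheduler(AppConfig appConfig) {
            int workers = Math.max(1, appConfig.getNoOfWorkerThreads());
            return Schedulers.newParallel("FileCollectorWorker", workers);
        }
    }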
index c4f2ac3..46e71cc 100644
@@ -70,7 +70,6 @@ public class CollectAndReportFiles {
         .disableHtmlEscaping() //
         .create(); //
 
-    private static final int NUMBER_OF_WORKER_THREADS = 200;
     private static final long FILE_TRANSFER_MAX_RETRIES = 2;
     private static final Duration FILE_TRANSFER_INITIAL_RETRY_TIMEOUT = Duration.ofSeconds(2);
 
@@ -132,11 +131,12 @@ public class CollectAndReportFiles {
     }
 
     Flux<FilePublishInformation> createMainTask() {
-        Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", NUMBER_OF_WORKER_THREADS);
+        final int noOfWorkerThreads = appConfig.getNoOfWorkerThreads();
+        Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", noOfWorkerThreads);
         return fetchFromKafka() //
             .doOnNext(fileReadyMessage -> counters.threadPoolQueueSize.incrementAndGet()) //
             .doOnNext(fileReadyMessage -> counters.incNoOfReceivedEvents()) //
-            .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread
+            .parallel(noOfWorkerThreads) // Each FileReadyMessage in a separate thread
             .runOn(scheduler) //
             .doOnNext(fileReadyMessage -> counters.threadPoolQueueSize.decrementAndGet()) //
             .flatMap(fileReadyMessage -> Flux.fromIterable(FileData.createFileData(fileReadyMessage)), true, 1) //
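
The parallel(n).runOn(scheduler) pattern above fans each FileReadyMessage out to one of n worker rails. A self-contained, runnable sketch of the same Reactor pattern, with demo values standing in for the collector's configuration:

    import java.time.Duration;
    import reactor.core.publisher.Flux;
    import reactor.core.scheduler.Scheduler;
    import reactor.core.scheduler.Schedulers;

    // Demo of the parallel(n).runOn(scheduler) pattern; the map() call
    // stands in for the per-file collection work.
    public class ParallelDemo {

        public static void main(String[] args) {
            int workers = 4; // in the collector this comes from appConfig.getNoOfWorkerThreads()
            Scheduler scheduler = Schedulers.newParallel("demo-worker", workers);
            Flux.range(1, 20)
                .parallel(workers)   // split the stream into `workers` rails
                .runOn(scheduler)    // each rail executes on its own worker thread
                .map(i -> i * i)     // per-item work runs concurrently
                .sequential()        // merge the rails back into a single Flux
                .doOnNext(System.out::println)
                .blockLast(Duration.ofSeconds(10));
            scheduler.dispose();
        }
    }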
@@ -155,7 +155,7 @@ public class CollectAndReportFiles {
         if (this.appConfig.isS3Enabled()) {
             return dataStore.copyFileTo(locaFilePath(info), info.getName())
                 .doOnError(t -> logger.warn("Failed to store file '{}' in S3 {}", info.getName(), t.getMessage())) //
-                .retryWhen(Retry.fixedDelay(10, Duration.ofMillis(1000))) //
+                .retryWhen(Retry.backoff(4, Duration.ofMillis(1000))) //
                 .map(f -> info) //
                 .doOnError(t -> logger.error("Failed to store file '{}' in S3 after retries {}", info.getName(),
                     t.getMessage())) //
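
For comparison of the two retry policies: Retry.fixedDelay(10, 1s) retried every second up to ten times, while Retry.backoff(4, 1s) waits roughly 1 s, 2 s, 4 s and 8 s (with jitter) before giving up after four attempts. A runnable sketch of the new policy, using Reactor's reactor.util.retry.Retry as in the hunk above:

    import java.time.Duration;
    import reactor.core.publisher.Mono;
    import reactor.util.retry.Retry;

    // Demo: four retries with exponential backoff starting at one second
    // (about 15 s in total before the error propagates, jitter aside).
    public class RetryDemo {

        public static void main(String[] args) {
            Mono<String> flakyStore = Mono.error(new RuntimeException("S3 unavailable"));
            String result = flakyStore
                .retryWhen(Retry.backoff(4, Duration.ofMillis(1000)))
                .onErrorResume(t -> Mono.just("gave up: " + t.getMessage()))
                .block();
            System.out.println(result);
        }
    }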