From 297f5f94a207b35a88b34af249a4840c65a21e31 Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Mon, 13 Mar 2023 09:59:59 +0100 Subject: [PATCH] Making concurrency configurable The concurrency is made configurable by means of a parameter in application.yaml. The retry when storing in S3 failed is made less aggressive. Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-853 Change-Id: I818ca633adbc08e7a74b1092327af5191200cc44 --- datafilecollector/config/application.yaml | 4 +++- .../dcaegen2/collectors/datafile/configuration/AppConfig.java | 4 ++++ .../dcaegen2/collectors/datafile/tasks/CollectAndReportFiles.java | 8 ++++---- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/datafilecollector/config/application.yaml b/datafilecollector/config/application.yaml index 71f3172..5b00a74 100644 --- a/datafilecollector/config/application.yaml +++ b/datafilecollector/config/application.yaml @@ -28,7 +28,9 @@ logging: app: filepath: config/datafile_endpoints_test.json collected-files-path: "/tmp/onap_datafile/" - # KAFKA boostrap servers. This is only needed if there are Information Types that uses a kafkaInputTopic + # Numer of worker threads. Increased number may increase throughput, but will require more executing resources. + number-of-worker-treads: 200 + # KAFKA boostrap servers. # several redundant boostrap servers can be specified, separated by a comma ','. kafka: bootstrap-servers: localhost:9092 diff --git a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java index f8be04d..f25d6fd 100644 --- a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java +++ b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java @@ -95,6 +95,10 @@ public class AppConfig { @Value("${app.s3.locksBucket:}") private String s3LocksBucket; + @Value("${app.number-of-worker-treads:200}") + @Getter + private int noOfWorkerThreads; + public String getS3LocksBucket() { return s3LocksBucket.isEmpty() ? s3Bucket : s3LocksBucket; } diff --git a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/CollectAndReportFiles.java b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/CollectAndReportFiles.java index c4f2ac3..46e71cc 100644 --- a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/CollectAndReportFiles.java +++ b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/CollectAndReportFiles.java @@ -70,7 +70,6 @@ public class CollectAndReportFiles { .disableHtmlEscaping() // .create(); // - private static final int NUMBER_OF_WORKER_THREADS = 200; private static final long FILE_TRANSFER_MAX_RETRIES = 2; private static final Duration FILE_TRANSFER_INITIAL_RETRY_TIMEOUT = Duration.ofSeconds(2); @@ -132,11 +131,12 @@ public class CollectAndReportFiles { } Flux createMainTask() { - Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", NUMBER_OF_WORKER_THREADS); + final int noOfWorkerThreads = appConfig.getNoOfWorkerThreads(); + Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", noOfWorkerThreads); return fetchFromKafka() // .doOnNext(fileReadyMessage -> counters.threadPoolQueueSize.incrementAndGet()) // .doOnNext(fileReadyMessage -> counters.incNoOfReceivedEvents()) // - .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread + .parallel(noOfWorkerThreads) // Each FileReadyMessage in a separate thread .runOn(scheduler) // .doOnNext(fileReadyMessage -> counters.threadPoolQueueSize.decrementAndGet()) // .flatMap(fileReadyMessage -> Flux.fromIterable(FileData.createFileData(fileReadyMessage)), true, 1) // @@ -155,7 +155,7 @@ public class CollectAndReportFiles { if (this.appConfig.isS3Enabled()) { return dataStore.copyFileTo(locaFilePath(info), info.getName()) .doOnError(t -> logger.warn("Failed to store file '{}' in S3 {}", info.getName(), t.getMessage())) // - .retryWhen(Retry.fixedDelay(10, Duration.ofMillis(1000))) // + .retryWhen(Retry.backoff(4, Duration.ofMillis(1000))) // .map(f -> info) // .doOnError(t -> logger.error("Failed to store file '{}' in S3 after retries {}", info.getName(), t.getMessage())) // -- 2.16.6