Making concurrency configurable
[nonrtric/plt/ranpm.git] / datafilecollector / src / main / java / org / onap / dcaegen2 / collectors / datafile / tasks / CollectAndReportFiles.java
index c4f2ac3..46e71cc 100644 (file)
@@ -70,7 +70,6 @@ public class CollectAndReportFiles {
         .disableHtmlEscaping() //
         .create(); //
 
-    private static final int NUMBER_OF_WORKER_THREADS = 200;
     private static final long FILE_TRANSFER_MAX_RETRIES = 2;
     private static final Duration FILE_TRANSFER_INITIAL_RETRY_TIMEOUT = Duration.ofSeconds(2);
 
@@ -132,11 +131,12 @@ public class CollectAndReportFiles {
     }
 
     Flux<FilePublishInformation> createMainTask() {
-        Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", NUMBER_OF_WORKER_THREADS);
+        final int noOfWorkerThreads = appConfig.getNoOfWorkerThreads();
+        Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", noOfWorkerThreads);
         return fetchFromKafka() //
             .doOnNext(fileReadyMessage -> counters.threadPoolQueueSize.incrementAndGet()) //
             .doOnNext(fileReadyMessage -> counters.incNoOfReceivedEvents()) //
-            .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread
+            .parallel(noOfWorkerThreads) // Each FileReadyMessage in a separate thread
             .runOn(scheduler) //
             .doOnNext(fileReadyMessage -> counters.threadPoolQueueSize.decrementAndGet()) //
             .flatMap(fileReadyMessage -> Flux.fromIterable(FileData.createFileData(fileReadyMessage)), true, 1) //
@@ -155,7 +155,7 @@ public class CollectAndReportFiles {
         if (this.appConfig.isS3Enabled()) {
             return dataStore.copyFileTo(locaFilePath(info), info.getName())
                 .doOnError(t -> logger.warn("Failed to store file '{}' in S3 {}", info.getName(), t.getMessage())) //
-                .retryWhen(Retry.fixedDelay(10, Duration.ofMillis(1000))) //
+                .retryWhen(Retry.backoff(4, Duration.ofMillis(1000))) //
                 .map(f -> info) //
                 .doOnError(t -> logger.error("Failed to store file '{}' in S3 after retries {}", info.getName(),
                     t.getMessage())) //