app:
filepath: config/datafile_endpoints_test.json
collected-files-path: "/tmp/onap_datafile/"
- # KAFKA boostrap servers. This is only needed if there are Information Types that uses a kafkaInputTopic
+ # Numer of worker threads. Increased number may increase throughput, but will require more executing resources.
+ number-of-worker-treads: 200
+ # KAFKA boostrap servers.
# several redundant boostrap servers can be specified, separated by a comma ','.
kafka:
bootstrap-servers: localhost:9092
.disableHtmlEscaping() //
.create(); //
- private static final int NUMBER_OF_WORKER_THREADS = 200;
private static final long FILE_TRANSFER_MAX_RETRIES = 2;
private static final Duration FILE_TRANSFER_INITIAL_RETRY_TIMEOUT = Duration.ofSeconds(2);
}
Flux<FilePublishInformation> createMainTask() {
- Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", NUMBER_OF_WORKER_THREADS);
+ final int noOfWorkerThreads = appConfig.getNoOfWorkerThreads();
+ Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", noOfWorkerThreads);
return fetchFromKafka() //
.doOnNext(fileReadyMessage -> counters.threadPoolQueueSize.incrementAndGet()) //
.doOnNext(fileReadyMessage -> counters.incNoOfReceivedEvents()) //
- .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread
+ .parallel(noOfWorkerThreads) // Each FileReadyMessage in a separate thread
.runOn(scheduler) //
.doOnNext(fileReadyMessage -> counters.threadPoolQueueSize.decrementAndGet()) //
.flatMap(fileReadyMessage -> Flux.fromIterable(FileData.createFileData(fileReadyMessage)), true, 1) //
if (this.appConfig.isS3Enabled()) {
return dataStore.copyFileTo(locaFilePath(info), info.getName())
.doOnError(t -> logger.warn("Failed to store file '{}' in S3 {}", info.getName(), t.getMessage())) //
- .retryWhen(Retry.fixedDelay(10, Duration.ofMillis(1000))) //
+ .retryWhen(Retry.backoff(4, Duration.ofMillis(1000))) //
.map(f -> info) //
.doOnError(t -> logger.error("Failed to store file '{}' in S3 after retries {}", info.getName(),
t.getMessage())) //