private static final long FILE_TRANSFER_MAX_RETRIES = 2;
private static final Duration FILE_TRANSFER_INITIAL_RETRY_TIMEOUT = Duration.ofSeconds(2);
final int noOfWorkerThreads = appConfig.getNoOfWorkerThreads();
Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", noOfWorkerThreads);
return fetchFromKafka() //
    .doOnNext(fileReadyMessage -> counters.threadPoolQueueSize.incrementAndGet()) //
    .doOnNext(fileReadyMessage -> counters.incNoOfReceivedEvents()) //
    .runOn(scheduler) //
    .doOnNext(fileReadyMessage -> counters.threadPoolQueueSize.decrementAndGet()) //
    .flatMap(fileReadyMessage -> Flux.fromIterable(FileData.createFileData(fileReadyMessage)), true, 1) //
if (this.appConfig.isS3Enabled()) {
    return dataStore.copyFileTo(locaFilePath(info), info.getName())
        .doOnError(t -> logger.warn("Failed to store file '{}' in S3 {}", info.getName(), t.getMessage())) //