+
+ refreshConfigTask = createRefreshTask() //
+ .subscribe(e -> logger.info("Refreshed configuration data"),
+ throwable -> logger.error("Configuration refresh terminated due to exception", throwable),
+ () -> logger.error("Configuration refresh terminated"));
+ }
+
+ Mono<EnvProperties> getEnvironment(Properties systemEnvironment) {
+ return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment);
+ }
+
+ Flux<ApplicationConfig> createRefreshTask() {
+ return getEnvironment(systemEnvironment) //
+ .flatMap(this::createCbsClient) //
+ .flatMapMany(this::periodicConfigurationUpdates) //
+ .map(this::parseRicConfigurationfromConsul) //
+ .onErrorResume(this::onErrorResume);
+ }
+
+ Mono<CbsClient> createCbsClient(EnvProperties env) {
+ return CbsClientFactory.createCbsClient(env);
+ }
+
+ private Flux<JsonObject> periodicConfigurationUpdates(CbsClient cbsClient) {
+ final Duration initialDelay = Duration.ZERO;
+ final Duration refreshPeriod = Duration.ofMinutes(1);
+ final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create());
+ return cbsClient.updates(getConfigRequest, initialDelay, refreshPeriod);