- Flux<ApplicationConfig> createRefreshTask() {
- return getEnvironment(systemEnvironment) //
- .flatMap(this::createCbsClient) //
- .flatMapMany(this::periodicConfigurationUpdates) //
- .map(this::parseRicConfigurationfromConsul) //
- .onErrorResume(this::onErrorResume);
- }
-
- Mono<CbsClient> createCbsClient(EnvProperties env) {
- return CbsClientFactory.createCbsClient(env);
- }
-
- private Flux<JsonObject> periodicConfigurationUpdates(CbsClient cbsClient) {
- final Duration initialDelay = Duration.ZERO;
- final Duration refreshPeriod = Duration.ofMinutes(1);
- final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create());
- return cbsClient.updates(getConfigRequest, initialDelay, refreshPeriod);
- }
-
- private <R> Mono<R> onErrorResume(Throwable trowable) {
- logger.error("Could not refresh application configuration {}", trowable.toString());
- return Mono.empty();
- }
-
- private ApplicationConfig parseRicConfigurationfromConsul(JsonObject jsonObject) {
- try {
- ApplicationConfigParser parser = new ApplicationConfigParser();
- parser.parse(jsonObject);
- setConfiguration(parser.getRicConfigs());
-
- } catch (ServiceException e) {
- logger.error("Could not parse configuration {}", e.toString(), e);