+ Flux<ApplicationConfig> createRefreshTask() {
+ return getEnvironment(systemEnvironment) //
+ .flatMap(this::createCbsClient) //
+ .flatMapMany(this::periodicConfigurationUpdates) //
+ .map(this::parseRicConfigurationfromConsul) //
+ .onErrorResume(this::onErrorResume);
+ }
+
+ Mono<CbsClient> createCbsClient(EnvProperties env) {
+ return CbsClientFactory.createCbsClient(env);
+ }
+
+ private Flux<JsonObject> periodicConfigurationUpdates(CbsClient cbsClient) {
+ final Duration initialDelay = Duration.ZERO;
+ final Duration refreshPeriod = Duration.ofMinutes(1);
+ final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create());
+ return cbsClient.updates(getConfigRequest, initialDelay, refreshPeriod);
+ }
+
+ private <R> Mono<R> onErrorResume(Throwable trowable) {
+ logger.error("Could not refresh application configuration {}", trowable.toString());
+ return Mono.empty();
+ }
+
+ private ApplicationConfig parseRicConfigurationfromConsul(JsonObject jsonObject) {
+ try {
+ ApplicationConfigParser parser = new ApplicationConfigParser();
+ parser.parse(jsonObject);
+ setConfiguration(parser.getRicConfigs());
+
+ } catch (ServiceException e) {
+ logger.error("Could not parse configuration {}", e.toString(), e);
+ }
+ return this;
+ }
+
+ private synchronized void setConfiguration(@NotNull Vector<RicConfig> ricConfigs) {
+ this.ricConfigs = ricConfigs;
+ }
+
+ public void stop() {
+ if (refreshConfigTask != null) {
+ refreshConfigTask.dispose();
+ refreshConfigTask = null;
+ }