+ private Flux<JsonObject> periodicConfigurationUpdates(CbsClient cbsClient) {
+ final Duration initialDelay = Duration.ZERO;
+ final Duration refreshPeriod = Duration.ofMinutes(1);
+ final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create());
+ return cbsClient.updates(getConfigRequest, initialDelay, refreshPeriod);
+ }
+
+ private <R> Mono<R> onErrorResume(Throwable trowable) {
+ logger.error("Could not refresh application configuration {}", trowable.toString());
+ return Mono.empty();
+ }
+
+ private ApplicationConfig parseRicConfigurationfromConsul(JsonObject jsonObject) {
+ try {
+ ApplicationConfigParser parser = new ApplicationConfigParser();
+ parser.parse(jsonObject);
+ setConfiguration(parser.getRicConfigs());
+
+ } catch (ServiceException e) {
+ logger.error("Could not parse configuration {}", e.toString(), e);
+ }
+ return this;
+ }
+
+ private class Notification {
+ final RicConfig ric;
+ final RicConfigUpdate event;
+
+ Notification(RicConfig ric, RicConfigUpdate event) {
+ this.ric = ric;
+ this.event = event;
+ }
+ }
+
+ private void setConfiguration(@NotNull Collection<RicConfig> ricConfigs) {
+ Collection<Notification> notifications = new Vector<>();
+ synchronized (this) {
+ Map<String, RicConfig> newRicConfigs = new HashMap<>();
+ for (RicConfig newConfig : ricConfigs) {
+ RicConfig oldConfig = this.ricConfigs.get(newConfig.name());
+ if (oldConfig == null) {
+ newRicConfigs.put(newConfig.name(), newConfig);
+ notifications.add(new Notification(newConfig, RicConfigUpdate.ADDED));
+ this.ricConfigs.remove(newConfig.name());
+ } else if (!newConfig.equals(newConfig)) {
+ notifications.add(new Notification(newConfig, RicConfigUpdate.CHANGED));
+ newRicConfigs.put(newConfig.name(), newConfig);
+ this.ricConfigs.remove(newConfig.name());
+ } else {
+ newRicConfigs.put(oldConfig.name(), oldConfig);
+ }
+ }
+ for (RicConfig deletedConfig : this.ricConfigs.values()) {
+ notifications.add(new Notification(deletedConfig, RicConfigUpdate.REMOVED));
+ }
+ this.ricConfigs = newRicConfigs;
+ }
+ notifyObservers(notifications);
+ }
+
+ private void notifyObservers(Collection<Notification> notifications) {
+ for (Observer observer : this.observers) {
+ for (Notification notif : notifications) {
+ observer.onRicConfigUpdate(notif.ric, notif.event);
+ }
+ }
+ }
+
+ public void stop() {
+ if (refreshConfigTask != null) {
+ refreshConfigTask.dispose();
+ refreshConfigTask = null;
+ }