- throw new ServiceException("Could not find ric: " + ricName);
- }
-
- public void initialize() {
- stop();
- loadConfigurationFromFile();
-
- refreshConfigTask = createRefreshTask() //
- .subscribe(notUsed -> logger.info("Refreshed configuration data"),
- throwable -> logger.error("Configuration refresh terminated due to exception", throwable),
- () -> logger.error("Configuration refresh terminated"));
- }
-
- public static enum RicConfigUpdate {
- ADDED, CHANGED, REMOVED
- }
-
- public interface Observer {
- void onRicConfigUpdate(RicConfig ric, RicConfigUpdate event);
- }
-
- public void addObserver(Observer o) {
- this.observers.add(o);
- }
-
- Mono<EnvProperties> getEnvironment(Properties systemEnvironment) {
- return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment);
- }
-
- Flux<ApplicationConfig> createRefreshTask() {
- return getEnvironment(systemEnvironment) //
- .flatMap(this::createCbsClient) //
- .flatMapMany(this::periodicConfigurationUpdates) //
- .map(this::parseRicConfigurationfromConsul) //
- .onErrorResume(this::onErrorResume);
- }
-
- Mono<CbsClient> createCbsClient(EnvProperties env) {
- return CbsClientFactory.createCbsClient(env);
- }
-
- private Flux<JsonObject> periodicConfigurationUpdates(CbsClient cbsClient) {
- final Duration initialDelay = Duration.ZERO;
- final Duration refreshPeriod = Duration.ofMinutes(1);
- final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create());
- return cbsClient.updates(getConfigRequest, initialDelay, refreshPeriod);
- }
-
- private <R> Mono<R> onErrorResume(Throwable trowable) {
- logger.error("Could not refresh application configuration {}", trowable.toString());
- return Mono.empty();