X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=policy-agent%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fpolicyagent%2Ftasks%2FRefreshConfigTask.java;h=dd235db555b1a0b6208b19111e3c10410129bbac;hb=894ef7c6ead6a3617a1190d7c0b36c0d1c21a0be;hp=41f2064a9de9d90a542c00bd21d91b05919daf8d;hpb=81bdaffd323c941da910c258487b7efb78615d6d;p=nonrtric.git diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java index 41f2064a..dd235db5 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java @@ -124,46 +124,49 @@ public class RefreshConfigTask { .filter(notUsed -> configFileExists()) // .filter(notUsed -> !this.isConsulUsed) // .flatMap(notUsed -> loadConfigurationFromFile()) // - .onErrorResume(this::ignoreError) // + .onErrorResume(this::ignoreErrorFlux) // .doOnNext(json -> logger.debug("loadFromFile succeeded")) // - .doOnTerminate(() -> logger.info("loadFromFile Terminate")); + .doOnTerminate(() -> logger.error("loadFromFile Terminate")); - Flux loadFromConsul = getEnvironment(systemEnvironment) // + Flux loadFromConsul = Flux.interval(Duration.ZERO, CONSUL_CONFIG_REFRESH_INTERVAL) // + .flatMap(i -> getEnvironment(systemEnvironment)) // .flatMap(this::createCbsClient) // - .flatMapMany(this::periodicConfigurationUpdates) // - .onErrorResume(this::ignoreError) // + .flatMap(this::getFromCbs) // .doOnNext(json -> logger.debug("loadFromConsul succeeded")) // .doOnNext(json -> this.isConsulUsed = true) // - .doOnTerminate(() -> logger.info("loadFromConsul Terminated")); + .doOnTerminate(() -> logger.error("loadFromConsul Terminated")); return Flux.merge(loadFromFile, loadFromConsul) // .flatMap(this::parseConfiguration) // .flatMap(this::updateConfig) // .doOnNext(this::handleUpdatedRicConfig) // .flatMap(configUpdate -> Flux.just(configUpdate.getType())) // - .doOnTerminate(() -> handleTerminate("Configuration refresh task is terminated")); - } - - private void handleTerminate(String info) { - logger.error(info); + .doOnTerminate(() -> logger.error("Configuration refresh task is terminated")); } Mono getEnvironment(Properties systemEnvironment) { - return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment); + return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment) // + .onErrorResume(t -> Mono.empty()); } Mono createCbsClient(EnvProperties env) { - return CbsClientFactory.createCbsClient(env); + return CbsClientFactory.createCbsClient(env) // + .onErrorResume(this::ignoreErrorMono); } - private Flux periodicConfigurationUpdates(CbsClient cbsClient) { - final Duration initialDelay = Duration.ZERO; + private Mono getFromCbs(CbsClient cbsClient) { final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create()); - return cbsClient.updates(getConfigRequest, initialDelay, CONSUL_CONFIG_REFRESH_INTERVAL) // - .onErrorResume(this::ignoreError); + return cbsClient.get(getConfigRequest) // + .onErrorResume(this::ignoreErrorMono); + } + + private Flux ignoreErrorFlux(Throwable throwable) { + String errMsg = throwable.toString(); + logger.warn("Could not refresh application configuration. {}", errMsg); + return Flux.empty(); } - private Mono ignoreError(Throwable throwable) { + private Mono ignoreErrorMono(Throwable throwable) { String errMsg = throwable.toString(); logger.warn("Could not refresh application configuration. {}", errMsg); return Mono.empty();