X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=policy-agent%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fpolicyagent%2Ftasks%2FRefreshConfigTask.java;h=05bcb0f25e7fcaea2327e4c844e94979dc85cfda;hb=db6bb3802115a45cece80aed62683e02cd7d3282;hp=7cfe4868ee69e277b2ccd5184c003bc0f3c76146;hpb=0316038265be758ae8b1bce006b5fd017185c55d;p=nonrtric.git diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java index 7cfe4868..05bcb0f2 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java @@ -24,7 +24,6 @@ import com.google.gson.GsonBuilder; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; -import com.google.gson.JsonSyntaxException; import com.google.gson.TypeAdapterFactory; import java.io.BufferedInputStream; @@ -109,9 +108,8 @@ public class RefreshConfigTask { logger.debug("Starting refreshConfigTask"); stop(); refreshTask = createRefreshTask() // - .subscribe( - notUsed -> logger.debug("Refreshed configuration data"), throwable -> logger - .error("Configuration refresh terminated due to exception {}", throwable.getMessage()), + .subscribe(notUsed -> logger.debug("Refreshed configuration data"), + throwable -> logger.error("Configuration refresh terminated due to exception {}", throwable.toString()), () -> logger.error("Configuration refresh terminated")); } @@ -123,17 +121,17 @@ public class RefreshConfigTask { Flux createRefreshTask() { Flux loadFromFile = Flux.interval(Duration.ZERO, FILE_CONFIG_REFRESH_INTERVAL) // - .filter(notUsed -> configFileExists()) // .filter(notUsed -> !this.isConsulUsed) // .flatMap(notUsed -> loadConfigurationFromFile()) // - .onErrorResume(this::ignoreError) // + .onErrorResume(this::ignoreErrorFlux) // .doOnNext(json -> logger.debug("loadFromFile succeeded")) // .doOnTerminate(() -> logger.error("loadFromFile Terminate")); - Flux loadFromConsul = getEnvironment(systemEnvironment) // + Flux loadFromConsul = Flux.interval(Duration.ZERO, CONSUL_CONFIG_REFRESH_INTERVAL) // + .flatMap(i -> getEnvironment(systemEnvironment)) // .flatMap(this::createCbsClient) // - .flatMapMany(this::periodicConfigurationUpdates) // - .onErrorResume(this::ignoreError) // + .flatMap(this::getFromCbs) // + .onErrorResume(this::ignoreErrorMono) // .doOnNext(json -> logger.debug("loadFromConsul succeeded")) // .doOnNext(json -> this.isConsulUsed = true) // .doOnTerminate(() -> logger.error("loadFromConsul Terminated")); @@ -147,20 +145,32 @@ public class RefreshConfigTask { } Mono getEnvironment(Properties systemEnvironment) { - return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment); + return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment) // + .onErrorResume(t -> Mono.empty()); } Mono createCbsClient(EnvProperties env) { - return CbsClientFactory.createCbsClient(env); + return CbsClientFactory.createCbsClient(env) // + .onErrorResume(this::ignoreErrorMono); } - private Flux periodicConfigurationUpdates(CbsClient cbsClient) { - final Duration initialDelay = Duration.ZERO; + private Mono getFromCbs(CbsClient cbsClient) { final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create()); - return cbsClient.updates(getConfigRequest, initialDelay, CONSUL_CONFIG_REFRESH_INTERVAL); + try { + return cbsClient.get(getConfigRequest) // + .onErrorResume(this::ignoreErrorMono); + } catch (Exception e) { + return ignoreErrorMono(e); + } + } + + private Flux ignoreErrorFlux(Throwable throwable) { + String errMsg = throwable.toString(); + logger.warn("Could not refresh application configuration. {}", errMsg); + return Flux.empty(); } - private Mono ignoreError(Throwable throwable) { + private Mono ignoreErrorMono(Throwable throwable) { String errMsg = throwable.toString(); logger.warn("Could not refresh application configuration. {}", errMsg); return Mono.empty(); @@ -170,9 +180,10 @@ public class RefreshConfigTask { try { ApplicationConfigParser parser = new ApplicationConfigParser(); return Mono.just(parser.parse(jsonObject)); - } catch (ServiceException e) { - logger.error("Could not parse configuration {}", e.toString(), e); - return Mono.error(e); + } catch (Exception e) { + String str = e.toString(); + logger.error("Could not parse configuration {}", str); + return Mono.empty(); } } @@ -180,8 +191,7 @@ public class RefreshConfigTask { return this.appConfig.setConfiguration(config); } - boolean configFileExists() { - String filepath = appConfig.getLocalConfigurationFilePath(); + boolean fileExists(String filepath) { return (filepath != null && (new File(filepath).exists())); } @@ -223,6 +233,10 @@ public class RefreshConfigTask { */ Flux loadConfigurationFromFile() { String filepath = appConfig.getLocalConfigurationFilePath(); + if (!fileExists(filepath)) { + return Flux.empty(); + } + GsonBuilder gsonBuilder = new GsonBuilder(); ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); @@ -232,8 +246,8 @@ public class RefreshConfigTask { appParser.parse(rootObject); logger.debug("Local configuration file loaded: {}", filepath); return Flux.just(rootObject); - } catch (JsonSyntaxException | ServiceException | IOException e) { - logger.debug("Local configuration file not loaded: {}", filepath, e); + } catch (IOException | ServiceException e) { + logger.error("Local configuration file not loaded: {}, {}", filepath, e.getMessage()); return Flux.empty(); } }