X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=policy-agent%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fpolicyagent%2Ftasks%2FRefreshConfigTask.java;h=b99a230d0e5e4c110a19bd02581c3b55628ef52f;hb=6a39814272307d0207222c9229b0d765ac062bf0;hp=81735d7242e33fedf2d33cc56ef13dc0ff22b99d;hpb=4f602854561a08e754eb0c4ba9327bf49b0e63d7;p=nonrtric.git diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java index 81735d72..b99a230d 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java @@ -24,7 +24,6 @@ import com.google.gson.GsonBuilder; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; -import com.google.gson.JsonSyntaxException; import com.google.gson.TypeAdapterFactory; import java.io.BufferedInputStream; @@ -39,6 +38,9 @@ import java.util.ServiceLoader; import javax.validation.constraints.NotNull; +import lombok.AccessLevel; +import lombok.Getter; + import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests; @@ -50,7 +52,6 @@ import org.oransc.policyagent.configuration.ApplicationConfig; import org.oransc.policyagent.configuration.ApplicationConfig.RicConfigUpdate; import org.oransc.policyagent.configuration.ApplicationConfigParser; import org.oransc.policyagent.configuration.RicConfig; -import org.oransc.policyagent.exceptions.ServiceException; import org.oransc.policyagent.repository.Policies; import org.oransc.policyagent.repository.PolicyTypes; import org.oransc.policyagent.repository.Ric; @@ -67,7 +68,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** - * Regularly refreshes the configuration from Consul. + * Regularly refreshes the configuration from Consul or from a local configuration file. */ @Component public class RefreshConfigTask { @@ -77,7 +78,13 @@ public class RefreshConfigTask { @Value("#{systemEnvironment}") public Properties systemEnvironment; + /** + * The time between refreshes of the configuration. + */ + static final Duration CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1); + final ApplicationConfig appConfig; + @Getter(AccessLevel.PROTECTED) private Disposable refreshTask = null; private boolean isConsulUsed = false; @@ -86,8 +93,6 @@ public class RefreshConfigTask { private final Policies policies; private final Services services; private final PolicyTypes policyTypes; - private static final Duration FILE_CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1); - private static final Duration CONSUL_CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1); @Autowired public RefreshConfigTask(ApplicationConfig appConfig, Rics rics, Policies policies, Services services, @@ -105,80 +110,89 @@ public class RefreshConfigTask { stop(); refreshTask = createRefreshTask() // .subscribe(notUsed -> logger.debug("Refreshed configuration data"), - throwable -> logger.error("Configuration refresh terminated due to exception", throwable), + throwable -> logger.error("Configuration refresh terminated due to exception {}", throwable.toString()), () -> logger.error("Configuration refresh terminated")); } public void stop() { if (refreshTask != null) { refreshTask.dispose(); - refreshTask = null; } } - Flux createRefreshTask() { - Flux loadFromFile = Flux.interval(Duration.ZERO, FILE_CONFIG_REFRESH_INTERVAL) // - .filter(notUsed -> configFileExists()) // + Flux createRefreshTask() { + Flux loadFromFile = Flux.interval(Duration.ZERO, CONFIG_REFRESH_INTERVAL) // .filter(notUsed -> !this.isConsulUsed) // .flatMap(notUsed -> loadConfigurationFromFile()) // - .onErrorResume(this::ignoreError) // - .doOnNext(json -> logger.debug("loadFromFile")) // + .onErrorResume(this::ignoreErrorFlux) // + .doOnNext(json -> logger.debug("loadFromFile succeeded")) // .doOnTerminate(() -> logger.error("loadFromFile Terminate")); - Flux loadFromConsul = getEnvironment(systemEnvironment) // + Flux loadFromConsul = Flux.interval(Duration.ZERO, CONFIG_REFRESH_INTERVAL) // + .flatMap(i -> getEnvironment(systemEnvironment)) // .flatMap(this::createCbsClient) // - .flatMapMany(this::periodicConfigurationUpdates) // - .onErrorResume(this::ignoreError) // - .doOnNext(json -> logger.debug("loadFromConsul")) // + .flatMap(this::getFromCbs) // + .onErrorResume(this::ignoreErrorMono) // + .doOnNext(json -> logger.debug("loadFromConsul succeeded")) // .doOnNext(json -> this.isConsulUsed = true) // - .doOnTerminate(() -> logger.error("loadFromConsul Terminate")); + .doOnTerminate(() -> logger.error("loadFromConsul Terminated")); return Flux.merge(loadFromFile, loadFromConsul) // .flatMap(this::parseConfiguration) // .flatMap(this::updateConfig) // .doOnNext(this::handleUpdatedRicConfig) // - .flatMap(configUpdate -> Flux.just(this.appConfig)) // + .flatMap(configUpdate -> Flux.just(configUpdate.getType())) // .doOnTerminate(() -> logger.error("Configuration refresh task is terminated")); } Mono getEnvironment(Properties systemEnvironment) { - return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment); + return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment) // + .onErrorResume(t -> Mono.empty()); } Mono createCbsClient(EnvProperties env) { - return CbsClientFactory.createCbsClient(env); + return CbsClientFactory.createCbsClient(env) // + .onErrorResume(this::ignoreErrorMono); + } + + private Mono getFromCbs(CbsClient cbsClient) { + try { + final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create()); + return cbsClient.get(getConfigRequest) // + .onErrorResume(this::ignoreErrorMono); + } catch (Exception e) { + return ignoreErrorMono(e); + } } - private Flux periodicConfigurationUpdates(CbsClient cbsClient) { - final Duration initialDelay = Duration.ZERO; - final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create()); - return cbsClient.updates(getConfigRequest, initialDelay, CONSUL_CONFIG_REFRESH_INTERVAL); + private Flux ignoreErrorFlux(Throwable throwable) { + String errMsg = throwable.toString(); + logger.warn("Could not refresh application configuration. {}", errMsg); + return Flux.empty(); } - private Mono ignoreError(Throwable throwable) { + private Mono ignoreErrorMono(Throwable throwable) { String errMsg = throwable.toString(); logger.warn("Could not refresh application configuration. {}", errMsg); return Mono.empty(); } - private Mono parseConfiguration(JsonObject jsonObject) { + private Mono parseConfiguration(JsonObject jsonObject) { try { ApplicationConfigParser parser = new ApplicationConfigParser(); - parser.parse(jsonObject); - return Mono.just(parser); - } catch (ServiceException e) { - logger.error("Could not parse configuration {}", e.toString(), e); - return Mono.error(e); + return Mono.just(parser.parse(jsonObject)); + } catch (Exception e) { + String str = e.toString(); + logger.error("Could not parse configuration {}", str); + return Mono.empty(); } } - private Flux updateConfig(ApplicationConfigParser config) { - return this.appConfig.setConfiguration(config.getRicConfigs(), config.getDmaapPublisherConfig(), - config.getDmaapConsumerConfig()); + private Flux updateConfig(ApplicationConfigParser.ConfigParserResult config) { + return this.appConfig.setConfiguration(config); } - boolean configFileExists() { - String filepath = appConfig.getLocalConfigurationFilePath(); + boolean fileExists(String filepath) { return (filepath != null && (new File(filepath).exists())); } @@ -194,7 +208,7 @@ public class RefreshConfigTask { } else if (event == RicConfigUpdate.Type.CHANGED) { Ric ric = this.rics.get(ricName); if (ric == null) { - // Should not happend,just for robustness + // Should not happen,just for robustness addRic(updatedInfo.getRicConfig()); } else { ric.setRicConfig(updatedInfo.getRicConfig()); @@ -220,6 +234,10 @@ public class RefreshConfigTask { */ Flux loadConfigurationFromFile() { String filepath = appConfig.getLocalConfigurationFilePath(); + if (!fileExists(filepath)) { + return Flux.empty(); + } + GsonBuilder gsonBuilder = new GsonBuilder(); ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); @@ -229,8 +247,8 @@ public class RefreshConfigTask { appParser.parse(rootObject); logger.debug("Local configuration file loaded: {}", filepath); return Flux.just(rootObject); - } catch (JsonSyntaxException | ServiceException | IOException e) { - logger.debug("Local configuration file not loaded: {}", filepath, e); + } catch (Exception e) { + logger.error("Local configuration file not loaded: {}, {}", filepath, e.getMessage()); return Flux.empty(); } }