X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=policy-agent%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fpolicyagent%2Ftasks%2FRefreshConfigTask.java;h=7cfe4868ee69e277b2ccd5184c003bc0f3c76146;hb=d375c04a9c1917a9bf98285fb1f30062cff7a6d6;hp=81735d7242e33fedf2d33cc56ef13dc0ff22b99d;hpb=cda38f944b12cd0ea8865a6f427a800f28991982;p=nonrtric.git diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java index 81735d72..7cfe4868 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java @@ -39,6 +39,9 @@ import java.util.ServiceLoader; import javax.validation.constraints.NotNull; +import lombok.AccessLevel; +import lombok.Getter; + import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests; @@ -67,7 +70,8 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** - * Regularly refreshes the configuration from Consul. + * Regularly refreshes the configuration from Consul or from a local + * configuration file. */ @Component public class RefreshConfigTask { @@ -78,6 +82,7 @@ public class RefreshConfigTask { public Properties systemEnvironment; final ApplicationConfig appConfig; + @Getter(AccessLevel.PROTECTED) private Disposable refreshTask = null; private boolean isConsulUsed = false; @@ -104,40 +109,40 @@ public class RefreshConfigTask { logger.debug("Starting refreshConfigTask"); stop(); refreshTask = createRefreshTask() // - .subscribe(notUsed -> logger.debug("Refreshed configuration data"), - throwable -> logger.error("Configuration refresh terminated due to exception", throwable), + .subscribe( + notUsed -> logger.debug("Refreshed configuration data"), throwable -> logger + .error("Configuration refresh terminated due to exception {}", throwable.getMessage()), () -> logger.error("Configuration refresh terminated")); } public void stop() { if (refreshTask != null) { refreshTask.dispose(); - refreshTask = null; } } - Flux createRefreshTask() { + Flux createRefreshTask() { Flux loadFromFile = Flux.interval(Duration.ZERO, FILE_CONFIG_REFRESH_INTERVAL) // .filter(notUsed -> configFileExists()) // .filter(notUsed -> !this.isConsulUsed) // .flatMap(notUsed -> loadConfigurationFromFile()) // .onErrorResume(this::ignoreError) // - .doOnNext(json -> logger.debug("loadFromFile")) // + .doOnNext(json -> logger.debug("loadFromFile succeeded")) // .doOnTerminate(() -> logger.error("loadFromFile Terminate")); Flux loadFromConsul = getEnvironment(systemEnvironment) // .flatMap(this::createCbsClient) // .flatMapMany(this::periodicConfigurationUpdates) // .onErrorResume(this::ignoreError) // - .doOnNext(json -> logger.debug("loadFromConsul")) // + .doOnNext(json -> logger.debug("loadFromConsul succeeded")) // .doOnNext(json -> this.isConsulUsed = true) // - .doOnTerminate(() -> logger.error("loadFromConsul Terminate")); + .doOnTerminate(() -> logger.error("loadFromConsul Terminated")); return Flux.merge(loadFromFile, loadFromConsul) // .flatMap(this::parseConfiguration) // .flatMap(this::updateConfig) // .doOnNext(this::handleUpdatedRicConfig) // - .flatMap(configUpdate -> Flux.just(this.appConfig)) // + .flatMap(configUpdate -> Flux.just(configUpdate.getType())) // .doOnTerminate(() -> logger.error("Configuration refresh task is terminated")); } @@ -161,20 +166,18 @@ public class RefreshConfigTask { return Mono.empty(); } - private Mono parseConfiguration(JsonObject jsonObject) { + private Mono parseConfiguration(JsonObject jsonObject) { try { ApplicationConfigParser parser = new ApplicationConfigParser(); - parser.parse(jsonObject); - return Mono.just(parser); + return Mono.just(parser.parse(jsonObject)); } catch (ServiceException e) { logger.error("Could not parse configuration {}", e.toString(), e); return Mono.error(e); } } - private Flux updateConfig(ApplicationConfigParser config) { - return this.appConfig.setConfiguration(config.getRicConfigs(), config.getDmaapPublisherConfig(), - config.getDmaapConsumerConfig()); + private Flux updateConfig(ApplicationConfigParser.ConfigParserResult config) { + return this.appConfig.setConfiguration(config); } boolean configFileExists() { @@ -194,7 +197,7 @@ public class RefreshConfigTask { } else if (event == RicConfigUpdate.Type.CHANGED) { Ric ric = this.rics.get(ricName); if (ric == null) { - // Should not happend,just for robustness + // Should not happen,just for robustness addRic(updatedInfo.getRicConfig()); } else { ric.setRicConfig(updatedInfo.getRicConfig());