X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=policy-agent%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fpolicyagent%2Fconfiguration%2FApplicationConfig.java;h=e41f55eb1a7d6a2ad16b598e1e957c69b798b8aa;hb=bbc4aa48f60dc70154ff07c2143918053f1c9154;hp=d8da5ee89af883713cbce391a4e4907d6d1f4d82;hpb=3ed724aaef96d30347f9a60ca111627b28133721;p=nonrtric.git diff --git a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java index d8da5ee8..e41f55eb 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java @@ -32,6 +32,7 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.time.Duration; import java.util.Optional; import java.util.Properties; import java.util.ServiceLoader; @@ -40,6 +41,12 @@ import java.util.Vector; import javax.validation.constraints.NotEmpty; import javax.validation.constraints.NotNull; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; import org.oransc.policyagent.exceptions.ServiceException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +54,9 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; + +import reactor.core.Disposable; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @EnableConfigurationProperties @@ -57,6 +67,8 @@ public class ApplicationConfig { @Value("#{systemEnvironment}") Properties systemEnvironment; + private Disposable refreshConfigTask = null; + @NotEmpty private String filepath; @@ -93,11 +105,64 @@ public class ApplicationConfig { } public void initialize() { + stop(); loadConfigurationFromFile(this.filepath); + + refreshConfigTask = createRefreshTask() // + .subscribe(e -> logger.info("Refreshed configuration data"), + throwable -> logger.error("Configuration refresh terminated due to exception", throwable), + () -> logger.error("Configuration refresh terminated")); + } + + Mono getEnvironment(Properties systemEnvironment) { + return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment); + } + + Flux createRefreshTask() { + return getEnvironment(systemEnvironment) // + .flatMap(this::createCbsClient) // + .flatMapMany(this::periodicConfigurationUpdates) // + .map(this::parseRicConfigurationfromConsul) // + .onErrorResume(this::onErrorResume); + } + + Mono createCbsClient(EnvProperties env) { + return CbsClientFactory.createCbsClient(env); + } + + private Flux periodicConfigurationUpdates(CbsClient cbsClient) { + final Duration initialDelay = Duration.ZERO; + final Duration refreshPeriod = Duration.ofMinutes(1); + final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create()); + return cbsClient.updates(getConfigRequest, initialDelay, refreshPeriod); } - Mono getEnvironment(Properties systemEnvironment) { - return Environment.readEnvironmentVariables(systemEnvironment); + private Mono onErrorResume(Throwable trowable) { + logger.error("Could not refresh application configuration {}", trowable.toString()); + return Mono.empty(); + } + + private ApplicationConfig parseRicConfigurationfromConsul(JsonObject jsonObject) { + try { + ApplicationConfigParser parser = new ApplicationConfigParser(); + parser.parse(jsonObject); + setConfiguration(parser.getRicConfigs()); + + } catch (ServiceException e) { + logger.error("Could not parse configuration {}", e.toString(), e); + } + return this; + } + + private synchronized void setConfiguration(@NotNull Vector ricConfigs) { + this.ricConfigs = ricConfigs; + } + + public void stop() { + if (refreshConfigTask != null) { + refreshConfigTask.dispose(); + refreshConfigTask = null; + } } /** @@ -129,5 +194,4 @@ public class ApplicationConfig { InputStream createInputStream(@NotNull String filepath) throws IOException { return new BufferedInputStream(new FileInputStream(filepath)); } - }