X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=policy-agent%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fpolicyagent%2Ftasks%2FRefreshConfigTask.java;h=de4a771c642deadb17321307e494ef8df9cf6288;hb=5606c08e8a5cc813a6a6eb09bacce123137e43e2;hp=1ab5fc9c5b46c842a82dbdf0751ed22cd77d0499;hpb=f3461cb776023b950d62edd25eca148b6d354c9c;p=nonrtric.git diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java index 1ab5fc9c..de4a771c 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java @@ -28,6 +28,7 @@ import com.google.gson.JsonSyntaxException; import com.google.gson.TypeAdapterFactory; import java.io.BufferedInputStream; +import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; @@ -38,15 +39,26 @@ import java.util.ServiceLoader; import javax.validation.constraints.NotNull; +import lombok.AccessLevel; +import lombok.Getter; + import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; +import org.oransc.policyagent.clients.A1ClientFactory; import org.oransc.policyagent.configuration.ApplicationConfig; +import org.oransc.policyagent.configuration.ApplicationConfig.RicConfigUpdate; import org.oransc.policyagent.configuration.ApplicationConfigParser; +import org.oransc.policyagent.configuration.RicConfig; import org.oransc.policyagent.exceptions.ServiceException; +import org.oransc.policyagent.repository.Policies; +import org.oransc.policyagent.repository.PolicyTypes; +import org.oransc.policyagent.repository.Ric; +import org.oransc.policyagent.repository.Rics; +import org.oransc.policyagent.repository.Services; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -68,20 +80,35 @@ public class RefreshConfigTask { @Value("#{systemEnvironment}") public Properties systemEnvironment; - private final ApplicationConfig appConfig; + final ApplicationConfig appConfig; + @Getter(AccessLevel.PROTECTED) private Disposable refreshTask = null; + private boolean isConsulUsed = false; + + private final Rics rics; + private final A1ClientFactory a1ClientFactory; + private final Policies policies; + private final Services services; + private final PolicyTypes policyTypes; + private static final Duration FILE_CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1); + private static final Duration CONSUL_CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1); @Autowired - public RefreshConfigTask(ApplicationConfig appConfig) { + public RefreshConfigTask(ApplicationConfig appConfig, Rics rics, Policies policies, Services services, + PolicyTypes policyTypes, A1ClientFactory a1ClientFactory) { this.appConfig = appConfig; + this.rics = rics; + this.policies = policies; + this.services = services; + this.policyTypes = policyTypes; + this.a1ClientFactory = a1ClientFactory; } public void start() { logger.debug("Starting refreshConfigTask"); stop(); - loadConfigurationFromFile(); refreshTask = createRefreshTask() // - .subscribe(notUsed -> logger.info("Refreshed configuration data"), + .subscribe(notUsed -> logger.debug("Refreshed configuration data"), throwable -> logger.error("Configuration refresh terminated due to exception", throwable), () -> logger.error("Configuration refresh terminated")); } @@ -89,16 +116,32 @@ public class RefreshConfigTask { public void stop() { if (refreshTask != null) { refreshTask.dispose(); - refreshTask = null; } } - Flux createRefreshTask() { - return getEnvironment(systemEnvironment) // + Flux createRefreshTask() { + Flux loadFromFile = Flux.interval(Duration.ZERO, FILE_CONFIG_REFRESH_INTERVAL) // + .filter(notUsed -> configFileExists()) // + .filter(notUsed -> !this.isConsulUsed) // + .flatMap(notUsed -> loadConfigurationFromFile()) // + .onErrorResume(this::ignoreError) // + .doOnNext(json -> logger.debug("loadFromFile")) // + .doOnTerminate(() -> logger.error("loadFromFile Terminate")); + + Flux loadFromConsul = getEnvironment(systemEnvironment) // .flatMap(this::createCbsClient) // .flatMapMany(this::periodicConfigurationUpdates) // - .map(this::parseRicConfigurationfromConsul) // - .onErrorResume(this::onErrorResume); + .onErrorResume(this::ignoreError) // + .doOnNext(json -> logger.debug("loadFromConsul")) // + .doOnNext(json -> this.isConsulUsed = true) // + .doOnTerminate(() -> logger.error("loadFromConsul Terminate")); + + return Flux.merge(loadFromFile, loadFromConsul) // + .flatMap(this::parseConfiguration) // + .flatMap(this::updateConfig) // + .doOnNext(this::handleUpdatedRicConfig) // + .flatMap(configUpdate -> Flux.just(configUpdate.getType())) // + .doOnTerminate(() -> logger.error("Configuration refresh task is terminated")); } Mono getEnvironment(Properties systemEnvironment) { @@ -111,58 +154,92 @@ public class RefreshConfigTask { private Flux periodicConfigurationUpdates(CbsClient cbsClient) { final Duration initialDelay = Duration.ZERO; - final Duration refreshPeriod = Duration.ofMinutes(1); final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create()); - return cbsClient.updates(getConfigRequest, initialDelay, refreshPeriod); + return cbsClient.updates(getConfigRequest, initialDelay, CONSUL_CONFIG_REFRESH_INTERVAL); } - private Mono onErrorResume(Throwable trowable) { - logger.error("Could not refresh application configuration {}", trowable.toString()); + private Mono ignoreError(Throwable throwable) { + String errMsg = throwable.toString(); + logger.warn("Could not refresh application configuration. {}", errMsg); return Mono.empty(); } - private ApplicationConfig parseRicConfigurationfromConsul(JsonObject jsonObject) { + private Mono parseConfiguration(JsonObject jsonObject) { try { ApplicationConfigParser parser = new ApplicationConfigParser(); parser.parse(jsonObject); - this.appConfig.setConfiguration(parser.getRicConfigs(), parser.getDmaapPublisherConfig(), - parser.getDmaapConsumerConfig()); + return Mono.just(parser); } catch (ServiceException e) { logger.error("Could not parse configuration {}", e.toString(), e); + return Mono.error(e); + } + } + + private Flux updateConfig(ApplicationConfigParser config) { + return this.appConfig.setConfiguration(config.getRicConfigs(), config.getDmaapPublisherConfig(), + config.getDmaapConsumerConfig()); + } + + boolean configFileExists() { + String filepath = appConfig.getLocalConfigurationFilePath(); + return (filepath != null && (new File(filepath).exists())); + } + + private void handleUpdatedRicConfig(RicConfigUpdate updatedInfo) { + synchronized (this.rics) { + String ricName = updatedInfo.getRicConfig().name(); + RicConfigUpdate.Type event = updatedInfo.getType(); + if (event == RicConfigUpdate.Type.ADDED) { + addRic(updatedInfo.getRicConfig()); + } else if (event == RicConfigUpdate.Type.REMOVED) { + rics.remove(ricName); + this.policies.removePoliciesForRic(ricName); + } else if (event == RicConfigUpdate.Type.CHANGED) { + Ric ric = this.rics.get(ricName); + if (ric == null) { + // Should not happen,just for robustness + addRic(updatedInfo.getRicConfig()); + } else { + ric.setRicConfig(updatedInfo.getRicConfig()); + } + } } - return this.appConfig; + } + + private void addRic(RicConfig config) { + Ric ric = new Ric(config); + this.rics.put(ric); + runRicSynchronization(ric); + } + + void runRicSynchronization(Ric ric) { + RicSynchronizationTask synchronizationTask = + new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services); + synchronizationTask.run(ric); } /** * Reads the configuration from file. */ - public void loadConfigurationFromFile() { + Flux loadConfigurationFromFile() { String filepath = appConfig.getLocalConfigurationFilePath(); - if (filepath == null) { - logger.debug("No localconfiguration file used"); - return; - } GsonBuilder gsonBuilder = new GsonBuilder(); ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); try (InputStream inputStream = createInputStream(filepath)) { - JsonParser parser = new JsonParser(); - JsonObject rootObject = getJsonElement(parser, inputStream).getAsJsonObject(); - if (rootObject == null) { - throw new JsonSyntaxException("Root is not a json object"); - } + JsonObject rootObject = getJsonElement(inputStream).getAsJsonObject(); ApplicationConfigParser appParser = new ApplicationConfigParser(); appParser.parse(rootObject); - appConfig.setConfiguration(appParser.getRicConfigs(), appParser.getDmaapPublisherConfig(), - appParser.getDmaapConsumerConfig()); - logger.info("Local configuration file loaded: {}", filepath); + logger.debug("Local configuration file loaded: {}", filepath); + return Flux.just(rootObject); } catch (JsonSyntaxException | ServiceException | IOException e) { - logger.trace("Local configuration file not loaded: {}", filepath, e); + logger.debug("Local configuration file not loaded: {}", filepath, e); + return Flux.empty(); } } - JsonElement getJsonElement(JsonParser parser, InputStream inputStream) { - return parser.parse(new InputStreamReader(inputStream)); + JsonElement getJsonElement(InputStream inputStream) { + return JsonParser.parseReader(new InputStreamReader(inputStream)); } InputStream createInputStream(@NotNull String filepath) throws IOException {