+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2019 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.policyagent.tasks;
+
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonSyntaxException;
+import com.google.gson.TypeAdapterFactory;
+
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.time.Duration;
+import java.util.Properties;
+import java.util.ServiceLoader;
+
+import javax.validation.constraints.NotNull;
+
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
+import org.oransc.policyagent.configuration.ApplicationConfig;
+import org.oransc.policyagent.configuration.ApplicationConfigParser;
+import org.oransc.policyagent.exceptions.ServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * Regularly refreshes the configuration from Consul.
+ */
+@Component
+public class RefreshConfigTask {
+
+ private static final Logger logger = LoggerFactory.getLogger(RefreshConfigTask.class);
+
+ @Value("#{systemEnvironment}")
+ public Properties systemEnvironment;
+
+ private final ApplicationConfig appConfig;
+ private Disposable refreshTask = null;
+
+ @Autowired
+ public RefreshConfigTask(ApplicationConfig appConfig) {
+ this.appConfig = appConfig;
+ }
+
+ public void start() {
+ logger.debug("Starting refreshConfigTask");
+ stop();
+ loadConfigurationFromFile();
+ refreshTask = createRefreshTask() //
+ .subscribe(notUsed -> logger.info("Refreshed configuration data"),
+ throwable -> logger.error("Configuration refresh terminated due to exception", throwable),
+ () -> logger.error("Configuration refresh terminated"));
+ }
+
+ public void stop() {
+ if (refreshTask != null) {
+ refreshTask.dispose();
+ refreshTask = null;
+ }
+ }
+
+ Flux<ApplicationConfig> createRefreshTask() {
+ return getEnvironment(systemEnvironment) //
+ .flatMap(this::createCbsClient) //
+ .flatMapMany(this::periodicConfigurationUpdates) //
+ .map(this::parseRicConfigurationfromConsul) //
+ .onErrorResume(this::onErrorResume);
+ }
+
+ Mono<EnvProperties> getEnvironment(Properties systemEnvironment) {
+ return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment);
+ }
+
+ Mono<CbsClient> createCbsClient(EnvProperties env) {
+ return CbsClientFactory.createCbsClient(env);
+ }
+
+ private Flux<JsonObject> periodicConfigurationUpdates(CbsClient cbsClient) {
+ final Duration initialDelay = Duration.ZERO;
+ final Duration refreshPeriod = Duration.ofMinutes(1);
+ final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create());
+ return cbsClient.updates(getConfigRequest, initialDelay, refreshPeriod);
+ }
+
+ private <R> Mono<R> onErrorResume(Throwable trowable) {
+ logger.error("Could not refresh application configuration {}", trowable.toString());
+ return Mono.empty();
+ }
+
+ private ApplicationConfig parseRicConfigurationfromConsul(JsonObject jsonObject) {
+ try {
+ ApplicationConfigParser parser = new ApplicationConfigParser();
+ parser.parse(jsonObject);
+ this.appConfig.setConfiguration(parser.getRicConfigs());
+ } catch (ServiceException e) {
+ logger.error("Could not parse configuration {}", e.toString(), e);
+ }
+ return this.appConfig;
+ }
+
+ /**
+ * Reads the configuration from file.
+ */
+ public void loadConfigurationFromFile() {
+ String filepath = appConfig.getLocalConfigurationFilePath();
+ if (filepath == null) {
+ logger.debug("No localconfiguration file used");
+ return;
+ }
+ GsonBuilder gsonBuilder = new GsonBuilder();
+ ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
+
+ try (InputStream inputStream = createInputStream(filepath)) {
+ JsonParser parser = new JsonParser();
+ JsonObject rootObject = getJsonElement(parser, inputStream).getAsJsonObject();
+ if (rootObject == null) {
+ throw new JsonSyntaxException("Root is not a json object");
+ }
+ ApplicationConfigParser appParser = new ApplicationConfigParser();
+ appParser.parse(rootObject);
+ appConfig.setConfiguration(appParser.getRicConfigs());
+ logger.info("Local configuration file loaded: {}", filepath);
+ } catch (JsonSyntaxException | ServiceException | IOException e) {
+ logger.trace("Local configuration file not loaded: {}", filepath, e);
+ }
+ }
+
+ JsonElement getJsonElement(JsonParser parser, InputStream inputStream) {
+ return parser.parse(new InputStreamReader(inputStream));
+ }
+
+ InputStream createInputStream(@NotNull String filepath) throws IOException {
+ return new BufferedInputStream(new FileInputStream(filepath));
+ }
+}