/*- * ========================LICENSE_START================================= * O-RAN-SC * %% * Copyright (C) 2019 Nordix Foundation * %% * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ========================LICENSE_END=================================== */ package org.oransc.policyagent.tasks; import com.google.gson.GsonBuilder; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.google.gson.JsonSyntaxException; import com.google.gson.TypeAdapterFactory; import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.time.Duration; import java.util.Properties; import java.util.ServiceLoader; import javax.validation.constraints.NotNull; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; import org.oransc.policyagent.configuration.ApplicationConfig; import org.oransc.policyagent.configuration.ApplicationConfigParser; import org.oransc.policyagent.exceptions.ServiceException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** * Regularly refreshes the configuration from Consul. */ @Component public class RefreshConfigTask { private static final Logger logger = LoggerFactory.getLogger(RefreshConfigTask.class); @Value("#{systemEnvironment}") public Properties systemEnvironment; private final ApplicationConfig appConfig; private Disposable refreshTask = null; @Autowired public RefreshConfigTask(ApplicationConfig appConfig) { this.appConfig = appConfig; } public void start() { logger.debug("Starting refreshConfigTask"); stop(); loadConfigurationFromFile(); refreshTask = createRefreshTask() // .subscribe(notUsed -> logger.info("Refreshed configuration data"), throwable -> logger.error("Configuration refresh terminated due to exception", throwable), () -> logger.error("Configuration refresh terminated")); } public void stop() { if (refreshTask != null) { refreshTask.dispose(); refreshTask = null; } } Flux createRefreshTask() { return getEnvironment(systemEnvironment) // .flatMap(this::createCbsClient) // .flatMapMany(this::periodicConfigurationUpdates) // .map(this::parseRicConfigurationfromConsul) // .onErrorResume(this::onErrorResume); } Mono getEnvironment(Properties systemEnvironment) { return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment); } Mono createCbsClient(EnvProperties env) { return CbsClientFactory.createCbsClient(env); } private Flux periodicConfigurationUpdates(CbsClient cbsClient) { final Duration initialDelay = Duration.ZERO; final Duration refreshPeriod = Duration.ofMinutes(1); final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create()); return cbsClient.updates(getConfigRequest, initialDelay, refreshPeriod); } private Mono onErrorResume(Throwable trowable) { logger.error("Could not refresh application configuration {}", trowable.toString()); return Mono.empty(); } private ApplicationConfig parseRicConfigurationfromConsul(JsonObject jsonObject) { try { ApplicationConfigParser parser = new ApplicationConfigParser(); parser.parse(jsonObject); this.appConfig.setConfiguration(parser.getRicConfigs(), parser.getDmaapPublisherConfig(), parser.getDmaapConsumerConfig()); } catch (ServiceException e) { logger.error("Could not parse configuration {}", e.toString(), e); } return this.appConfig; } /** * Reads the configuration from file. */ public void loadConfigurationFromFile() { String filepath = appConfig.getLocalConfigurationFilePath(); if (filepath == null) { logger.debug("No localconfiguration file used"); return; } GsonBuilder gsonBuilder = new GsonBuilder(); ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); try (InputStream inputStream = createInputStream(filepath)) { JsonObject rootObject = getJsonElement(inputStream).getAsJsonObject(); if (rootObject == null) { throw new JsonSyntaxException("Root is not a json object"); } ApplicationConfigParser appParser = new ApplicationConfigParser(); appParser.parse(rootObject); appConfig.setConfiguration(appParser.getRicConfigs(), appParser.getDmaapPublisherConfig(), appParser.getDmaapConsumerConfig()); logger.info("Local configuration file loaded: {}", filepath); } catch (JsonSyntaxException | ServiceException | IOException e) { logger.trace("Local configuration file not loaded: {}", filepath, e); } } JsonElement getJsonElement(InputStream inputStream) { return JsonParser.parseReader(new InputStreamReader(inputStream)); } InputStream createInputStream(@NotNull String filepath) throws IOException { return new BufferedInputStream(new FileInputStream(filepath)); } }