2 * ========================LICENSE_START=================================
5 * Copyright (C) 2019 Nordix Foundation
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ========================LICENSE_END===================================
21 package org.oransc.policyagent.tasks;
23 import com.google.gson.GsonBuilder;
24 import com.google.gson.JsonElement;
25 import com.google.gson.JsonObject;
26 import com.google.gson.JsonParser;
27 import com.google.gson.JsonSyntaxException;
28 import com.google.gson.TypeAdapterFactory;
30 import java.io.BufferedInputStream;
31 import java.io.FileInputStream;
32 import java.io.IOException;
33 import java.io.InputStream;
34 import java.io.InputStreamReader;
35 import java.time.Duration;
36 import java.util.Properties;
37 import java.util.ServiceLoader;
39 import javax.validation.constraints.NotNull;
41 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
42 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
43 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
44 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
45 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
46 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
47 import org.oransc.policyagent.configuration.ApplicationConfig;
48 import org.oransc.policyagent.configuration.ApplicationConfigParser;
49 import org.oransc.policyagent.exceptions.ServiceException;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52 import org.springframework.beans.factory.annotation.Autowired;
53 import org.springframework.beans.factory.annotation.Value;
54 import org.springframework.stereotype.Component;
56 import reactor.core.Disposable;
57 import reactor.core.publisher.Flux;
58 import reactor.core.publisher.Mono;
61 * Regularly refreshes the configuration from Consul.
64 public class RefreshConfigTask {
66 private static final Logger logger = LoggerFactory.getLogger(RefreshConfigTask.class);
68 @Value("#{systemEnvironment}")
69 public Properties systemEnvironment;
71 private final ApplicationConfig appConfig;
72 private Disposable refreshTask = null;
75 public RefreshConfigTask(ApplicationConfig appConfig) {
76 this.appConfig = appConfig;
80 logger.debug("Starting refreshConfigTask");
82 loadConfigurationFromFile();
83 refreshTask = createRefreshTask() //
84 .subscribe(notUsed -> logger.debug("Refreshed configuration data"),
85 throwable -> logger.error("Configuration refresh terminated due to exception", throwable),
86 () -> logger.debug("Configuration refresh completed"));
90 if (refreshTask != null) {
91 refreshTask.dispose();
96 Flux<ApplicationConfig> createRefreshTask() {
97 return getEnvironment(systemEnvironment) //
98 .flatMap(this::createCbsClient) //
99 .flatMapMany(this::periodicConfigurationUpdates) //
100 .map(this::parseRicConfigurationfromConsul) //
101 .onErrorResume(this::onErrorResume);
104 Mono<EnvProperties> getEnvironment(Properties systemEnvironment) {
105 return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment);
108 Mono<CbsClient> createCbsClient(EnvProperties env) {
109 return CbsClientFactory.createCbsClient(env);
112 private Flux<JsonObject> periodicConfigurationUpdates(CbsClient cbsClient) {
113 final Duration initialDelay = Duration.ZERO;
114 final Duration refreshPeriod = Duration.ofMinutes(1);
115 final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create());
116 return cbsClient.updates(getConfigRequest, initialDelay, refreshPeriod);
119 private <R> Mono<R> onErrorResume(Throwable throwable) {
120 String errMsg = throwable.toString();
121 logger.error("Could not refresh application configuration. {}", errMsg);
125 private ApplicationConfig parseRicConfigurationfromConsul(JsonObject jsonObject) {
127 ApplicationConfigParser parser = new ApplicationConfigParser();
128 parser.parse(jsonObject);
129 this.appConfig.setConfiguration(parser.getRicConfigs(), parser.getDmaapPublisherConfig(),
130 parser.getDmaapConsumerConfig());
131 } catch (ServiceException e) {
132 logger.error("Could not parse configuration {}", e.toString(), e);
134 return this.appConfig;
138 * Reads the configuration from file.
140 void loadConfigurationFromFile() {
141 String filepath = appConfig.getLocalConfigurationFilePath();
142 if (filepath == null) {
143 logger.debug("No localconfiguration file used");
146 GsonBuilder gsonBuilder = new GsonBuilder();
147 ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
149 try (InputStream inputStream = createInputStream(filepath)) {
150 JsonObject rootObject = getJsonElement(inputStream).getAsJsonObject();
151 ApplicationConfigParser appParser = new ApplicationConfigParser();
152 appParser.parse(rootObject);
153 appConfig.setConfiguration(appParser.getRicConfigs(), appParser.getDmaapPublisherConfig(),
154 appParser.getDmaapConsumerConfig());
155 logger.info("Local configuration file loaded: {}", filepath);
156 } catch (JsonSyntaxException | ServiceException | IOException e) {
157 logger.trace("Local configuration file not loaded: {}", filepath, e);
161 JsonElement getJsonElement(InputStream inputStream) {
162 return JsonParser.parseReader(new InputStreamReader(inputStream));
165 InputStream createInputStream(@NotNull String filepath) throws IOException {
166 return new BufferedInputStream(new FileInputStream(filepath));