2 * ========================LICENSE_START=================================
5 * Copyright (C) 2019 Nordix Foundation
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ========================LICENSE_END===================================
21 package org.oransc.policyagent.tasks;
23 import com.google.gson.GsonBuilder;
24 import com.google.gson.JsonElement;
25 import com.google.gson.JsonObject;
26 import com.google.gson.JsonParser;
27 import com.google.gson.JsonSyntaxException;
28 import com.google.gson.TypeAdapterFactory;
30 import java.io.BufferedInputStream;
31 import java.io.FileInputStream;
32 import java.io.IOException;
33 import java.io.InputStream;
34 import java.io.InputStreamReader;
35 import java.time.Duration;
36 import java.util.Properties;
37 import java.util.ServiceLoader;
39 import javax.validation.constraints.NotNull;
41 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
42 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
43 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
44 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
45 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
46 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
47 import org.oransc.policyagent.configuration.ApplicationConfig;
48 import org.oransc.policyagent.configuration.ApplicationConfigParser;
49 import org.oransc.policyagent.exceptions.ServiceException;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52 import org.springframework.beans.factory.annotation.Autowired;
53 import org.springframework.beans.factory.annotation.Value;
54 import org.springframework.stereotype.Component;
56 import reactor.core.Disposable;
57 import reactor.core.publisher.Flux;
58 import reactor.core.publisher.Mono;
61 * Regularly refreshes the configuration from Consul.
64 public class RefreshConfigTask {
66 private static final Logger logger = LoggerFactory.getLogger(RefreshConfigTask.class);
68 @Value("#{systemEnvironment}")
69 public Properties systemEnvironment;
71 private final ApplicationConfig appConfig;
72 private Disposable refreshTask = null;
75 public RefreshConfigTask(ApplicationConfig appConfig) {
76 this.appConfig = appConfig;
80 logger.debug("Starting refreshConfigTask");
82 loadConfigurationFromFile();
83 refreshTask = createRefreshTask() //
84 .subscribe(notUsed -> logger.debug("Refreshed configuration data"),
85 throwable -> logger.error("Configuration refresh terminated due to exception", throwable),
86 () -> logger.debug("Configuration refresh completed"));
90 if (refreshTask != null) {
91 refreshTask.dispose();
96 Flux<ApplicationConfig> createRefreshTask() {
97 return getEnvironment(systemEnvironment) //
98 .flatMap(this::createCbsClient) //
99 .flatMapMany(this::periodicConfigurationUpdates) //
100 .map(this::parseRicConfigurationfromConsul) //
101 .onErrorResume(this::onErrorResume);
104 Mono<EnvProperties> getEnvironment(Properties systemEnvironment) {
105 return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment);
108 Mono<CbsClient> createCbsClient(EnvProperties env) {
109 return CbsClientFactory.createCbsClient(env);
112 private Flux<JsonObject> periodicConfigurationUpdates(CbsClient cbsClient) {
113 final Duration initialDelay = Duration.ZERO;
114 final Duration refreshPeriod = Duration.ofMinutes(1);
115 final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create());
116 return cbsClient.updates(getConfigRequest, initialDelay, refreshPeriod);
119 private <R> Mono<R> onErrorResume(Throwable trowable) {
120 logger.error("Could not refresh application configuration {}", trowable.toString());
124 private ApplicationConfig parseRicConfigurationfromConsul(JsonObject jsonObject) {
126 ApplicationConfigParser parser = new ApplicationConfigParser();
127 parser.parse(jsonObject);
128 this.appConfig.setConfiguration(parser.getRicConfigs(), parser.getDmaapPublisherConfig(),
129 parser.getDmaapConsumerConfig());
130 } catch (ServiceException e) {
131 logger.error("Could not parse configuration {}", e.toString(), e);
133 return this.appConfig;
137 * Reads the configuration from file.
139 public void loadConfigurationFromFile() {
140 String filepath = appConfig.getLocalConfigurationFilePath();
141 if (filepath == null) {
142 logger.debug("No localconfiguration file used");
145 GsonBuilder gsonBuilder = new GsonBuilder();
146 ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
148 try (InputStream inputStream = createInputStream(filepath)) {
149 JsonObject rootObject = getJsonElement(inputStream).getAsJsonObject();
150 if (rootObject == null) {
151 throw new JsonSyntaxException("Root is not a json object");
153 ApplicationConfigParser appParser = new ApplicationConfigParser();
154 appParser.parse(rootObject);
155 appConfig.setConfiguration(appParser.getRicConfigs(), appParser.getDmaapPublisherConfig(),
156 appParser.getDmaapConsumerConfig());
157 logger.info("Local configuration file loaded: {}", filepath);
158 } catch (JsonSyntaxException | ServiceException | IOException e) {
159 logger.trace("Local configuration file not loaded: {}", filepath, e);
163 JsonElement getJsonElement(InputStream inputStream) {
164 return JsonParser.parseReader(new InputStreamReader(inputStream));
167 InputStream createInputStream(@NotNull String filepath) throws IOException {
168 return new BufferedInputStream(new FileInputStream(filepath));