7e5228425c957d124e0c3b44a1a6c3078f60c784
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / configuration / ApplicationConfig.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2019 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oransc.policyagent.configuration;
22
23 import com.google.gson.GsonBuilder;
24 import com.google.gson.JsonElement;
25 import com.google.gson.JsonObject;
26 import com.google.gson.JsonParser;
27 import com.google.gson.JsonSyntaxException;
28 import com.google.gson.TypeAdapterFactory;
29
30 import java.io.BufferedInputStream;
31 import java.io.FileInputStream;
32 import java.io.IOException;
33 import java.io.InputStream;
34 import java.io.InputStreamReader;
35 import java.time.Duration;
36 import java.util.Collection;
37 import java.util.HashMap;
38 import java.util.Map;
39 import java.util.Optional;
40 import java.util.Properties;
41 import java.util.ServiceLoader;
42 import java.util.Vector;
43
44 import javax.validation.constraints.NotEmpty;
45 import javax.validation.constraints.NotNull;
46
47 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
48 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
49 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
50 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
51 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
52 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
53 import org.oransc.policyagent.exceptions.ServiceException;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56 import org.springframework.beans.factory.annotation.Autowired;
57 import org.springframework.beans.factory.annotation.Value;
58 import org.springframework.boot.context.properties.ConfigurationProperties;
59 import org.springframework.boot.context.properties.EnableConfigurationProperties;
60
61 import reactor.core.Disposable;
62 import reactor.core.publisher.Flux;
63 import reactor.core.publisher.Mono;
64
65 @EnableConfigurationProperties
66 @ConfigurationProperties("app")
67 public class ApplicationConfig {
68     private static final Logger logger = LoggerFactory.getLogger(ApplicationConfig.class);
69
70     @Value("#{systemEnvironment}")
71     Properties systemEnvironment;
72
73     private Disposable refreshConfigTask = null;
74     private Collection<Observer> observers = new Vector<>();
75
76     private Map<String, RicConfig> ricConfigs = new HashMap<>();
77
78     @NotEmpty
79     private String filepath;
80
81     @Autowired
82     public ApplicationConfig() {
83     }
84
85     protected String getLocalConfigurationFilePath() {
86         return this.filepath;
87     }
88
89     public synchronized void setFilepath(String filepath) {
90         this.filepath = filepath;
91     }
92
93     public synchronized Collection<RicConfig> getRicConfigs() {
94         return this.ricConfigs.values();
95     }
96
97     public synchronized Optional<RicConfig> lookupRicConfigForManagedElement(String managedElementId) {
98         for (RicConfig ricConfig : getRicConfigs()) {
99             if (ricConfig.managedElementIds().contains(managedElementId)) {
100                 return Optional.of(ricConfig);
101             }
102         }
103         return Optional.empty();
104     }
105
106     public RicConfig getRic(String ricName) throws ServiceException {
107         for (RicConfig ricConfig : getRicConfigs()) {
108             if (ricConfig.name().equals(ricName)) {
109                 return ricConfig;
110             }
111         }
112         throw new ServiceException("Could not find ric: " + ricName);
113     }
114
115     public void initialize() {
116         stop();
117         loadConfigurationFromFile();
118
119         refreshConfigTask = createRefreshTask() //
120             .subscribe(notUsed -> logger.info("Refreshed configuration data"),
121                 throwable -> logger.error("Configuration refresh terminated due to exception", throwable),
122                 () -> logger.error("Configuration refresh terminated"));
123     }
124
125     public static enum RicConfigUpdate {
126         ADDED, CHANGED, REMOVED
127     }
128
129     public interface Observer {
130         void onRicConfigUpdate(RicConfig ric, RicConfigUpdate event);
131     }
132
133     public void addObserver(Observer o) {
134         this.observers.add(o);
135     }
136
137     Mono<EnvProperties> getEnvironment(Properties systemEnvironment) {
138         return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment);
139     }
140
141     Flux<ApplicationConfig> createRefreshTask() {
142         return getEnvironment(systemEnvironment) //
143             .flatMap(this::createCbsClient) //
144             .flatMapMany(this::periodicConfigurationUpdates) //
145             .map(this::parseRicConfigurationfromConsul) //
146             .onErrorResume(this::onErrorResume);
147     }
148
149     Mono<CbsClient> createCbsClient(EnvProperties env) {
150         return CbsClientFactory.createCbsClient(env);
151     }
152
153     private Flux<JsonObject> periodicConfigurationUpdates(CbsClient cbsClient) {
154         final Duration initialDelay = Duration.ZERO;
155         final Duration refreshPeriod = Duration.ofMinutes(1);
156         final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create());
157         return cbsClient.updates(getConfigRequest, initialDelay, refreshPeriod);
158     }
159
160     private <R> Mono<R> onErrorResume(Throwable trowable) {
161         logger.error("Could not refresh application configuration {}", trowable.toString());
162         return Mono.empty();
163     }
164
165     private ApplicationConfig parseRicConfigurationfromConsul(JsonObject jsonObject) {
166         try {
167             ApplicationConfigParser parser = new ApplicationConfigParser();
168             parser.parse(jsonObject);
169             setConfiguration(parser.getRicConfigs());
170
171         } catch (ServiceException e) {
172             logger.error("Could not parse configuration {}", e.toString(), e);
173         }
174         return this;
175     }
176
177     private class Notification {
178         final RicConfig ric;
179         final RicConfigUpdate event;
180
181         Notification(RicConfig ric, RicConfigUpdate event) {
182             this.ric = ric;
183             this.event = event;
184         }
185     }
186
187     private void setConfiguration(@NotNull Collection<RicConfig> ricConfigs) {
188         Collection<Notification> notifications = new Vector<>();
189         synchronized (this) {
190             Map<String, RicConfig> newRicConfigs = new HashMap<>();
191             for (RicConfig newConfig : ricConfigs) {
192                 RicConfig oldConfig = this.ricConfigs.get(newConfig.name());
193                 if (oldConfig == null) {
194                     newRicConfigs.put(newConfig.name(), newConfig);
195                     notifications.add(new Notification(newConfig, RicConfigUpdate.ADDED));
196                     this.ricConfigs.remove(newConfig.name());
197                 } else if (!newConfig.equals(newConfig)) {
198                     notifications.add(new Notification(newConfig, RicConfigUpdate.CHANGED));
199                     newRicConfigs.put(newConfig.name(), newConfig);
200                     this.ricConfigs.remove(newConfig.name());
201                 } else {
202                     newRicConfigs.put(oldConfig.name(), oldConfig);
203                 }
204             }
205             for (RicConfig deletedConfig : this.ricConfigs.values()) {
206                 notifications.add(new Notification(deletedConfig, RicConfigUpdate.REMOVED));
207             }
208             this.ricConfigs = newRicConfigs;
209         }
210         notifyObservers(notifications);
211     }
212
213     private void notifyObservers(Collection<Notification> notifications) {
214         for (Observer observer : this.observers) {
215             for (Notification notif : notifications) {
216                 observer.onRicConfigUpdate(notif.ric, notif.event);
217             }
218         }
219     }
220
221     public void stop() {
222         if (refreshConfigTask != null) {
223             refreshConfigTask.dispose();
224             refreshConfigTask = null;
225         }
226     }
227
228     /**
229      * Reads the configuration from file.
230      */
231     public void loadConfigurationFromFile() {
232         String filepath = getLocalConfigurationFilePath();
233         if (filepath == null) {
234             logger.debug("No localconfiguration file used");
235             return;
236         }
237         GsonBuilder gsonBuilder = new GsonBuilder();
238         ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
239
240         try (InputStream inputStream = createInputStream(filepath)) {
241             JsonParser parser = new JsonParser();
242             JsonObject rootObject = getJsonElement(parser, inputStream).getAsJsonObject();
243             if (rootObject == null) {
244                 throw new JsonSyntaxException("Root is not a json object");
245             }
246             ApplicationConfigParser appParser = new ApplicationConfigParser();
247             appParser.parse(rootObject);
248             setConfiguration(appParser.getRicConfigs());
249             logger.info("Local configuration file loaded: {}", filepath);
250         } catch (JsonSyntaxException | ServiceException | IOException e) {
251             logger.trace("Local configuration file not loaded: {}", filepath, e);
252         }
253     }
254
255     JsonElement getJsonElement(JsonParser parser, InputStream inputStream) {
256         return parser.parse(new InputStreamReader(inputStream));
257     }
258
259     InputStream createInputStream(@NotNull String filepath) throws IOException {
260         return new BufferedInputStream(new FileInputStream(filepath));
261     }
262 }