Simplified startup
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / tasks / RefreshConfigTask.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2019 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oransc.policyagent.tasks;
22
23 import com.google.gson.GsonBuilder;
24 import com.google.gson.JsonElement;
25 import com.google.gson.JsonObject;
26 import com.google.gson.JsonParser;
27 import com.google.gson.JsonSyntaxException;
28 import com.google.gson.TypeAdapterFactory;
29
30 import java.io.BufferedInputStream;
31 import java.io.File;
32 import java.io.FileInputStream;
33 import java.io.IOException;
34 import java.io.InputStream;
35 import java.io.InputStreamReader;
36 import java.time.Duration;
37 import java.util.Properties;
38 import java.util.ServiceLoader;
39
40 import javax.validation.constraints.NotNull;
41
42 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
43 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
44 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
45 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
46 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
47 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
48 import org.oransc.policyagent.clients.A1ClientFactory;
49 import org.oransc.policyagent.configuration.ApplicationConfig;
50 import org.oransc.policyagent.configuration.ApplicationConfig.RicConfigUpdate;
51 import org.oransc.policyagent.configuration.ApplicationConfigParser;
52 import org.oransc.policyagent.configuration.RicConfig;
53 import org.oransc.policyagent.exceptions.ServiceException;
54 import org.oransc.policyagent.repository.Policies;
55 import org.oransc.policyagent.repository.PolicyTypes;
56 import org.oransc.policyagent.repository.Ric;
57 import org.oransc.policyagent.repository.Rics;
58 import org.oransc.policyagent.repository.Services;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
61 import org.springframework.beans.factory.annotation.Autowired;
62 import org.springframework.beans.factory.annotation.Value;
63 import org.springframework.stereotype.Component;
64
65 import reactor.core.Disposable;
66 import reactor.core.publisher.Flux;
67 import reactor.core.publisher.Mono;
68
69 /**
70  * Regularly refreshes the configuration from Consul.
71  */
72 @Component
73 public class RefreshConfigTask {
74
75     private static final Logger logger = LoggerFactory.getLogger(RefreshConfigTask.class);
76
77     @Value("#{systemEnvironment}")
78     public Properties systemEnvironment;
79
80     final ApplicationConfig appConfig;
81     private Disposable refreshTask = null;
82     private boolean isConsulUsed = false;
83
84     private final Rics rics;
85     private final A1ClientFactory a1ClientFactory;
86     private final Policies policies;
87     private final Services services;
88     private final PolicyTypes policyTypes;
89     private static final Duration FILE_CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1);
90     private static final Duration CONSUL_CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1);
91
92     @Autowired
93     public RefreshConfigTask(ApplicationConfig appConfig, Rics rics, Policies policies, Services services,
94         PolicyTypes policyTypes, A1ClientFactory a1ClientFactory) {
95         this.appConfig = appConfig;
96         this.rics = rics;
97         this.policies = policies;
98         this.services = services;
99         this.policyTypes = policyTypes;
100         this.a1ClientFactory = a1ClientFactory;
101     }
102
103     public void start() {
104         logger.debug("Starting refreshConfigTask");
105         stop();
106         refreshTask = createRefreshTask() //
107             .subscribe(notUsed -> logger.debug("Refreshed configuration data"),
108                 throwable -> logger.error("Configuration refresh terminated due to exception", throwable),
109                 () -> logger.error("Configuration refresh terminated"));
110     }
111
112     public void stop() {
113         if (refreshTask != null) {
114             refreshTask.dispose();
115             refreshTask = null;
116         }
117     }
118
119     Flux<ApplicationConfig> createRefreshTask() {
120         Flux<JsonObject> loadFromFile = Flux.interval(Duration.ZERO, FILE_CONFIG_REFRESH_INTERVAL) //
121             .filter(notUsed -> configFileExists()) //
122             .filter(notUsed -> !this.isConsulUsed) //
123             .flatMap(notUsed -> loadConfigurationFromFile()) //
124             .onErrorResume(this::ignoreError) //
125             .doOnNext(json -> logger.debug("loadFromFile")) //
126             .doOnTerminate(() -> logger.error("loadFromFile Terminate"));
127
128         Flux<JsonObject> loadFromConsul = getEnvironment(systemEnvironment) //
129             .flatMap(this::createCbsClient) //
130             .flatMapMany(this::periodicConfigurationUpdates) //
131             .onErrorResume(this::ignoreError) //
132             .doOnNext(json -> logger.debug("loadFromConsul")) //
133             .doOnNext(json -> this.isConsulUsed = true) //
134             .doOnTerminate(() -> logger.error("loadFromConsul Terminate"));
135
136         return Flux.merge(loadFromFile, loadFromConsul) //
137             .flatMap(this::parseConfiguration) //
138             .flatMap(this::updateConfig) //
139             .doOnNext(this::handleUpdatedRicConfig) //
140             .flatMap(configUpdate -> Flux.just(this.appConfig)) //
141             .doOnTerminate(() -> logger.error("Configuration refresh task is terminated"));
142     }
143
144     Mono<EnvProperties> getEnvironment(Properties systemEnvironment) {
145         return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment);
146     }
147
148     Mono<CbsClient> createCbsClient(EnvProperties env) {
149         return CbsClientFactory.createCbsClient(env);
150     }
151
152     private Flux<JsonObject> periodicConfigurationUpdates(CbsClient cbsClient) {
153         final Duration initialDelay = Duration.ZERO;
154         final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create());
155         return cbsClient.updates(getConfigRequest, initialDelay, CONSUL_CONFIG_REFRESH_INTERVAL);
156     }
157
158     private <R> Mono<R> ignoreError(Throwable throwable) {
159         String errMsg = throwable.toString();
160         logger.warn("Could not refresh application configuration. {}", errMsg);
161         return Mono.empty();
162     }
163
164     private Mono<ApplicationConfigParser> parseConfiguration(JsonObject jsonObject) {
165         try {
166             ApplicationConfigParser parser = new ApplicationConfigParser();
167             parser.parse(jsonObject);
168             return Mono.just(parser);
169         } catch (ServiceException e) {
170             logger.error("Could not parse configuration {}", e.toString(), e);
171             return Mono.error(e);
172         }
173     }
174
175     private Flux<RicConfigUpdate> updateConfig(ApplicationConfigParser config) {
176         return this.appConfig.setConfiguration(config.getRicConfigs(), config.getDmaapPublisherConfig(),
177             config.getDmaapConsumerConfig());
178     }
179
180     boolean configFileExists() {
181         String filepath = appConfig.getLocalConfigurationFilePath();
182         return (filepath != null && (new File(filepath).exists()));
183     }
184
185     private void handleUpdatedRicConfig(RicConfigUpdate updatedInfo) {
186         synchronized (this.rics) {
187             String ricName = updatedInfo.getRicConfig().name();
188             RicConfigUpdate.Type event = updatedInfo.getType();
189             if (event == RicConfigUpdate.Type.ADDED) {
190                 addRic(updatedInfo.getRicConfig());
191             } else if (event == RicConfigUpdate.Type.REMOVED) {
192                 rics.remove(ricName);
193                 this.policies.removePoliciesForRic(ricName);
194             } else if (event == RicConfigUpdate.Type.CHANGED) {
195                 Ric ric = this.rics.get(ricName);
196                 if (ric == null) {
197                     // Should not happend,just for robustness
198                     addRic(updatedInfo.getRicConfig());
199                 } else {
200                     ric.setRicConfig(updatedInfo.getRicConfig());
201                 }
202             }
203         }
204     }
205
206     private void addRic(RicConfig config) {
207         Ric ric = new Ric(config);
208         this.rics.put(ric);
209         runRicSynchronization(ric);
210     }
211
212     void runRicSynchronization(Ric ric) {
213         RicSynchronizationTask synchronizationTask =
214             new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services);
215         synchronizationTask.run(ric);
216     }
217
218     /**
219      * Reads the configuration from file.
220      */
221     Flux<JsonObject> loadConfigurationFromFile() {
222         String filepath = appConfig.getLocalConfigurationFilePath();
223         GsonBuilder gsonBuilder = new GsonBuilder();
224         ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
225
226         try (InputStream inputStream = createInputStream(filepath)) {
227             JsonObject rootObject = getJsonElement(inputStream).getAsJsonObject();
228             ApplicationConfigParser appParser = new ApplicationConfigParser();
229             appParser.parse(rootObject);
230             logger.debug("Local configuration file loaded: {}", filepath);
231             return Flux.just(rootObject);
232         } catch (JsonSyntaxException | ServiceException | IOException e) {
233             logger.debug("Local configuration file not loaded: {}", filepath, e);
234             return Flux.empty();
235         }
236     }
237
238     JsonElement getJsonElement(InputStream inputStream) {
239         return JsonParser.parseReader(new InputStreamReader(inputStream));
240     }
241
242     InputStream createInputStream(@NotNull String filepath) throws IOException {
243         return new BufferedInputStream(new FileInputStream(filepath));
244     }
245 }