Remove sleep from DmaapMessageConsumer
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / tasks / RefreshConfigTask.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2019 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oransc.policyagent.tasks;
22
23 import com.google.gson.GsonBuilder;
24 import com.google.gson.JsonElement;
25 import com.google.gson.JsonObject;
26 import com.google.gson.JsonParser;
27 import com.google.gson.TypeAdapterFactory;
28
29 import java.io.BufferedInputStream;
30 import java.io.File;
31 import java.io.FileInputStream;
32 import java.io.IOException;
33 import java.io.InputStream;
34 import java.io.InputStreamReader;
35 import java.time.Duration;
36 import java.util.Properties;
37 import java.util.ServiceLoader;
38
39 import javax.validation.constraints.NotNull;
40
41 import lombok.AccessLevel;
42 import lombok.Getter;
43
44 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
45 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
46 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
47 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
48 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
49 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
50 import org.oransc.policyagent.clients.A1ClientFactory;
51 import org.oransc.policyagent.configuration.ApplicationConfig;
52 import org.oransc.policyagent.configuration.ApplicationConfig.RicConfigUpdate;
53 import org.oransc.policyagent.configuration.ApplicationConfigParser;
54 import org.oransc.policyagent.configuration.RicConfig;
55 import org.oransc.policyagent.exceptions.ServiceException;
56 import org.oransc.policyagent.repository.Policies;
57 import org.oransc.policyagent.repository.PolicyTypes;
58 import org.oransc.policyagent.repository.Ric;
59 import org.oransc.policyagent.repository.Rics;
60 import org.oransc.policyagent.repository.Services;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
63 import org.springframework.beans.factory.annotation.Autowired;
64 import org.springframework.beans.factory.annotation.Value;
65 import org.springframework.stereotype.Component;
66
67 import reactor.core.Disposable;
68 import reactor.core.publisher.Flux;
69 import reactor.core.publisher.Mono;
70
71 /**
72  * Regularly refreshes the configuration from Consul or from a local
73  * configuration file.
74  */
75 @Component
76 public class RefreshConfigTask {
77
78     private static final Logger logger = LoggerFactory.getLogger(RefreshConfigTask.class);
79
80     @Value("#{systemEnvironment}")
81     public Properties systemEnvironment;
82
83     /**
84      * The time between refreshes of the configuration.
85      */
86     public static final Duration CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1);
87
88     final ApplicationConfig appConfig;
89     @Getter(AccessLevel.PROTECTED)
90     private Disposable refreshTask = null;
91     private boolean isConsulUsed = false;
92
93     private final Rics rics;
94     private final A1ClientFactory a1ClientFactory;
95     private final Policies policies;
96     private final Services services;
97     private final PolicyTypes policyTypes;
98
99     @Autowired
100     public RefreshConfigTask(ApplicationConfig appConfig, Rics rics, Policies policies, Services services,
101         PolicyTypes policyTypes, A1ClientFactory a1ClientFactory) {
102         this.appConfig = appConfig;
103         this.rics = rics;
104         this.policies = policies;
105         this.services = services;
106         this.policyTypes = policyTypes;
107         this.a1ClientFactory = a1ClientFactory;
108     }
109
110     public void start() {
111         logger.debug("Starting refreshConfigTask");
112         stop();
113         refreshTask = createRefreshTask() //
114             .subscribe(notUsed -> logger.debug("Refreshed configuration data"),
115                 throwable -> logger.error("Configuration refresh terminated due to exception {}", throwable.toString()),
116                 () -> logger.error("Configuration refresh terminated"));
117     }
118
119     public void stop() {
120         if (refreshTask != null) {
121             refreshTask.dispose();
122         }
123     }
124
125     Flux<RicConfigUpdate.Type> createRefreshTask() {
126         Flux<JsonObject> loadFromFile = Flux.interval(Duration.ZERO, CONFIG_REFRESH_INTERVAL) //
127             .filter(notUsed -> !this.isConsulUsed) //
128             .flatMap(notUsed -> loadConfigurationFromFile()) //
129             .onErrorResume(this::ignoreErrorFlux) //
130             .doOnNext(json -> logger.debug("loadFromFile succeeded")) //
131             .doOnTerminate(() -> logger.error("loadFromFile Terminate"));
132
133         Flux<JsonObject> loadFromConsul = Flux.interval(Duration.ZERO, CONFIG_REFRESH_INTERVAL) //
134             .flatMap(i -> getEnvironment(systemEnvironment)) //
135             .flatMap(this::createCbsClient) //
136             .flatMap(this::getFromCbs) //
137             .onErrorResume(this::ignoreErrorMono) //
138             .doOnNext(json -> logger.debug("loadFromConsul succeeded")) //
139             .doOnNext(json -> this.isConsulUsed = true) //
140             .doOnTerminate(() -> logger.error("loadFromConsul Terminated"));
141
142         return Flux.merge(loadFromFile, loadFromConsul) //
143             .flatMap(this::parseConfiguration) //
144             .flatMap(this::updateConfig) //
145             .doOnNext(this::handleUpdatedRicConfig) //
146             .flatMap(configUpdate -> Flux.just(configUpdate.getType())) //
147             .doOnTerminate(() -> logger.error("Configuration refresh task is terminated"));
148     }
149
150     Mono<EnvProperties> getEnvironment(Properties systemEnvironment) {
151         return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment) //
152             .onErrorResume(t -> Mono.empty());
153     }
154
155     Mono<CbsClient> createCbsClient(EnvProperties env) {
156         return CbsClientFactory.createCbsClient(env) //
157             .onErrorResume(this::ignoreErrorMono);
158     }
159
160     private Mono<JsonObject> getFromCbs(CbsClient cbsClient) {
161         final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create());
162         try {
163             return cbsClient.get(getConfigRequest) //
164                 .onErrorResume(this::ignoreErrorMono);
165         } catch (Exception e) {
166             return ignoreErrorMono(e);
167         }
168     }
169
170     private <R> Flux<R> ignoreErrorFlux(Throwable throwable) {
171         String errMsg = throwable.toString();
172         logger.warn("Could not refresh application configuration. {}", errMsg);
173         return Flux.empty();
174     }
175
176     private <R> Mono<R> ignoreErrorMono(Throwable throwable) {
177         String errMsg = throwable.toString();
178         logger.warn("Could not refresh application configuration. {}", errMsg);
179         return Mono.empty();
180     }
181
182     private Mono<ApplicationConfigParser.ConfigParserResult> parseConfiguration(JsonObject jsonObject) {
183         try {
184             ApplicationConfigParser parser = new ApplicationConfigParser();
185             return Mono.just(parser.parse(jsonObject));
186         } catch (Exception e) {
187             String str = e.toString();
188             logger.error("Could not parse configuration {}", str);
189             return Mono.empty();
190         }
191     }
192
193     private Flux<RicConfigUpdate> updateConfig(ApplicationConfigParser.ConfigParserResult config) {
194         return this.appConfig.setConfiguration(config);
195     }
196
197     boolean fileExists(String filepath) {
198         return (filepath != null && (new File(filepath).exists()));
199     }
200
201     private void handleUpdatedRicConfig(RicConfigUpdate updatedInfo) {
202         synchronized (this.rics) {
203             String ricName = updatedInfo.getRicConfig().name();
204             RicConfigUpdate.Type event = updatedInfo.getType();
205             if (event == RicConfigUpdate.Type.ADDED) {
206                 addRic(updatedInfo.getRicConfig());
207             } else if (event == RicConfigUpdate.Type.REMOVED) {
208                 rics.remove(ricName);
209                 this.policies.removePoliciesForRic(ricName);
210             } else if (event == RicConfigUpdate.Type.CHANGED) {
211                 Ric ric = this.rics.get(ricName);
212                 if (ric == null) {
213                     // Should not happen,just for robustness
214                     addRic(updatedInfo.getRicConfig());
215                 } else {
216                     ric.setRicConfig(updatedInfo.getRicConfig());
217                 }
218             }
219         }
220     }
221
222     private void addRic(RicConfig config) {
223         Ric ric = new Ric(config);
224         this.rics.put(ric);
225         runRicSynchronization(ric);
226     }
227
228     void runRicSynchronization(Ric ric) {
229         RicSynchronizationTask synchronizationTask =
230             new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services);
231         synchronizationTask.run(ric);
232     }
233
234     /**
235      * Reads the configuration from file.
236      */
237     Flux<JsonObject> loadConfigurationFromFile() {
238         String filepath = appConfig.getLocalConfigurationFilePath();
239         if (!fileExists(filepath)) {
240             return Flux.empty();
241         }
242
243         GsonBuilder gsonBuilder = new GsonBuilder();
244         ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
245
246         try (InputStream inputStream = createInputStream(filepath)) {
247             JsonObject rootObject = getJsonElement(inputStream).getAsJsonObject();
248             ApplicationConfigParser appParser = new ApplicationConfigParser();
249             appParser.parse(rootObject);
250             logger.debug("Local configuration file loaded: {}", filepath);
251             return Flux.just(rootObject);
252         } catch (IOException | ServiceException e) {
253             logger.error("Local configuration file not loaded: {}, {}", filepath, e.getMessage());
254             return Flux.empty();
255         }
256     }
257
258     JsonElement getJsonElement(InputStream inputStream) {
259         return JsonParser.parseReader(new InputStreamReader(inputStream));
260     }
261
262     InputStream createInputStream(@NotNull String filepath) throws IOException {
263         return new BufferedInputStream(new FileInputStream(filepath));
264     }
265 }