import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import java.util.ServiceLoader;
+import java.util.Vector;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import org.oransc.policyagent.exceptions.ServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.stereotype.Component;
+
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-@Component
@EnableConfigurationProperties
@ConfigurationProperties("app")
public class ApplicationConfig {
@Value("#{systemEnvironment}")
Properties systemEnvironment;
+ private Disposable refreshConfigTask = null;
+ private Collection<Observer> observers = new Vector<>();
+
+ private Map<String, RicConfig> ricConfigs = new HashMap<>();
+
@NotEmpty
private String filepath;
public ApplicationConfig() {
}
+ protected String getLocalConfigurationFilePath() {
+ return this.filepath;
+ }
+
public synchronized void setFilepath(String filepath) {
this.filepath = filepath;
}
- /**
- * Reads the cloud configuration.
- */
+ public synchronized Collection<RicConfig> getRicConfigs() {
+ return this.ricConfigs.values();
+ }
+
+ public synchronized Optional<RicConfig> lookupRicConfigForManagedElement(String managedElementId) {
+ for (RicConfig ricConfig : getRicConfigs()) {
+ if (ricConfig.managedElementIds().contains(managedElementId)) {
+ return Optional.of(ricConfig);
+ }
+ }
+ return Optional.empty();
+ }
+
+ public RicConfig getRic(String ricName) throws ServiceException {
+ for (RicConfig ricConfig : getRicConfigs()) {
+ if (ricConfig.name().equals(ricName)) {
+ return ricConfig;
+ }
+ }
+ throw new ServiceException("Could not find ric: " + ricName);
+ }
+
public void initialize() {
+ stop();
loadConfigurationFromFile();
+
+ refreshConfigTask = createRefreshTask() //
+ .subscribe(notUsed -> logger.info("Refreshed configuration data"),
+ throwable -> logger.error("Configuration refresh terminated due to exception", throwable),
+ () -> logger.error("Configuration refresh terminated"));
+ }
+
+ public static enum RicConfigUpdate {
+ ADDED, CHANGED, REMOVED
+ }
+
+ public interface Observer {
+ void onRicConfigUpdate(RicConfig ric, RicConfigUpdate event);
+ }
+
+ public void addObserver(Observer o) {
+ this.observers.add(o);
+ }
+
+ Mono<EnvProperties> getEnvironment(Properties systemEnvironment) {
+ return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment);
+ }
+
+ Flux<ApplicationConfig> createRefreshTask() {
+ return getEnvironment(systemEnvironment) //
+ .flatMap(this::createCbsClient) //
+ .flatMapMany(this::periodicConfigurationUpdates) //
+ .map(this::parseRicConfigurationfromConsul) //
+ .onErrorResume(this::onErrorResume);
+ }
+
+ Mono<CbsClient> createCbsClient(EnvProperties env) {
+ return CbsClientFactory.createCbsClient(env);
+ }
+
+ private Flux<JsonObject> periodicConfigurationUpdates(CbsClient cbsClient) {
+ final Duration initialDelay = Duration.ZERO;
+ final Duration refreshPeriod = Duration.ofMinutes(1);
+ final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create());
+ return cbsClient.updates(getConfigRequest, initialDelay, refreshPeriod);
+ }
+
+ private <R> Mono<R> onErrorResume(Throwable trowable) {
+ logger.error("Could not refresh application configuration {}", trowable.toString());
+ return Mono.empty();
}
- Mono<Environment.Variables> getEnvironment(Properties systemEnvironment) {
- return Environment.readEnvironmentVariables(systemEnvironment);
+ private ApplicationConfig parseRicConfigurationfromConsul(JsonObject jsonObject) {
+ try {
+ ApplicationConfigParser parser = new ApplicationConfigParser();
+ parser.parse(jsonObject);
+ setConfiguration(parser.getRicConfigs());
+
+ } catch (ServiceException e) {
+ logger.error("Could not parse configuration {}", e.toString(), e);
+ }
+ return this;
+ }
+
+ private class Notification {
+ final RicConfig ric;
+ final RicConfigUpdate event;
+
+ Notification(RicConfig ric, RicConfigUpdate event) {
+ this.ric = ric;
+ this.event = event;
+ }
+ }
+
+ private void setConfiguration(@NotNull Collection<RicConfig> ricConfigs) {
+ Collection<Notification> notifications = new Vector<>();
+ synchronized (this) {
+ Map<String, RicConfig> newRicConfigs = new HashMap<>();
+ for (RicConfig newConfig : ricConfigs) {
+ RicConfig oldConfig = this.ricConfigs.get(newConfig.name());
+ if (oldConfig == null) {
+ newRicConfigs.put(newConfig.name(), newConfig);
+ notifications.add(new Notification(newConfig, RicConfigUpdate.ADDED));
+ this.ricConfigs.remove(newConfig.name());
+ } else if (!newConfig.equals(newConfig)) {
+ notifications.add(new Notification(newConfig, RicConfigUpdate.CHANGED));
+ newRicConfigs.put(newConfig.name(), newConfig);
+ this.ricConfigs.remove(newConfig.name());
+ } else {
+ newRicConfigs.put(oldConfig.name(), oldConfig);
+ }
+ }
+ for (RicConfig deletedConfig : this.ricConfigs.values()) {
+ notifications.add(new Notification(deletedConfig, RicConfigUpdate.REMOVED));
+ }
+ this.ricConfigs = newRicConfigs;
+ }
+ notifyObservers(notifications);
+ }
+
+ private void notifyObservers(Collection<Notification> notifications) {
+ for (Observer observer : this.observers) {
+ for (Notification notif : notifications) {
+ observer.onRicConfigUpdate(notif.ric, notif.event);
+ }
+ }
+ }
+
+ public void stop() {
+ if (refreshConfigTask != null) {
+ refreshConfigTask.dispose();
+ refreshConfigTask = null;
+ }
}
/**
* Reads the configuration from file.
*/
- void loadConfigurationFromFile() {
+ public void loadConfigurationFromFile() {
+ String filepath = getLocalConfigurationFilePath();
+ if (filepath == null) {
+ logger.debug("No localconfiguration file used");
+ return;
+ }
GsonBuilder gsonBuilder = new GsonBuilder();
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
}
ApplicationConfigParser appParser = new ApplicationConfigParser();
appParser.parse(rootObject);
+ setConfiguration(appParser.getRicConfigs());
logger.info("Local configuration file loaded: {}", filepath);
} catch (JsonSyntaxException | ServiceException | IOException e) {
logger.trace("Local configuration file not loaded: {}", filepath, e);
InputStream createInputStream(@NotNull String filepath) throws IOException {
return new BufferedInputStream(new FileInputStream(filepath));
}
-
}