import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
-import com.google.gson.JsonSyntaxException;
import com.google.gson.TypeAdapterFactory;
import java.io.BufferedInputStream;
@Value("#{systemEnvironment}")
public Properties systemEnvironment;
+ /**
+ * The time between refreshes of the configuration.
+ */
+ static final Duration CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1);
+
final ApplicationConfig appConfig;
@Getter(AccessLevel.PROTECTED)
private Disposable refreshTask = null;
private final Policies policies;
private final Services services;
private final PolicyTypes policyTypes;
- private static final Duration FILE_CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1);
- private static final Duration CONSUL_CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1);
@Autowired
public RefreshConfigTask(ApplicationConfig appConfig, Rics rics, Policies policies, Services services,
stop();
refreshTask = createRefreshTask() //
.subscribe(notUsed -> logger.debug("Refreshed configuration data"),
- throwable -> logger.error("Configuration refresh terminated due to exception", throwable),
+ throwable -> logger.error("Configuration refresh terminated due to exception {}", throwable.toString()),
() -> logger.error("Configuration refresh terminated"));
}
}
Flux<RicConfigUpdate.Type> createRefreshTask() {
- Flux<JsonObject> loadFromFile = Flux.interval(Duration.ZERO, FILE_CONFIG_REFRESH_INTERVAL) //
- .filter(notUsed -> configFileExists()) //
+ Flux<JsonObject> loadFromFile = Flux.interval(Duration.ZERO, CONFIG_REFRESH_INTERVAL) //
.filter(notUsed -> !this.isConsulUsed) //
.flatMap(notUsed -> loadConfigurationFromFile()) //
- .onErrorResume(this::ignoreError) //
- .doOnNext(json -> logger.debug("loadFromFile")) //
+ .onErrorResume(this::ignoreErrorFlux) //
+ .doOnNext(json -> logger.debug("loadFromFile succeeded")) //
.doOnTerminate(() -> logger.error("loadFromFile Terminate"));
- Flux<JsonObject> loadFromConsul = getEnvironment(systemEnvironment) //
+ Flux<JsonObject> loadFromConsul = Flux.interval(Duration.ZERO, CONFIG_REFRESH_INTERVAL) //
+ .flatMap(i -> getEnvironment(systemEnvironment)) //
.flatMap(this::createCbsClient) //
- .flatMapMany(this::periodicConfigurationUpdates) //
- .onErrorResume(this::ignoreError) //
- .doOnNext(json -> logger.debug("loadFromConsul")) //
+ .flatMap(this::getFromCbs) //
+ .onErrorResume(this::ignoreErrorMono) //
+ .doOnNext(json -> logger.debug("loadFromConsul succeeded")) //
.doOnNext(json -> this.isConsulUsed = true) //
- .doOnTerminate(() -> logger.error("loadFromConsul Terminate"));
+ .doOnTerminate(() -> logger.error("loadFromConsul Terminated"));
return Flux.merge(loadFromFile, loadFromConsul) //
.flatMap(this::parseConfiguration) //
}
Mono<EnvProperties> getEnvironment(Properties systemEnvironment) {
- return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment);
+ return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment) //
+ .onErrorResume(t -> Mono.empty());
}
Mono<CbsClient> createCbsClient(EnvProperties env) {
- return CbsClientFactory.createCbsClient(env);
+ return CbsClientFactory.createCbsClient(env) //
+ .onErrorResume(this::ignoreErrorMono);
}
- private Flux<JsonObject> periodicConfigurationUpdates(CbsClient cbsClient) {
- final Duration initialDelay = Duration.ZERO;
+ private Mono<JsonObject> getFromCbs(CbsClient cbsClient) {
final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create());
- return cbsClient.updates(getConfigRequest, initialDelay, CONSUL_CONFIG_REFRESH_INTERVAL);
+ try {
+ return cbsClient.get(getConfigRequest) //
+ .onErrorResume(this::ignoreErrorMono);
+ } catch (Exception e) {
+ return ignoreErrorMono(e);
+ }
}
- private <R> Mono<R> ignoreError(Throwable throwable) {
+ private <R> Flux<R> ignoreErrorFlux(Throwable throwable) {
+ String errMsg = throwable.toString();
+ logger.warn("Could not refresh application configuration. {}", errMsg);
+ return Flux.empty();
+ }
+
+ private <R> Mono<R> ignoreErrorMono(Throwable throwable) {
String errMsg = throwable.toString();
logger.warn("Could not refresh application configuration. {}", errMsg);
return Mono.empty();
try {
ApplicationConfigParser parser = new ApplicationConfigParser();
return Mono.just(parser.parse(jsonObject));
- } catch (ServiceException e) {
- logger.error("Could not parse configuration {}", e.toString(), e);
- return Mono.error(e);
+ } catch (Exception e) {
+ String str = e.toString();
+ logger.error("Could not parse configuration {}", str);
+ return Mono.empty();
}
}
private Flux<RicConfigUpdate> updateConfig(ApplicationConfigParser.ConfigParserResult config) {
- return this.appConfig.setConfiguration(config.ricConfigs(), config.dmaapPublisherConfig(),
- config.dmaapConsumerConfig());
+ return this.appConfig.setConfiguration(config);
}
- boolean configFileExists() {
- String filepath = appConfig.getLocalConfigurationFilePath();
+ boolean fileExists(String filepath) {
return (filepath != null && (new File(filepath).exists()));
}
*/
Flux<JsonObject> loadConfigurationFromFile() {
String filepath = appConfig.getLocalConfigurationFilePath();
+ if (!fileExists(filepath)) {
+ return Flux.empty();
+ }
+
GsonBuilder gsonBuilder = new GsonBuilder();
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
appParser.parse(rootObject);
logger.debug("Local configuration file loaded: {}", filepath);
return Flux.just(rootObject);
- } catch (JsonSyntaxException | ServiceException | IOException e) {
- logger.debug("Local configuration file not loaded: {}", filepath, e);
+ } catch (IOException | ServiceException e) {
+ logger.error("Local configuration file not loaded: {}, {}", filepath, e.getMessage());
return Flux.empty();
}
}