import javax.validation.constraints.NotNull;
+import lombok.AccessLevel;
+import lombok.Getter;
+
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
import reactor.core.publisher.Mono;
/**
- * Regularly refreshes the configuration from Consul.
+ * Regularly refreshes the configuration from Consul or from a local
+ * configuration file.
*/
@Component
public class RefreshConfigTask {
public Properties systemEnvironment;
final ApplicationConfig appConfig;
+ @Getter(AccessLevel.PROTECTED)
private Disposable refreshTask = null;
private boolean isConsulUsed = false;
logger.debug("Starting refreshConfigTask");
stop();
refreshTask = createRefreshTask() //
- .subscribe(notUsed -> logger.debug("Refreshed configuration data"),
- throwable -> logger.error("Configuration refresh terminated due to exception", throwable),
+ .subscribe(
+ notUsed -> logger.debug("Refreshed configuration data"), throwable -> logger
+ .error("Configuration refresh terminated due to exception {}", throwable.getMessage()),
() -> logger.error("Configuration refresh terminated"));
}
public void stop() {
if (refreshTask != null) {
refreshTask.dispose();
- refreshTask = null;
}
}
- Flux<ApplicationConfig> createRefreshTask() {
+ Flux<RicConfigUpdate.Type> createRefreshTask() {
Flux<JsonObject> loadFromFile = Flux.interval(Duration.ZERO, FILE_CONFIG_REFRESH_INTERVAL) //
.filter(notUsed -> configFileExists()) //
.filter(notUsed -> !this.isConsulUsed) //
.flatMap(this::parseConfiguration) //
.flatMap(this::updateConfig) //
.doOnNext(this::handleUpdatedRicConfig) //
- .flatMap(configUpdate -> Flux.just(this.appConfig)) //
+ .flatMap(configUpdate -> Flux.just(configUpdate.getType())) //
.doOnTerminate(() -> logger.error("Configuration refresh task is terminated"));
}
return Mono.empty();
}
- private Mono<ApplicationConfigParser> parseConfiguration(JsonObject jsonObject) {
+ private Mono<ApplicationConfigParser.ConfigParserResult> parseConfiguration(JsonObject jsonObject) {
try {
ApplicationConfigParser parser = new ApplicationConfigParser();
- parser.parse(jsonObject);
- return Mono.just(parser);
+ return Mono.just(parser.parse(jsonObject));
} catch (ServiceException e) {
logger.error("Could not parse configuration {}", e.toString(), e);
return Mono.error(e);
}
}
- private Flux<RicConfigUpdate> updateConfig(ApplicationConfigParser config) {
- return this.appConfig.setConfiguration(config.getRicConfigs(), config.getDmaapPublisherConfig(),
- config.getDmaapConsumerConfig());
+ private Flux<RicConfigUpdate> updateConfig(ApplicationConfigParser.ConfigParserResult config) {
+ return this.appConfig.setConfiguration(config);
}
boolean configFileExists() {
} else if (event == RicConfigUpdate.Type.CHANGED) {
Ric ric = this.rics.get(ricName);
if (ric == null) {
- // Should not happend,just for robustness
+ // Should not happen,just for robustness
addRic(updatedInfo.getRicConfig());
} else {
ric.setRicConfig(updatedInfo.getRicConfig());