Changed the config loading from consul
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / tasks / RefreshConfigTask.java
index 41f2064..dd235db 100644 (file)
@@ -124,46 +124,49 @@ public class RefreshConfigTask {
             .filter(notUsed -> configFileExists()) //
             .filter(notUsed -> !this.isConsulUsed) //
             .flatMap(notUsed -> loadConfigurationFromFile()) //
-            .onErrorResume(this::ignoreError) //
+            .onErrorResume(this::ignoreErrorFlux) //
             .doOnNext(json -> logger.debug("loadFromFile succeeded")) //
-            .doOnTerminate(() -> logger.info("loadFromFile Terminate"));
+            .doOnTerminate(() -> logger.error("loadFromFile Terminate"));
 
-        Flux<JsonObject> loadFromConsul = getEnvironment(systemEnvironment) //
+        Flux<JsonObject> loadFromConsul = Flux.interval(Duration.ZERO, CONSUL_CONFIG_REFRESH_INTERVAL) //
+            .flatMap(i -> getEnvironment(systemEnvironment)) //
             .flatMap(this::createCbsClient) //
-            .flatMapMany(this::periodicConfigurationUpdates) //
-            .onErrorResume(this::ignoreError) //
+            .flatMap(this::getFromCbs) //
             .doOnNext(json -> logger.debug("loadFromConsul succeeded")) //
             .doOnNext(json -> this.isConsulUsed = true) //
-            .doOnTerminate(() -> logger.info("loadFromConsul Terminated"));
+            .doOnTerminate(() -> logger.error("loadFromConsul Terminated"));
 
         return Flux.merge(loadFromFile, loadFromConsul) //
             .flatMap(this::parseConfiguration) //
             .flatMap(this::updateConfig) //
             .doOnNext(this::handleUpdatedRicConfig) //
             .flatMap(configUpdate -> Flux.just(configUpdate.getType())) //
-            .doOnTerminate(() -> handleTerminate("Configuration refresh task is terminated"));
-    }
-
-    private void handleTerminate(String info) {
-        logger.error(info);
+            .doOnTerminate(() -> logger.error("Configuration refresh task is terminated"));
     }
 
     Mono<EnvProperties> getEnvironment(Properties systemEnvironment) {
-        return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment);
+        return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment) //
+            .onErrorResume(t -> Mono.empty());
     }
 
     Mono<CbsClient> createCbsClient(EnvProperties env) {
-        return CbsClientFactory.createCbsClient(env);
+        return CbsClientFactory.createCbsClient(env) //
+            .onErrorResume(this::ignoreErrorMono);
     }
 
-    private Flux<JsonObject> periodicConfigurationUpdates(CbsClient cbsClient) {
-        final Duration initialDelay = Duration.ZERO;
+    private Mono<JsonObject> getFromCbs(CbsClient cbsClient) {
         final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create());
-        return cbsClient.updates(getConfigRequest, initialDelay, CONSUL_CONFIG_REFRESH_INTERVAL) //
-            .onErrorResume(this::ignoreError);
+        return cbsClient.get(getConfigRequest) //
+            .onErrorResume(this::ignoreErrorMono);
+    }
+
+    private <R> Flux<R> ignoreErrorFlux(Throwable throwable) {
+        String errMsg = throwable.toString();
+        logger.warn("Could not refresh application configuration. {}", errMsg);
+        return Flux.empty();
     }
 
-    private <R> Mono<R> ignoreError(Throwable throwable) {
+    private <R> Mono<R> ignoreErrorMono(Throwable throwable) {
         String errMsg = throwable.toString();
         logger.warn("Could not refresh application configuration. {}", errMsg);
         return Mono.empty();