Merge "Added STD sim 2.0.0 tests"
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / tasks / RefreshConfigTask.java
index 81735d7..b99a230 100644 (file)
@@ -24,7 +24,6 @@ import com.google.gson.GsonBuilder;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
-import com.google.gson.JsonSyntaxException;
 import com.google.gson.TypeAdapterFactory;
 
 import java.io.BufferedInputStream;
@@ -39,6 +38,9 @@ import java.util.ServiceLoader;
 
 import javax.validation.constraints.NotNull;
 
+import lombok.AccessLevel;
+import lombok.Getter;
+
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
@@ -50,7 +52,6 @@ import org.oransc.policyagent.configuration.ApplicationConfig;
 import org.oransc.policyagent.configuration.ApplicationConfig.RicConfigUpdate;
 import org.oransc.policyagent.configuration.ApplicationConfigParser;
 import org.oransc.policyagent.configuration.RicConfig;
-import org.oransc.policyagent.exceptions.ServiceException;
 import org.oransc.policyagent.repository.Policies;
 import org.oransc.policyagent.repository.PolicyTypes;
 import org.oransc.policyagent.repository.Ric;
@@ -67,7 +68,7 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 /**
- * Regularly refreshes the configuration from Consul.
+ * Regularly refreshes the configuration from Consul or from a local configuration file.
  */
 @Component
 public class RefreshConfigTask {
@@ -77,7 +78,13 @@ public class RefreshConfigTask {
     @Value("#{systemEnvironment}")
     public Properties systemEnvironment;
 
+    /**
+     * The time between refreshes of the configuration.
+     */
+    static final Duration CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1);
+
     final ApplicationConfig appConfig;
+    @Getter(AccessLevel.PROTECTED)
     private Disposable refreshTask = null;
     private boolean isConsulUsed = false;
 
@@ -86,8 +93,6 @@ public class RefreshConfigTask {
     private final Policies policies;
     private final Services services;
     private final PolicyTypes policyTypes;
-    private static final Duration FILE_CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1);
-    private static final Duration CONSUL_CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1);
 
     @Autowired
     public RefreshConfigTask(ApplicationConfig appConfig, Rics rics, Policies policies, Services services,
@@ -105,80 +110,89 @@ public class RefreshConfigTask {
         stop();
         refreshTask = createRefreshTask() //
             .subscribe(notUsed -> logger.debug("Refreshed configuration data"),
-                throwable -> logger.error("Configuration refresh terminated due to exception", throwable),
+                throwable -> logger.error("Configuration refresh terminated due to exception {}", throwable.toString()),
                 () -> logger.error("Configuration refresh terminated"));
     }
 
     public void stop() {
         if (refreshTask != null) {
             refreshTask.dispose();
-            refreshTask = null;
         }
     }
 
-    Flux<ApplicationConfig> createRefreshTask() {
-        Flux<JsonObject> loadFromFile = Flux.interval(Duration.ZERO, FILE_CONFIG_REFRESH_INTERVAL) //
-            .filter(notUsed -> configFileExists()) //
+    Flux<RicConfigUpdate.Type> createRefreshTask() {
+        Flux<JsonObject> loadFromFile = Flux.interval(Duration.ZERO, CONFIG_REFRESH_INTERVAL) //
             .filter(notUsed -> !this.isConsulUsed) //
             .flatMap(notUsed -> loadConfigurationFromFile()) //
-            .onErrorResume(this::ignoreError) //
-            .doOnNext(json -> logger.debug("loadFromFile")) //
+            .onErrorResume(this::ignoreErrorFlux) //
+            .doOnNext(json -> logger.debug("loadFromFile succeeded")) //
             .doOnTerminate(() -> logger.error("loadFromFile Terminate"));
 
-        Flux<JsonObject> loadFromConsul = getEnvironment(systemEnvironment) //
+        Flux<JsonObject> loadFromConsul = Flux.interval(Duration.ZERO, CONFIG_REFRESH_INTERVAL) //
+            .flatMap(i -> getEnvironment(systemEnvironment)) //
             .flatMap(this::createCbsClient) //
-            .flatMapMany(this::periodicConfigurationUpdates) //
-            .onErrorResume(this::ignoreError) //
-            .doOnNext(json -> logger.debug("loadFromConsul")) //
+            .flatMap(this::getFromCbs) //
+            .onErrorResume(this::ignoreErrorMono) //
+            .doOnNext(json -> logger.debug("loadFromConsul succeeded")) //
             .doOnNext(json -> this.isConsulUsed = true) //
-            .doOnTerminate(() -> logger.error("loadFromConsul Terminate"));
+            .doOnTerminate(() -> logger.error("loadFromConsul Terminated"));
 
         return Flux.merge(loadFromFile, loadFromConsul) //
             .flatMap(this::parseConfiguration) //
             .flatMap(this::updateConfig) //
             .doOnNext(this::handleUpdatedRicConfig) //
-            .flatMap(configUpdate -> Flux.just(this.appConfig)) //
+            .flatMap(configUpdate -> Flux.just(configUpdate.getType())) //
             .doOnTerminate(() -> logger.error("Configuration refresh task is terminated"));
     }
 
     Mono<EnvProperties> getEnvironment(Properties systemEnvironment) {
-        return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment);
+        return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment) //
+            .onErrorResume(t -> Mono.empty());
     }
 
     Mono<CbsClient> createCbsClient(EnvProperties env) {
-        return CbsClientFactory.createCbsClient(env);
+        return CbsClientFactory.createCbsClient(env) //
+            .onErrorResume(this::ignoreErrorMono);
+    }
+
+    private Mono<JsonObject> getFromCbs(CbsClient cbsClient) {
+        try {
+            final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create());
+            return cbsClient.get(getConfigRequest) //
+                .onErrorResume(this::ignoreErrorMono);
+        } catch (Exception e) {
+            return ignoreErrorMono(e);
+        }
     }
 
-    private Flux<JsonObject> periodicConfigurationUpdates(CbsClient cbsClient) {
-        final Duration initialDelay = Duration.ZERO;
-        final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create());
-        return cbsClient.updates(getConfigRequest, initialDelay, CONSUL_CONFIG_REFRESH_INTERVAL);
+    private <R> Flux<R> ignoreErrorFlux(Throwable throwable) {
+        String errMsg = throwable.toString();
+        logger.warn("Could not refresh application configuration. {}", errMsg);
+        return Flux.empty();
     }
 
-    private <R> Mono<R> ignoreError(Throwable throwable) {
+    private <R> Mono<R> ignoreErrorMono(Throwable throwable) {
         String errMsg = throwable.toString();
         logger.warn("Could not refresh application configuration. {}", errMsg);
         return Mono.empty();
     }
 
-    private Mono<ApplicationConfigParser> parseConfiguration(JsonObject jsonObject) {
+    private Mono<ApplicationConfigParser.ConfigParserResult> parseConfiguration(JsonObject jsonObject) {
         try {
             ApplicationConfigParser parser = new ApplicationConfigParser();
-            parser.parse(jsonObject);
-            return Mono.just(parser);
-        } catch (ServiceException e) {
-            logger.error("Could not parse configuration {}", e.toString(), e);
-            return Mono.error(e);
+            return Mono.just(parser.parse(jsonObject));
+        } catch (Exception e) {
+            String str = e.toString();
+            logger.error("Could not parse configuration {}", str);
+            return Mono.empty();
         }
     }
 
-    private Flux<RicConfigUpdate> updateConfig(ApplicationConfigParser config) {
-        return this.appConfig.setConfiguration(config.getRicConfigs(), config.getDmaapPublisherConfig(),
-            config.getDmaapConsumerConfig());
+    private Flux<RicConfigUpdate> updateConfig(ApplicationConfigParser.ConfigParserResult config) {
+        return this.appConfig.setConfiguration(config);
     }
 
-    boolean configFileExists() {
-        String filepath = appConfig.getLocalConfigurationFilePath();
+    boolean fileExists(String filepath) {
         return (filepath != null && (new File(filepath).exists()));
     }
 
@@ -194,7 +208,7 @@ public class RefreshConfigTask {
             } else if (event == RicConfigUpdate.Type.CHANGED) {
                 Ric ric = this.rics.get(ricName);
                 if (ric == null) {
-                    // Should not happend,just for robustness
+                    // Should not happen,just for robustness
                     addRic(updatedInfo.getRicConfig());
                 } else {
                     ric.setRicConfig(updatedInfo.getRicConfig());
@@ -220,6 +234,10 @@ public class RefreshConfigTask {
      */
     Flux<JsonObject> loadConfigurationFromFile() {
         String filepath = appConfig.getLocalConfigurationFilePath();
+        if (!fileExists(filepath)) {
+            return Flux.empty();
+        }
+
         GsonBuilder gsonBuilder = new GsonBuilder();
         ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
 
@@ -229,8 +247,8 @@ public class RefreshConfigTask {
             appParser.parse(rootObject);
             logger.debug("Local configuration file loaded: {}", filepath);
             return Flux.just(rootObject);
-        } catch (JsonSyntaxException | ServiceException | IOException e) {
-            logger.debug("Local configuration file not loaded: {}", filepath, e);
+        } catch (Exception e) {
+            logger.error("Local configuration file not loaded: {}, {}", filepath, e.getMessage());
             return Flux.empty();
         }
     }