Merge "Documentation reorganisation"
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / tasks / RefreshConfigTask.java
index 81735d7..176dd6c 100644 (file)
@@ -39,6 +39,9 @@ import java.util.ServiceLoader;
 
 import javax.validation.constraints.NotNull;
 
+import lombok.AccessLevel;
+import lombok.Getter;
+
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
@@ -67,7 +70,8 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 /**
- * Regularly refreshes the configuration from Consul.
+ * Regularly refreshes the configuration from Consul or from a local
+ * configuration file.
  */
 @Component
 public class RefreshConfigTask {
@@ -78,6 +82,7 @@ public class RefreshConfigTask {
     public Properties systemEnvironment;
 
     final ApplicationConfig appConfig;
+    @Getter(AccessLevel.PROTECTED)
     private Disposable refreshTask = null;
     private boolean isConsulUsed = false;
 
@@ -112,11 +117,10 @@ public class RefreshConfigTask {
     public void stop() {
         if (refreshTask != null) {
             refreshTask.dispose();
-            refreshTask = null;
         }
     }
 
-    Flux<ApplicationConfig> createRefreshTask() {
+    Flux<RicConfigUpdate.Type> createRefreshTask() {
         Flux<JsonObject> loadFromFile = Flux.interval(Duration.ZERO, FILE_CONFIG_REFRESH_INTERVAL) //
             .filter(notUsed -> configFileExists()) //
             .filter(notUsed -> !this.isConsulUsed) //
@@ -137,7 +141,7 @@ public class RefreshConfigTask {
             .flatMap(this::parseConfiguration) //
             .flatMap(this::updateConfig) //
             .doOnNext(this::handleUpdatedRicConfig) //
-            .flatMap(configUpdate -> Flux.just(this.appConfig)) //
+            .flatMap(configUpdate -> Flux.just(configUpdate.getType())) //
             .doOnTerminate(() -> logger.error("Configuration refresh task is terminated"));
     }
 
@@ -161,20 +165,19 @@ public class RefreshConfigTask {
         return Mono.empty();
     }
 
-    private Mono<ApplicationConfigParser> parseConfiguration(JsonObject jsonObject) {
+    private Mono<ApplicationConfigParser.ConfigParserResult> parseConfiguration(JsonObject jsonObject) {
         try {
             ApplicationConfigParser parser = new ApplicationConfigParser();
-            parser.parse(jsonObject);
-            return Mono.just(parser);
+            return Mono.just(parser.parse(jsonObject));
         } catch (ServiceException e) {
             logger.error("Could not parse configuration {}", e.toString(), e);
             return Mono.error(e);
         }
     }
 
-    private Flux<RicConfigUpdate> updateConfig(ApplicationConfigParser config) {
-        return this.appConfig.setConfiguration(config.getRicConfigs(), config.getDmaapPublisherConfig(),
-            config.getDmaapConsumerConfig());
+    private Flux<RicConfigUpdate> updateConfig(ApplicationConfigParser.ConfigParserResult config) {
+        return this.appConfig.setConfiguration(config.ricConfigs(), config.dmaapPublisherConfig(),
+            config.dmaapConsumerConfig());
     }
 
     boolean configFileExists() {
@@ -194,7 +197,7 @@ public class RefreshConfigTask {
             } else if (event == RicConfigUpdate.Type.CHANGED) {
                 Ric ric = this.rics.get(ricName);
                 if (ric == null) {
-                    // Should not happend,just for robustness
+                    // Should not happen,just for robustness
                     addRic(updatedInfo.getRicConfig());
                 } else {
                     ric.setRicConfig(updatedInfo.getRicConfig());