Simplified startup
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / tasks / RefreshConfigTask.java
index 4080b37..81735d7 100644 (file)
@@ -28,6 +28,7 @@ import com.google.gson.JsonSyntaxException;
 import com.google.gson.TypeAdapterFactory;
 
 import java.io.BufferedInputStream;
+import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -44,9 +45,17 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
+import org.oransc.policyagent.clients.A1ClientFactory;
 import org.oransc.policyagent.configuration.ApplicationConfig;
+import org.oransc.policyagent.configuration.ApplicationConfig.RicConfigUpdate;
 import org.oransc.policyagent.configuration.ApplicationConfigParser;
+import org.oransc.policyagent.configuration.RicConfig;
 import org.oransc.policyagent.exceptions.ServiceException;
+import org.oransc.policyagent.repository.Policies;
+import org.oransc.policyagent.repository.PolicyTypes;
+import org.oransc.policyagent.repository.Ric;
+import org.oransc.policyagent.repository.Rics;
+import org.oransc.policyagent.repository.Services;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -68,22 +77,36 @@ public class RefreshConfigTask {
     @Value("#{systemEnvironment}")
     public Properties systemEnvironment;
 
-    private final ApplicationConfig appConfig;
+    final ApplicationConfig appConfig;
     private Disposable refreshTask = null;
+    private boolean isConsulUsed = false;
+
+    private final Rics rics;
+    private final A1ClientFactory a1ClientFactory;
+    private final Policies policies;
+    private final Services services;
+    private final PolicyTypes policyTypes;
+    private static final Duration FILE_CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1);
+    private static final Duration CONSUL_CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1);
 
     @Autowired
-    public RefreshConfigTask(ApplicationConfig appConfig) {
+    public RefreshConfigTask(ApplicationConfig appConfig, Rics rics, Policies policies, Services services,
+        PolicyTypes policyTypes, A1ClientFactory a1ClientFactory) {
         this.appConfig = appConfig;
+        this.rics = rics;
+        this.policies = policies;
+        this.services = services;
+        this.policyTypes = policyTypes;
+        this.a1ClientFactory = a1ClientFactory;
     }
 
     public void start() {
         logger.debug("Starting refreshConfigTask");
         stop();
-        loadConfigurationFromFile();
         refreshTask = createRefreshTask() //
             .subscribe(notUsed -> logger.debug("Refreshed configuration data"),
                 throwable -> logger.error("Configuration refresh terminated due to exception", throwable),
-                () -> logger.debug("Configuration refresh completed"));
+                () -> logger.error("Configuration refresh terminated"));
     }
 
     public void stop() {
@@ -94,11 +117,28 @@ public class RefreshConfigTask {
     }
 
     Flux<ApplicationConfig> createRefreshTask() {
-        return getEnvironment(systemEnvironment) //
+        Flux<JsonObject> loadFromFile = Flux.interval(Duration.ZERO, FILE_CONFIG_REFRESH_INTERVAL) //
+            .filter(notUsed -> configFileExists()) //
+            .filter(notUsed -> !this.isConsulUsed) //
+            .flatMap(notUsed -> loadConfigurationFromFile()) //
+            .onErrorResume(this::ignoreError) //
+            .doOnNext(json -> logger.debug("loadFromFile")) //
+            .doOnTerminate(() -> logger.error("loadFromFile Terminate"));
+
+        Flux<JsonObject> loadFromConsul = getEnvironment(systemEnvironment) //
             .flatMap(this::createCbsClient) //
             .flatMapMany(this::periodicConfigurationUpdates) //
-            .map(this::parseRicConfigurationfromConsul) //
-            .onErrorResume(this::onErrorResume);
+            .onErrorResume(this::ignoreError) //
+            .doOnNext(json -> logger.debug("loadFromConsul")) //
+            .doOnNext(json -> this.isConsulUsed = true) //
+            .doOnTerminate(() -> logger.error("loadFromConsul Terminate"));
+
+        return Flux.merge(loadFromFile, loadFromConsul) //
+            .flatMap(this::parseConfiguration) //
+            .flatMap(this::updateConfig) //
+            .doOnNext(this::handleUpdatedRicConfig) //
+            .flatMap(configUpdate -> Flux.just(this.appConfig)) //
+            .doOnTerminate(() -> logger.error("Configuration refresh task is terminated"));
     }
 
     Mono<EnvProperties> getEnvironment(Properties systemEnvironment) {
@@ -111,38 +151,75 @@ public class RefreshConfigTask {
 
     private Flux<JsonObject> periodicConfigurationUpdates(CbsClient cbsClient) {
         final Duration initialDelay = Duration.ZERO;
-        final Duration refreshPeriod = Duration.ofMinutes(1);
         final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create());
-        return cbsClient.updates(getConfigRequest, initialDelay, refreshPeriod);
+        return cbsClient.updates(getConfigRequest, initialDelay, CONSUL_CONFIG_REFRESH_INTERVAL);
     }
 
-    private <R> Mono<R> onErrorResume(Throwable throwable) {
+    private <R> Mono<R> ignoreError(Throwable throwable) {
         String errMsg = throwable.toString();
-        logger.error("Could not refresh application configuration. {}", errMsg);
+        logger.warn("Could not refresh application configuration. {}", errMsg);
         return Mono.empty();
     }
 
-    private ApplicationConfig parseRicConfigurationfromConsul(JsonObject jsonObject) {
+    private Mono<ApplicationConfigParser> parseConfiguration(JsonObject jsonObject) {
         try {
             ApplicationConfigParser parser = new ApplicationConfigParser();
             parser.parse(jsonObject);
-            this.appConfig.setConfiguration(parser.getRicConfigs(), parser.getDmaapPublisherConfig(),
-                parser.getDmaapConsumerConfig());
+            return Mono.just(parser);
         } catch (ServiceException e) {
             logger.error("Could not parse configuration {}", e.toString(), e);
+            return Mono.error(e);
+        }
+    }
+
+    private Flux<RicConfigUpdate> updateConfig(ApplicationConfigParser config) {
+        return this.appConfig.setConfiguration(config.getRicConfigs(), config.getDmaapPublisherConfig(),
+            config.getDmaapConsumerConfig());
+    }
+
+    boolean configFileExists() {
+        String filepath = appConfig.getLocalConfigurationFilePath();
+        return (filepath != null && (new File(filepath).exists()));
+    }
+
+    private void handleUpdatedRicConfig(RicConfigUpdate updatedInfo) {
+        synchronized (this.rics) {
+            String ricName = updatedInfo.getRicConfig().name();
+            RicConfigUpdate.Type event = updatedInfo.getType();
+            if (event == RicConfigUpdate.Type.ADDED) {
+                addRic(updatedInfo.getRicConfig());
+            } else if (event == RicConfigUpdate.Type.REMOVED) {
+                rics.remove(ricName);
+                this.policies.removePoliciesForRic(ricName);
+            } else if (event == RicConfigUpdate.Type.CHANGED) {
+                Ric ric = this.rics.get(ricName);
+                if (ric == null) {
+                    // Should not happend,just for robustness
+                    addRic(updatedInfo.getRicConfig());
+                } else {
+                    ric.setRicConfig(updatedInfo.getRicConfig());
+                }
+            }
         }
-        return this.appConfig;
+    }
+
+    private void addRic(RicConfig config) {
+        Ric ric = new Ric(config);
+        this.rics.put(ric);
+        runRicSynchronization(ric);
+    }
+
+    void runRicSynchronization(Ric ric) {
+        RicSynchronizationTask synchronizationTask =
+            new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services);
+        synchronizationTask.run(ric);
     }
 
     /**
      * Reads the configuration from file.
      */
-    void loadConfigurationFromFile() {
+    Flux<JsonObject> loadConfigurationFromFile() {
         String filepath = appConfig.getLocalConfigurationFilePath();
-        if (filepath == null) {
-            logger.debug("No localconfiguration file used");
-            return;
-        }
         GsonBuilder gsonBuilder = new GsonBuilder();
         ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
 
@@ -150,11 +227,11 @@ public class RefreshConfigTask {
             JsonObject rootObject = getJsonElement(inputStream).getAsJsonObject();
             ApplicationConfigParser appParser = new ApplicationConfigParser();
             appParser.parse(rootObject);
-            appConfig.setConfiguration(appParser.getRicConfigs(), appParser.getDmaapPublisherConfig(),
-                appParser.getDmaapConsumerConfig());
-            logger.info("Local configuration file loaded: {}", filepath);
+            logger.debug("Local configuration file loaded: {}", filepath);
+            return Flux.just(rootObject);
         } catch (JsonSyntaxException | ServiceException | IOException e) {
-            logger.trace("Local configuration file not loaded: {}", filepath, e);
+            logger.debug("Local configuration file not loaded: {}", filepath, e);
+            return Flux.empty();
         }
     }