Simplified startup 94/2694/4
authorPatrikBuhr <patrik.buhr@est.tech>
Fri, 6 Mar 2020 07:33:57 +0000 (08:33 +0100)
committerPatrikBuhr <patrik.buhr@est.tech>
Fri, 6 Mar 2020 09:20:14 +0000 (10:20 +0100)
Simplified configuration handling.
Fixed sonar issues.

Change-Id: I34873e3a8e8a276df9316cae3750f9090d3ba3cb
Issue-ID: NONRTRIC-149
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
12 files changed:
policy-agent/src/main/java/org/oransc/policyagent/Application.java
policy-agent/src/main/java/org/oransc/policyagent/clients/SdncOnapAdapterInput.java
policy-agent/src/main/java/org/oransc/policyagent/clients/SdncOscAdapterInput.java
policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java
policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java
policy-agent/src/main/java/org/oransc/policyagent/repository/Lock.java
policy-agent/src/main/java/org/oransc/policyagent/repository/Ric.java
policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java
policy-agent/src/main/java/org/oransc/policyagent/tasks/StartupService.java [deleted file]
policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigTest.java
policy-agent/src/test/java/org/oransc/policyagent/tasks/RefreshConfigTaskTest.java
policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java [deleted file]

index 18120e5..1c397fe 100644 (file)
@@ -20,7 +20,7 @@
 
 package org.oransc.policyagent;
 
-import org.oransc.policyagent.tasks.StartupService;
+import org.oransc.policyagent.tasks.RefreshConfigTask;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.CommandLineRunner;
 import org.springframework.boot.SpringApplication;
@@ -32,7 +32,7 @@ import org.springframework.context.annotation.Bean;
 public class Application {
 
     @Autowired
-    private StartupService startupService;
+    private RefreshConfigTask configRefresh;
 
     public static void main(String[] args) {
         SpringApplication.run(Application.class);
@@ -47,6 +47,6 @@ public class Application {
      */
     @Bean
     public CommandLineRunner commandLineRunner(ApplicationContext ctx) {
-        return args -> startupService.startup();
+        return args -> configRefresh.start();
     }
 }
index f43fb80..eb85de9 100644 (file)
@@ -28,7 +28,7 @@ import org.immutables.value.Value;
 
 @Value.Immutable
 @Gson.TypeAdapters
-public interface SdncOnapAdapterInput {
+interface SdncOnapAdapterInput {
     public String nearRtRicId();
 
     public Optional<String> policyTypeId();
index 9a67f44..5fbfd3a 100644 (file)
@@ -27,7 +27,7 @@ import org.immutables.value.Value;
 
 @Value.Immutable
 @Gson.TypeAdapters
-public interface SdncOscAdapterInput {
+interface SdncOscAdapterInput {
     public String nearRtRicUrl();
 
     public Optional<String> policyTypeId();
index deacb44..3b4f810 100644 (file)
@@ -34,6 +34,7 @@ import lombok.Getter;
 import org.oransc.policyagent.exceptions.ServiceException;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import reactor.core.publisher.Flux;
 
 @EnableConfigurationProperties
 @ConfigurationProperties("app")
@@ -50,7 +51,6 @@ public class ApplicationConfig {
     @NotEmpty
     private String a1ControllerPassword;
 
-    private Collection<Observer> observers = new ArrayList<>();
     private Map<String, RicConfig> ricConfigs = new HashMap<>();
     @Getter
     private Properties dmaapPublisherConfig;
@@ -61,15 +61,15 @@ public class ApplicationConfig {
         return this.filepath;
     }
 
-    public String getA1ControllerBaseUrl() {
+    public synchronized String getA1ControllerBaseUrl() {
         return this.a1ControllerBaseUrl;
     }
 
-    public String getA1ControllerUsername() {
+    public synchronized String getA1ControllerUsername() {
         return this.a1ControllerUsername;
     }
 
-    public String getA1ControllerPassword() {
+    public synchronized String getA1ControllerPassword() {
         return this.a1ControllerPassword;
     }
 
@@ -105,65 +105,49 @@ public class ApplicationConfig {
         throw new ServiceException("Could not find ric: " + ricName);
     }
 
-    public enum RicConfigUpdate {
-        ADDED, CHANGED, REMOVED
-    }
-
-    public interface Observer {
-        void onRicConfigUpdate(RicConfig ric, RicConfigUpdate event);
-    }
-
-    public void addObserver(Observer o) {
-        this.observers.add(o);
-    }
+    public static class RicConfigUpdate {
+        public enum Type {
+            ADDED, CHANGED, REMOVED
+        }
 
-    private class Notification {
-        final RicConfig ric;
-        final RicConfigUpdate event;
+        @Getter
+        private final RicConfig ricConfig;
+        @Getter
+        private final Type type;
 
-        Notification(RicConfig ric, RicConfigUpdate event) {
-            this.ric = ric;
-            this.event = event;
+        RicConfigUpdate(RicConfig ric, Type event) {
+            this.ricConfig = ric;
+            this.type = event;
         }
     }
 
-    public void setConfiguration(@NotNull Collection<RicConfig> ricConfigs, Properties dmaapPublisherConfig,
-        Properties dmaapConsumerConfig) {
-
-        Collection<Notification> notifications = new ArrayList<>();
-        synchronized (this) {
-            this.dmaapPublisherConfig = dmaapPublisherConfig;
-            this.dmaapConsumerConfig = dmaapConsumerConfig;
-
-            Map<String, RicConfig> newRicConfigs = new HashMap<>();
-            for (RicConfig newConfig : ricConfigs) {
-                RicConfig oldConfig = this.ricConfigs.get(newConfig.name());
-                if (oldConfig == null) {
-                    newRicConfigs.put(newConfig.name(), newConfig);
-                    notifications.add(new Notification(newConfig, RicConfigUpdate.ADDED));
-                    this.ricConfigs.remove(newConfig.name());
-                } else if (!newConfig.equals(oldConfig)) {
-                    notifications.add(new Notification(newConfig, RicConfigUpdate.CHANGED));
-                    newRicConfigs.put(newConfig.name(), newConfig);
-                    this.ricConfigs.remove(newConfig.name());
-                } else {
-                    newRicConfigs.put(oldConfig.name(), oldConfig);
-                }
+    public synchronized Flux<RicConfigUpdate> setConfiguration(@NotNull Collection<RicConfig> ricConfigs,
+        Properties dmaapPublisherConfig, Properties dmaapConsumerConfig) {
+
+        Collection<RicConfigUpdate> modifications = new ArrayList<>();
+        this.dmaapPublisherConfig = dmaapPublisherConfig;
+        this.dmaapConsumerConfig = dmaapConsumerConfig;
+
+        Map<String, RicConfig> newRicConfigs = new HashMap<>();
+        for (RicConfig newConfig : ricConfigs) {
+            RicConfig oldConfig = this.ricConfigs.get(newConfig.name());
+            if (oldConfig == null) {
+                newRicConfigs.put(newConfig.name(), newConfig);
+                modifications.add(new RicConfigUpdate(newConfig, RicConfigUpdate.Type.ADDED));
+                this.ricConfigs.remove(newConfig.name());
+            } else if (!newConfig.equals(oldConfig)) {
+                modifications.add(new RicConfigUpdate(newConfig, RicConfigUpdate.Type.CHANGED));
+                newRicConfigs.put(newConfig.name(), newConfig);
+                this.ricConfigs.remove(newConfig.name());
+            } else {
+                newRicConfigs.put(oldConfig.name(), oldConfig);
             }
-            for (RicConfig deletedConfig : this.ricConfigs.values()) {
-                notifications.add(new Notification(deletedConfig, RicConfigUpdate.REMOVED));
-            }
-            this.ricConfigs = newRicConfigs;
         }
-
-        notifyObservers(notifications);
-    }
-
-    private void notifyObservers(Collection<Notification> notifications) {
-        for (Observer observer : this.observers) {
-            for (Notification notif : notifications) {
-                observer.onRicConfigUpdate(notif.ric, notif.event);
-            }
+        for (RicConfig deletedConfig : this.ricConfigs.values()) {
+            modifications.add(new RicConfigUpdate(deletedConfig, RicConfigUpdate.Type.REMOVED));
         }
+        this.ricConfigs = newRicConfigs;
+
+        return Flux.fromIterable(modifications);
     }
 }
index b23595a..cbe3800 100644 (file)
@@ -22,8 +22,10 @@ package org.oransc.policyagent.dmaap;
 
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
+import com.google.gson.JsonObject;
 
 import java.io.IOException;
+import java.util.Optional;
 
 import org.onap.dmaap.mr.client.MRBatchingPublisher;
 import org.oransc.policyagent.clients.AsyncRestClient;
@@ -94,8 +96,9 @@ public class DmaapMessageHandler {
     }
 
     private String payload(DmaapRequestMessage message) {
-        if (message.payload().isPresent()) {
-            return gson.toJson(message.payload().get());
+        Optional<JsonObject> payload = message.payload();
+        if (payload.isPresent()) {
+            return gson.toJson(payload.get());
         } else {
             logger.warn("Expected payload in message from DMAAP: {}", message);
             return "";
index bc5d77a..ff3f331 100644 (file)
@@ -42,6 +42,7 @@ public class Lock {
     private boolean isExclusive = false;
     private int lockCounter = 0;
     private final List<LockRequest> lockRequestQueue = new LinkedList<>();
+    private static AsynchCallbackExecutor callbackProcessor = new AsynchCallbackExecutor();
 
     private static class AsynchCallbackExecutor implements Runnable {
         private List<LockRequest> lockRequestQueue = new LinkedList<>();
@@ -72,6 +73,7 @@ public class Lock {
             return q;
         }
 
+        @SuppressWarnings("java:S2274")
         private synchronized void waitForNewEntries() {
             try {
                 if (this.lockRequestQueue.isEmpty()) {
@@ -79,13 +81,12 @@ public class Lock {
                 }
             } catch (InterruptedException e) {
                 logger.warn("waitForUnlock interrupted", e);
+                Thread.currentThread().interrupt();
             }
         }
     }
 
-    private static AsynchCallbackExecutor callbackProcessor = new AsynchCallbackExecutor();
-
-    public static enum LockType {
+    public enum LockType {
         EXCLUSIVE, SHARED
     }
 
@@ -116,7 +117,7 @@ public class Lock {
         synchronized (this) {
             if (lockCounter <= 0) {
                 lockCounter = -1; // Might as well stop, to make it easier to find the problem
-                throw new RuntimeException("Number of unlocks must match the number of locks");
+                throw new NullPointerException("Number of unlocks must match the number of locks");
             }
             this.lockCounter--;
             if (lockCounter == 0) {
@@ -148,10 +149,6 @@ public class Lock {
                 }
             }
         }
-
-        /*
-         * for (LockRequest request : granted) { request.callback.success(this); }
-         */
         callbackProcessor.addAll(granted);
     }
 
@@ -171,11 +168,13 @@ public class Lock {
         lockRequestQueue.add(new LockRequest(callback, lockType, this));
     }
 
-    private void waitForUnlock() {
+    @SuppressWarnings("java:S2274")
+    private synchronized void waitForUnlock() {
         try {
             this.wait();
         } catch (InterruptedException e) {
             logger.warn("waitForUnlock interrupted", e);
+            Thread.currentThread().interrupt();
         }
     }
 
index a5de890..0242b90 100644 (file)
@@ -20,8 +20,6 @@
 
 package org.oransc.policyagent.repository;
 
-import com.google.common.collect.ImmutableList;
-
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
@@ -38,9 +36,8 @@ import org.oransc.policyagent.configuration.RicConfig;
  */
 public class Ric {
 
-    private final RicConfig ricConfig;
-    private final ImmutableList<String> managedElementIds;
-
+    @Setter
+    private RicConfig ricConfig;
     private RicState state = RicState.UNDEFINED;
     private Map<String, PolicyType> supportedPolicyTypes = new HashMap<>();
     @Getter
@@ -57,7 +54,6 @@ public class Ric {
      */
     public Ric(RicConfig ricConfig) {
         this.ricConfig = ricConfig;
-        this.managedElementIds = ricConfig.managedElementIds();
     }
 
     public String name() {
@@ -82,7 +78,7 @@ public class Ric {
      * @return a vector containing the nodes managed by this Ric.
      */
     public synchronized Collection<String> getManagedElementIds() {
-        return managedElementIds;
+        return ricConfig.managedElementIds();
     }
 
     /**
@@ -92,7 +88,7 @@ public class Ric {
      * @return true if the given node is managed by this Ric.
      */
     public synchronized boolean isManaging(String managedElementId) {
-        return managedElementIds.contains(managedElementId);
+        return ricConfig.managedElementIds().contains(managedElementId);
     }
 
     /**
@@ -138,7 +134,7 @@ public class Ric {
     @Override
     public synchronized String toString() {
         return Ric.class.getSimpleName() + ": " + "name: " + name() + ", state: " + state + ", baseUrl: "
-            + ricConfig.baseUrl() + ", managedNodes: " + managedElementIds;
+            + ricConfig.baseUrl() + ", managedNodes: " + ricConfig.managedElementIds();
     }
 
     /**
index 4080b37..81735d7 100644 (file)
@@ -28,6 +28,7 @@ import com.google.gson.JsonSyntaxException;
 import com.google.gson.TypeAdapterFactory;
 
 import java.io.BufferedInputStream;
+import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -44,9 +45,17 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
+import org.oransc.policyagent.clients.A1ClientFactory;
 import org.oransc.policyagent.configuration.ApplicationConfig;
+import org.oransc.policyagent.configuration.ApplicationConfig.RicConfigUpdate;
 import org.oransc.policyagent.configuration.ApplicationConfigParser;
+import org.oransc.policyagent.configuration.RicConfig;
 import org.oransc.policyagent.exceptions.ServiceException;
+import org.oransc.policyagent.repository.Policies;
+import org.oransc.policyagent.repository.PolicyTypes;
+import org.oransc.policyagent.repository.Ric;
+import org.oransc.policyagent.repository.Rics;
+import org.oransc.policyagent.repository.Services;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -68,22 +77,36 @@ public class RefreshConfigTask {
     @Value("#{systemEnvironment}")
     public Properties systemEnvironment;
 
-    private final ApplicationConfig appConfig;
+    final ApplicationConfig appConfig;
     private Disposable refreshTask = null;
+    private boolean isConsulUsed = false;
+
+    private final Rics rics;
+    private final A1ClientFactory a1ClientFactory;
+    private final Policies policies;
+    private final Services services;
+    private final PolicyTypes policyTypes;
+    private static final Duration FILE_CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1);
+    private static final Duration CONSUL_CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1);
 
     @Autowired
-    public RefreshConfigTask(ApplicationConfig appConfig) {
+    public RefreshConfigTask(ApplicationConfig appConfig, Rics rics, Policies policies, Services services,
+        PolicyTypes policyTypes, A1ClientFactory a1ClientFactory) {
         this.appConfig = appConfig;
+        this.rics = rics;
+        this.policies = policies;
+        this.services = services;
+        this.policyTypes = policyTypes;
+        this.a1ClientFactory = a1ClientFactory;
     }
 
     public void start() {
         logger.debug("Starting refreshConfigTask");
         stop();
-        loadConfigurationFromFile();
         refreshTask = createRefreshTask() //
             .subscribe(notUsed -> logger.debug("Refreshed configuration data"),
                 throwable -> logger.error("Configuration refresh terminated due to exception", throwable),
-                () -> logger.debug("Configuration refresh completed"));
+                () -> logger.error("Configuration refresh terminated"));
     }
 
     public void stop() {
@@ -94,11 +117,28 @@ public class RefreshConfigTask {
     }
 
     Flux<ApplicationConfig> createRefreshTask() {
-        return getEnvironment(systemEnvironment) //
+        Flux<JsonObject> loadFromFile = Flux.interval(Duration.ZERO, FILE_CONFIG_REFRESH_INTERVAL) //
+            .filter(notUsed -> configFileExists()) //
+            .filter(notUsed -> !this.isConsulUsed) //
+            .flatMap(notUsed -> loadConfigurationFromFile()) //
+            .onErrorResume(this::ignoreError) //
+            .doOnNext(json -> logger.debug("loadFromFile")) //
+            .doOnTerminate(() -> logger.error("loadFromFile Terminate"));
+
+        Flux<JsonObject> loadFromConsul = getEnvironment(systemEnvironment) //
             .flatMap(this::createCbsClient) //
             .flatMapMany(this::periodicConfigurationUpdates) //
-            .map(this::parseRicConfigurationfromConsul) //
-            .onErrorResume(this::onErrorResume);
+            .onErrorResume(this::ignoreError) //
+            .doOnNext(json -> logger.debug("loadFromConsul")) //
+            .doOnNext(json -> this.isConsulUsed = true) //
+            .doOnTerminate(() -> logger.error("loadFromConsul Terminate"));
+
+        return Flux.merge(loadFromFile, loadFromConsul) //
+            .flatMap(this::parseConfiguration) //
+            .flatMap(this::updateConfig) //
+            .doOnNext(this::handleUpdatedRicConfig) //
+            .flatMap(configUpdate -> Flux.just(this.appConfig)) //
+            .doOnTerminate(() -> logger.error("Configuration refresh task is terminated"));
     }
 
     Mono<EnvProperties> getEnvironment(Properties systemEnvironment) {
@@ -111,38 +151,75 @@ public class RefreshConfigTask {
 
     private Flux<JsonObject> periodicConfigurationUpdates(CbsClient cbsClient) {
         final Duration initialDelay = Duration.ZERO;
-        final Duration refreshPeriod = Duration.ofMinutes(1);
         final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create());
-        return cbsClient.updates(getConfigRequest, initialDelay, refreshPeriod);
+        return cbsClient.updates(getConfigRequest, initialDelay, CONSUL_CONFIG_REFRESH_INTERVAL);
     }
 
-    private <R> Mono<R> onErrorResume(Throwable throwable) {
+    private <R> Mono<R> ignoreError(Throwable throwable) {
         String errMsg = throwable.toString();
-        logger.error("Could not refresh application configuration. {}", errMsg);
+        logger.warn("Could not refresh application configuration. {}", errMsg);
         return Mono.empty();
     }
 
-    private ApplicationConfig parseRicConfigurationfromConsul(JsonObject jsonObject) {
+    private Mono<ApplicationConfigParser> parseConfiguration(JsonObject jsonObject) {
         try {
             ApplicationConfigParser parser = new ApplicationConfigParser();
             parser.parse(jsonObject);
-            this.appConfig.setConfiguration(parser.getRicConfigs(), parser.getDmaapPublisherConfig(),
-                parser.getDmaapConsumerConfig());
+            return Mono.just(parser);
         } catch (ServiceException e) {
             logger.error("Could not parse configuration {}", e.toString(), e);
+            return Mono.error(e);
+        }
+    }
+
+    private Flux<RicConfigUpdate> updateConfig(ApplicationConfigParser config) {
+        return this.appConfig.setConfiguration(config.getRicConfigs(), config.getDmaapPublisherConfig(),
+            config.getDmaapConsumerConfig());
+    }
+
+    boolean configFileExists() {
+        String filepath = appConfig.getLocalConfigurationFilePath();
+        return (filepath != null && (new File(filepath).exists()));
+    }
+
+    private void handleUpdatedRicConfig(RicConfigUpdate updatedInfo) {
+        synchronized (this.rics) {
+            String ricName = updatedInfo.getRicConfig().name();
+            RicConfigUpdate.Type event = updatedInfo.getType();
+            if (event == RicConfigUpdate.Type.ADDED) {
+                addRic(updatedInfo.getRicConfig());
+            } else if (event == RicConfigUpdate.Type.REMOVED) {
+                rics.remove(ricName);
+                this.policies.removePoliciesForRic(ricName);
+            } else if (event == RicConfigUpdate.Type.CHANGED) {
+                Ric ric = this.rics.get(ricName);
+                if (ric == null) {
+                    // Should not happend,just for robustness
+                    addRic(updatedInfo.getRicConfig());
+                } else {
+                    ric.setRicConfig(updatedInfo.getRicConfig());
+                }
+            }
         }
-        return this.appConfig;
+    }
+
+    private void addRic(RicConfig config) {
+        Ric ric = new Ric(config);
+        this.rics.put(ric);
+        runRicSynchronization(ric);
+    }
+
+    void runRicSynchronization(Ric ric) {
+        RicSynchronizationTask synchronizationTask =
+            new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services);
+        synchronizationTask.run(ric);
     }
 
     /**
      * Reads the configuration from file.
      */
-    void loadConfigurationFromFile() {
+    Flux<JsonObject> loadConfigurationFromFile() {
         String filepath = appConfig.getLocalConfigurationFilePath();
-        if (filepath == null) {
-            logger.debug("No localconfiguration file used");
-            return;
-        }
         GsonBuilder gsonBuilder = new GsonBuilder();
         ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
 
@@ -150,11 +227,11 @@ public class RefreshConfigTask {
             JsonObject rootObject = getJsonElement(inputStream).getAsJsonObject();
             ApplicationConfigParser appParser = new ApplicationConfigParser();
             appParser.parse(rootObject);
-            appConfig.setConfiguration(appParser.getRicConfigs(), appParser.getDmaapPublisherConfig(),
-                appParser.getDmaapConsumerConfig());
-            logger.info("Local configuration file loaded: {}", filepath);
+            logger.debug("Local configuration file loaded: {}", filepath);
+            return Flux.just(rootObject);
         } catch (JsonSyntaxException | ServiceException | IOException e) {
-            logger.trace("Local configuration file not loaded: {}", filepath, e);
+            logger.debug("Local configuration file not loaded: {}", filepath, e);
+            return Flux.empty();
         }
     }
 
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/StartupService.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/StartupService.java
deleted file mode 100644 (file)
index 590b010..0000000
+++ /dev/null
@@ -1,116 +0,0 @@
-/*-
- * ========================LICENSE_START=================================
- * O-RAN-SC
- * %%
- * Copyright (C) 2019 Nordix Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================LICENSE_END===================================
- */
-
-package org.oransc.policyagent.tasks;
-
-import org.oransc.policyagent.clients.A1ClientFactory;
-import org.oransc.policyagent.configuration.ApplicationConfig;
-import org.oransc.policyagent.configuration.ApplicationConfig.RicConfigUpdate;
-import org.oransc.policyagent.configuration.RicConfig;
-import org.oransc.policyagent.repository.Policies;
-import org.oransc.policyagent.repository.PolicyTypes;
-import org.oransc.policyagent.repository.Ric;
-import org.oransc.policyagent.repository.Rics;
-import org.oransc.policyagent.repository.Services;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.core.Ordered;
-import org.springframework.core.annotation.Order;
-import org.springframework.stereotype.Service;
-
-/**
- * Loads information about RealTime-RICs at startup.
- */
-@Service("startupService")
-@Order(Ordered.HIGHEST_PRECEDENCE)
-public class StartupService implements ApplicationConfig.Observer {
-
-    private static final Logger logger = LoggerFactory.getLogger(StartupService.class);
-
-    @Autowired
-    ApplicationConfig applicationConfig;
-
-    @Autowired
-    RefreshConfigTask refreshConfigTask;
-
-    @Autowired
-    private Rics rics;
-
-    @Autowired
-    PolicyTypes policyTypes;
-
-    @Autowired
-    private A1ClientFactory a1ClientFactory;
-
-    @Autowired
-    private Policies policies;
-
-    @Autowired
-    private Services services;
-
-    // Only for unit testing
-    StartupService(ApplicationConfig appConfig, RefreshConfigTask refreshTask, Rics rics, PolicyTypes policyTypes,
-        A1ClientFactory a1ClientFactory, Policies policies, Services services) {
-        this.applicationConfig = appConfig;
-        this.refreshConfigTask = refreshTask;
-        this.rics = rics;
-        this.policyTypes = policyTypes;
-        this.a1ClientFactory = a1ClientFactory;
-        this.policies = policies;
-        this.services = services;
-    }
-
-    @Override
-    public void onRicConfigUpdate(RicConfig ricConfig, RicConfigUpdate event) {
-        synchronized (this.rics) {
-            switch (event) {
-                case ADDED:
-                case CHANGED:
-                    Ric ric = new Ric(ricConfig);
-                    rics.put(ric);
-                    RicSynchronizationTask synchronizationTask = createSynchronizationTask();
-                    synchronizationTask.run(ric);
-                    break;
-
-                case REMOVED:
-                    rics.remove(ricConfig.name());
-                    policies.removePoliciesForRic(ricConfig.name());
-                    break;
-
-                default:
-                    logger.error("Unhandled ric event: {}", event);
-            }
-        }
-    }
-
-    /**
-     * Reads the configured Rics and performs the service discovery. The result is put into the repository.
-     */
-    public void startup() {
-        logger.debug("Starting up");
-        applicationConfig.addObserver(this);
-        refreshConfigTask.start();
-    }
-
-    RicSynchronizationTask createSynchronizationTask() {
-        return new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services);
-    }
-}
index b03cdc0..17f0997 100644 (file)
@@ -23,25 +23,18 @@ package org.oransc.policyagent.configuration;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.verify;
 
 import java.util.Arrays;
 import java.util.Vector;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
-import org.oransc.policyagent.configuration.ApplicationConfig.Observer;
+import org.oransc.policyagent.configuration.ApplicationConfig.RicConfigUpdate;
 import org.oransc.policyagent.exceptions.ServiceException;
 
 @ExtendWith(MockitoExtension.class)
 public class ApplicationConfigTest {
-    @Mock
-    Observer observerMock1;
-
-    @Mock
-    Observer observerMock2;
 
     private static final ImmutableRicConfig RIC_CONFIG_1 = ImmutableRicConfig.builder() //
         .name("ric1") //
@@ -50,17 +43,25 @@ public class ApplicationConfigTest {
         .build();
 
     @Test
-    public void addRicShouldNotifyAllObserversOfRicAdded() throws Exception {
+    public void gettingNotAddedRicShouldThrowException() {
         ApplicationConfig appConfigUnderTest = new ApplicationConfig();
 
-        appConfigUnderTest.addObserver(observerMock1);
-        appConfigUnderTest.addObserver(observerMock2);
-
         appConfigUnderTest.setConfiguration(Arrays.asList(RIC_CONFIG_1), null, null);
 
-        verify(observerMock1).onRicConfigUpdate(RIC_CONFIG_1, ApplicationConfig.RicConfigUpdate.ADDED);
-        verify(observerMock2).onRicConfigUpdate(RIC_CONFIG_1, ApplicationConfig.RicConfigUpdate.ADDED);
+        Exception exception = assertThrows(ServiceException.class, () -> {
+            appConfigUnderTest.getRic("name");
+        });
+
+        assertEquals("Could not find ric: name", exception.getMessage());
+    }
+
+    @Test
+    public void addRicShouldNotifyAllObserversOfRicAdded() throws Exception {
+        ApplicationConfig appConfigUnderTest = new ApplicationConfig();
 
+        RicConfigUpdate update =
+            appConfigUnderTest.setConfiguration(Arrays.asList(RIC_CONFIG_1), null, null).blockFirst();
+        assertEquals(RicConfigUpdate.Type.ADDED, update.getType());
         assertTrue(appConfigUnderTest.getRicConfigs().contains(RIC_CONFIG_1), "Ric not added to configuraions.");
 
         assertEquals(RIC_CONFIG_1, appConfigUnderTest.getRic(RIC_CONFIG_1.name()),
@@ -71,8 +72,6 @@ public class ApplicationConfigTest {
     public void changedRicShouldNotifyAllObserversOfRicChanged() throws Exception {
         ApplicationConfig appConfigUnderTest = new ApplicationConfig();
 
-        appConfigUnderTest.addObserver(observerMock1);
-
         appConfigUnderTest.setConfiguration(Arrays.asList(RIC_CONFIG_1), null, null);
 
         ImmutableRicConfig changedRicConfig = ImmutableRicConfig.builder() //
@@ -81,11 +80,10 @@ public class ApplicationConfigTest {
             .managedElementIds(new Vector<>()) //
             .build();
 
-        appConfigUnderTest.setConfiguration(Arrays.asList(changedRicConfig), null, null);
-
-        verify(observerMock1).onRicConfigUpdate(RIC_CONFIG_1, ApplicationConfig.RicConfigUpdate.ADDED);
-        verify(observerMock1).onRicConfigUpdate(changedRicConfig, ApplicationConfig.RicConfigUpdate.CHANGED);
+        RicConfigUpdate update =
+            appConfigUnderTest.setConfiguration(Arrays.asList(changedRicConfig), null, null).blockFirst();
 
+        assertEquals(RicConfigUpdate.Type.CHANGED, update.getType());
         assertEquals(changedRicConfig, appConfigUnderTest.getRic(RIC_CONFIG_1.name()),
             "Changed Ric not retrieved from configurations.");
     }
@@ -94,8 +92,6 @@ public class ApplicationConfigTest {
     public void removedRicShouldNotifyAllObserversOfRicRemoved() {
         ApplicationConfig appConfigUnderTest = new ApplicationConfig();
 
-        appConfigUnderTest.addObserver(observerMock1);
-
         ImmutableRicConfig ricConfig2 = ImmutableRicConfig.builder() //
             .name("ric2") //
             .baseUrl("ric2_url") //
@@ -104,24 +100,11 @@ public class ApplicationConfigTest {
 
         appConfigUnderTest.setConfiguration(Arrays.asList(RIC_CONFIG_1, ricConfig2), null, null);
 
-        appConfigUnderTest.setConfiguration(Arrays.asList(ricConfig2), null, null);
-
-        verify(observerMock1).onRicConfigUpdate(RIC_CONFIG_1, ApplicationConfig.RicConfigUpdate.REMOVED);
+        RicConfigUpdate update =
+            appConfigUnderTest.setConfiguration(Arrays.asList(ricConfig2), null, null).blockFirst();
 
+        assertEquals(RicConfigUpdate.Type.REMOVED, update.getType());
         assertEquals(1, appConfigUnderTest.getRicConfigs().size(), "Ric not deleted from configurations.");
     }
 
-    @Test
-    public void gettingNotAddedRicShouldThrowException() {
-        ApplicationConfig appConfigUnderTest = new ApplicationConfig();
-
-        appConfigUnderTest.setConfiguration(Arrays.asList(RIC_CONFIG_1), null, null);
-
-        Exception exception = assertThrows(ServiceException.class, () -> {
-            appConfigUnderTest.getRic("name");
-        });
-
-        assertEquals("Could not find ric: name", exception.getMessage());
-    }
-
 }
index 7b54e06..6073b26 100644 (file)
@@ -27,7 +27,6 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
-import ch.qos.logback.classic.Level;
 import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.read.ListAppender;
 
@@ -44,6 +43,7 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Properties;
 import java.util.Vector;
@@ -56,10 +56,15 @@ import org.mockito.junit.jupiter.MockitoExtension;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
+import org.oransc.policyagent.clients.A1ClientFactory;
 import org.oransc.policyagent.configuration.ApplicationConfig;
 import org.oransc.policyagent.configuration.ApplicationConfigParser;
 import org.oransc.policyagent.configuration.ImmutableRicConfig;
 import org.oransc.policyagent.configuration.RicConfig;
+import org.oransc.policyagent.repository.Policies;
+import org.oransc.policyagent.repository.PolicyTypes;
+import org.oransc.policyagent.repository.Rics;
+import org.oransc.policyagent.repository.Services;
 import org.oransc.policyagent.utils.LoggingUtils;
 
 import reactor.core.publisher.Flux;
@@ -93,14 +98,27 @@ public class RefreshConfigTaskTest {
             .build();
     }
 
+    private RefreshConfigTask createTestObject(boolean configFileExists) {
+        RefreshConfigTask obj = spy(new RefreshConfigTask(appConfig, new Rics(), new Policies(), new Services(),
+            new PolicyTypes(), new A1ClientFactory(appConfig)));
+        doReturn(configFileExists).when(obj).configFileExists();
+        return obj;
+    }
+
     @Test
     public void whenTheConfigurationFits_thenConfiguredRicsArePutInRepository() throws Exception {
-        refreshTaskUnderTest = spy(new RefreshConfigTask(appConfig));
+        refreshTaskUnderTest = this.createTestObject(true);
         refreshTaskUnderTest.systemEnvironment = new Properties();
         // When
         doReturn(getCorrectJson()).when(refreshTaskUnderTest).createInputStream(any());
         doReturn("fileName").when(appConfig).getLocalConfigurationFilePath();
-        refreshTaskUnderTest.start();
+
+        StepVerifier.create(refreshTaskUnderTest.createRefreshTask()) //
+            .expectSubscription() //
+            .expectNext(this.appConfig) //
+            .expectNext(this.appConfig) //
+            .thenCancel() //
+            .verify();
 
         // Then
         verify(refreshTaskUnderTest, times(1)).loadConfigurationFromFile();
@@ -113,54 +131,46 @@ public class RefreshConfigTaskTest {
 
     @Test
     public void whenFileExistsButJsonIsIncorrect_thenNoRicsArePutInRepository() throws Exception {
-        refreshTaskUnderTest = spy(new RefreshConfigTask(appConfig));
+        refreshTaskUnderTest = this.createTestObject(true);
         refreshTaskUnderTest.systemEnvironment = new Properties();
 
         // When
         doReturn(getIncorrectJson()).when(refreshTaskUnderTest).createInputStream(any());
         doReturn("fileName").when(appConfig).getLocalConfigurationFilePath();
-        refreshTaskUnderTest.loadConfigurationFromFile();
+
+        StepVerifier.create(refreshTaskUnderTest.createRefreshTask()) //
+            .expectSubscription() //
+            .expectNoEvent(Duration.ofMillis(100)) //
+            .thenCancel() //
+            .verify();
 
         // Then
         verify(refreshTaskUnderTest, times(1)).loadConfigurationFromFile();
         assertThat(appConfig.getRicConfigs().size()).isEqualTo(0);
     }
 
-    @Test
-    public void whenPeriodicConfigRefreshNoEnvironmentVariables_thenErrorIsLogged() {
-        refreshTaskUnderTest = spy(new RefreshConfigTask(appConfig));
-        refreshTaskUnderTest.systemEnvironment = new Properties();
-
-        final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(RefreshConfigTask.class);
-        Flux<ApplicationConfig> task = refreshTaskUnderTest.createRefreshTask();
-
-        StepVerifier.create(task).expectSubscription().verifyComplete();
-
-        assertThat(logAppender.list.get(0).getLevel()).isEqualTo(Level.ERROR);
-        assertThat(logAppender.list.toString().contains("$CONSUL_HOST environment has not been defined")).isTrue();
-    }
-
     @Test
     public void whenPeriodicConfigRefreshNoConsul_thenErrorIsLogged() {
-        refreshTaskUnderTest = spy(new RefreshConfigTask(appConfig));
+        refreshTaskUnderTest = this.createTestObject(false);
         refreshTaskUnderTest.systemEnvironment = new Properties();
 
         EnvProperties props = properties();
         doReturn(Mono.just(props)).when(refreshTaskUnderTest).getEnvironment(any());
 
         doReturn(Mono.just(cbsClient)).when(refreshTaskUnderTest).createCbsClient(props);
-        Flux<JsonObject> err = Flux.error(new IOException());
+        Flux<?> err = Flux.error(new IOException());
         doReturn(err).when(cbsClient).updates(any(), any(), any());
 
         final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(RefreshConfigTask.class);
-        Flux<ApplicationConfig> task = refreshTaskUnderTest.createRefreshTask();
+        Flux<?> task = refreshTaskUnderTest.createRefreshTask();
 
         StepVerifier //
             .create(task) //
             .expectSubscription() //
-            .verifyComplete();
+            .expectNoEvent(Duration.ofMillis(100)) //
+            .thenCancel() //
+            .verify();
 
-        assertThat(logAppender.list.get(0).getLevel()).isEqualTo(Level.ERROR);
         assertThat(
             logAppender.list.toString().contains("Could not refresh application configuration. java.io.IOException"))
                 .isTrue();
@@ -168,7 +178,7 @@ public class RefreshConfigTaskTest {
 
     @Test
     public void whenPeriodicConfigRefreshSuccess_thenNewConfigIsCreated() throws Exception {
-        refreshTaskUnderTest = spy(new RefreshConfigTask(appConfig));
+        refreshTaskUnderTest = this.createTestObject(false);
         refreshTaskUnderTest.systemEnvironment = new Properties();
 
         EnvProperties props = properties();
@@ -187,8 +197,11 @@ public class RefreshConfigTaskTest {
             .create(task) //
             .expectSubscription() //
             .expectNext(appConfig) //
-            .verifyComplete();
+            .expectNext(appConfig) //
+            .thenCancel() //
+            .verify();
 
+        verify(refreshTaskUnderTest, times(2)).runRicSynchronization(any());
         assertThat(appConfig.getRicConfigs()).isNotNull();
         assertThat(appConfig.getRic(RIC_1_NAME).baseUrl()).isEqualTo(newBaseUrl);
     }
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java b/policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java
deleted file mode 100644 (file)
index ed4b7ee..0000000
+++ /dev/null
@@ -1,159 +0,0 @@
-/*-
- * ========================LICENSE_START=================================
- * O-RAN-SC
- * %%
- * Copyright (C) 2019 Nordix Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================LICENSE_END===================================
- */
-
-package org.oransc.policyagent.tasks;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.oransc.policyagent.configuration.ApplicationConfig;
-import org.oransc.policyagent.configuration.ImmutableRicConfig;
-import org.oransc.policyagent.configuration.RicConfig;
-import org.oransc.policyagent.repository.Policies;
-import org.oransc.policyagent.repository.Policy;
-import org.oransc.policyagent.repository.PolicyType;
-import org.oransc.policyagent.repository.Ric;
-import org.oransc.policyagent.repository.Rics;
-
-@ExtendWith(MockitoExtension.class)
-public class StartupServiceTest {
-    private static final String FIRST_RIC_NAME = "first";
-    private static final String SECOND_RIC_NAME = "second";
-
-    @Mock
-    ApplicationConfig appConfigMock;
-    @Mock
-    RefreshConfigTask refreshTaskMock;
-    @Mock
-    RicSynchronizationTask synchronizationTaskMock;
-
-    @Test
-    public void startup_thenServiceIsAddedAsObeserverAndRefreshIsStarted() {
-        StartupService serviceUnderTest =
-            new StartupService(appConfigMock, refreshTaskMock, null, null, null, null, null);
-
-        serviceUnderTest.startup();
-
-        verify(appConfigMock).addObserver(serviceUnderTest);
-        verify(refreshTaskMock).start();
-    }
-
-    @Test
-    public void twoNewRicsAddedToConfiguration_thenSynchronizationIsStartedAndTwoRicsAreAddedInRepository() {
-
-        Rics rics = new Rics();
-        StartupService serviceUnderTest =
-            spy(new StartupService(appConfigMock, refreshTaskMock, rics, null, null, null, null));
-
-        doReturn(synchronizationTaskMock).when(serviceUnderTest).createSynchronizationTask();
-
-        serviceUnderTest.onRicConfigUpdate(getRicConfig(FIRST_RIC_NAME), ApplicationConfig.RicConfigUpdate.ADDED);
-        serviceUnderTest.onRicConfigUpdate(getRicConfig(SECOND_RIC_NAME), ApplicationConfig.RicConfigUpdate.ADDED);
-
-        Ric firstRic = rics.get(FIRST_RIC_NAME);
-        assertEquals(FIRST_RIC_NAME, firstRic.name(), FIRST_RIC_NAME + " not added to Rics");
-        verify(synchronizationTaskMock, times(1)).run(firstRic);
-
-        Ric secondRic = rics.get(SECOND_RIC_NAME);
-        assertEquals(SECOND_RIC_NAME, secondRic.name(), SECOND_RIC_NAME + " not added to Rics");
-        verify(synchronizationTaskMock).run(secondRic);
-    }
-
-    @Test
-    public void oneRicIsChanged_thenSynchronizationIsStartedAndRicIsUpdatedInRepository() {
-        Rics rics = new Rics();
-        Ric originalRic = new Ric(getRicConfig(FIRST_RIC_NAME, "managedElement1"));
-        rics.put(originalRic);
-
-        StartupService serviceUnderTest =
-            spy(new StartupService(appConfigMock, refreshTaskMock, rics, null, null, null, null));
-
-        doReturn(synchronizationTaskMock).when(serviceUnderTest).createSynchronizationTask();
-
-        String updatedManagedElementName = "managedElement2";
-        serviceUnderTest.onRicConfigUpdate(getRicConfig(FIRST_RIC_NAME, updatedManagedElementName),
-            ApplicationConfig.RicConfigUpdate.CHANGED);
-
-        Ric firstRic = rics.get(FIRST_RIC_NAME);
-        assertEquals(FIRST_RIC_NAME, firstRic.name(), FIRST_RIC_NAME + " not added to Rics");
-        assertTrue(firstRic.getManagedElementIds().contains(updatedManagedElementName), "Ric not updated");
-        verify(synchronizationTaskMock).run(firstRic);
-    }
-
-    @Test
-    public void oneRicIsRemoved_thenNoSynchronizationIsStartedAndRicAndItsPoliciesAreDeletedFromRepository() {
-        Rics rics = new Rics();
-        RicConfig ricConfig = getRicConfig(FIRST_RIC_NAME);
-        Ric ric = new Ric(ricConfig);
-        rics.put(ric);
-
-        Policies policies = addPolicyForRic(ric);
-
-        StartupService serviceUnderTest =
-            new StartupService(appConfigMock, refreshTaskMock, rics, null, null, policies, null);
-
-        serviceUnderTest.onRicConfigUpdate(ricConfig, ApplicationConfig.RicConfigUpdate.REMOVED);
-
-        assertEquals(0, rics.size(), "Ric not deleted");
-        assertEquals(0, policies.size(), "Ric's policies not deleted");
-    }
-
-    private Policies addPolicyForRic(Ric ric) {
-        Policies policies = new Policies();
-        Policy policyMock = mock(Policy.class);
-        when(policyMock.id()).thenReturn("policyId");
-        when(policyMock.ric()).thenReturn(ric);
-        PolicyType policyTypeMock = mock(PolicyType.class);
-        when(policyTypeMock.name()).thenReturn("typeName");
-        when(policyMock.type()).thenReturn(policyTypeMock);
-        policies.put(policyMock);
-        return policies;
-    }
-
-    private RicConfig getRicConfig(String name) {
-        return getRicConfig(name, null);
-    }
-
-    private RicConfig getRicConfig(String name, String managedElementName) {
-        List<String> managedElements = Collections.emptyList();
-        if (managedElementName != null) {
-            managedElements = Collections.singletonList(managedElementName);
-        }
-        ImmutableRicConfig ricConfig = ImmutableRicConfig.builder() //
-            .name(name) //
-            .managedElementIds(managedElements) //
-            .baseUrl("baseUrl") //
-            .build();
-        return ricConfig;
-    }
-}