From 3e8bccd59c63f424052fcef5930e94a6629a1a95 Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Fri, 6 Mar 2020 08:33:57 +0100 Subject: [PATCH] Simplified startup Simplified configuration handling. Fixed sonar issues. Change-Id: I34873e3a8e8a276df9316cae3750f9090d3ba3cb Issue-ID: NONRTRIC-149 Signed-off-by: PatrikBuhr --- .../java/org/oransc/policyagent/Application.java | 6 +- .../policyagent/clients/SdncOnapAdapterInput.java | 2 +- .../policyagent/clients/SdncOscAdapterInput.java | 2 +- .../configuration/ApplicationConfig.java | 96 ++++++------- .../policyagent/dmaap/DmaapMessageHandler.java | 7 +- .../org/oransc/policyagent/repository/Lock.java | 17 ++- .../org/oransc/policyagent/repository/Ric.java | 14 +- .../policyagent/tasks/RefreshConfigTask.java | 125 ++++++++++++---- .../oransc/policyagent/tasks/StartupService.java | 116 --------------- .../configuration/ApplicationConfigTest.java | 59 +++----- .../policyagent/tasks/RefreshConfigTaskTest.java | 65 +++++---- .../policyagent/tasks/StartupServiceTest.java | 159 --------------------- 12 files changed, 224 insertions(+), 444 deletions(-) delete mode 100644 policy-agent/src/main/java/org/oransc/policyagent/tasks/StartupService.java delete mode 100644 policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java diff --git a/policy-agent/src/main/java/org/oransc/policyagent/Application.java b/policy-agent/src/main/java/org/oransc/policyagent/Application.java index 18120e5b..1c397fe9 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/Application.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/Application.java @@ -20,7 +20,7 @@ package org.oransc.policyagent; -import org.oransc.policyagent.tasks.StartupService; +import org.oransc.policyagent.tasks.RefreshConfigTask; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; @@ -32,7 +32,7 @@ import org.springframework.context.annotation.Bean; public class Application { @Autowired - private StartupService startupService; + private RefreshConfigTask configRefresh; public static void main(String[] args) { SpringApplication.run(Application.class); @@ -47,6 +47,6 @@ public class Application { */ @Bean public CommandLineRunner commandLineRunner(ApplicationContext ctx) { - return args -> startupService.startup(); + return args -> configRefresh.start(); } } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/clients/SdncOnapAdapterInput.java b/policy-agent/src/main/java/org/oransc/policyagent/clients/SdncOnapAdapterInput.java index f43fb80b..eb85de92 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/clients/SdncOnapAdapterInput.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/clients/SdncOnapAdapterInput.java @@ -28,7 +28,7 @@ import org.immutables.value.Value; @Value.Immutable @Gson.TypeAdapters -public interface SdncOnapAdapterInput { +interface SdncOnapAdapterInput { public String nearRtRicId(); public Optional policyTypeId(); diff --git a/policy-agent/src/main/java/org/oransc/policyagent/clients/SdncOscAdapterInput.java b/policy-agent/src/main/java/org/oransc/policyagent/clients/SdncOscAdapterInput.java index 9a67f44d..5fbfd3a7 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/clients/SdncOscAdapterInput.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/clients/SdncOscAdapterInput.java @@ -27,7 +27,7 @@ import org.immutables.value.Value; @Value.Immutable @Gson.TypeAdapters -public interface SdncOscAdapterInput { +interface SdncOscAdapterInput { public String nearRtRicUrl(); public Optional policyTypeId(); diff --git a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java index deacb446..3b4f8104 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java @@ -34,6 +34,7 @@ import lombok.Getter; import org.oransc.policyagent.exceptions.ServiceException; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import reactor.core.publisher.Flux; @EnableConfigurationProperties @ConfigurationProperties("app") @@ -50,7 +51,6 @@ public class ApplicationConfig { @NotEmpty private String a1ControllerPassword; - private Collection observers = new ArrayList<>(); private Map ricConfigs = new HashMap<>(); @Getter private Properties dmaapPublisherConfig; @@ -61,15 +61,15 @@ public class ApplicationConfig { return this.filepath; } - public String getA1ControllerBaseUrl() { + public synchronized String getA1ControllerBaseUrl() { return this.a1ControllerBaseUrl; } - public String getA1ControllerUsername() { + public synchronized String getA1ControllerUsername() { return this.a1ControllerUsername; } - public String getA1ControllerPassword() { + public synchronized String getA1ControllerPassword() { return this.a1ControllerPassword; } @@ -105,65 +105,49 @@ public class ApplicationConfig { throw new ServiceException("Could not find ric: " + ricName); } - public enum RicConfigUpdate { - ADDED, CHANGED, REMOVED - } - - public interface Observer { - void onRicConfigUpdate(RicConfig ric, RicConfigUpdate event); - } - - public void addObserver(Observer o) { - this.observers.add(o); - } + public static class RicConfigUpdate { + public enum Type { + ADDED, CHANGED, REMOVED + } - private class Notification { - final RicConfig ric; - final RicConfigUpdate event; + @Getter + private final RicConfig ricConfig; + @Getter + private final Type type; - Notification(RicConfig ric, RicConfigUpdate event) { - this.ric = ric; - this.event = event; + RicConfigUpdate(RicConfig ric, Type event) { + this.ricConfig = ric; + this.type = event; } } - public void setConfiguration(@NotNull Collection ricConfigs, Properties dmaapPublisherConfig, - Properties dmaapConsumerConfig) { - - Collection notifications = new ArrayList<>(); - synchronized (this) { - this.dmaapPublisherConfig = dmaapPublisherConfig; - this.dmaapConsumerConfig = dmaapConsumerConfig; - - Map newRicConfigs = new HashMap<>(); - for (RicConfig newConfig : ricConfigs) { - RicConfig oldConfig = this.ricConfigs.get(newConfig.name()); - if (oldConfig == null) { - newRicConfigs.put(newConfig.name(), newConfig); - notifications.add(new Notification(newConfig, RicConfigUpdate.ADDED)); - this.ricConfigs.remove(newConfig.name()); - } else if (!newConfig.equals(oldConfig)) { - notifications.add(new Notification(newConfig, RicConfigUpdate.CHANGED)); - newRicConfigs.put(newConfig.name(), newConfig); - this.ricConfigs.remove(newConfig.name()); - } else { - newRicConfigs.put(oldConfig.name(), oldConfig); - } + public synchronized Flux setConfiguration(@NotNull Collection ricConfigs, + Properties dmaapPublisherConfig, Properties dmaapConsumerConfig) { + + Collection modifications = new ArrayList<>(); + this.dmaapPublisherConfig = dmaapPublisherConfig; + this.dmaapConsumerConfig = dmaapConsumerConfig; + + Map newRicConfigs = new HashMap<>(); + for (RicConfig newConfig : ricConfigs) { + RicConfig oldConfig = this.ricConfigs.get(newConfig.name()); + if (oldConfig == null) { + newRicConfigs.put(newConfig.name(), newConfig); + modifications.add(new RicConfigUpdate(newConfig, RicConfigUpdate.Type.ADDED)); + this.ricConfigs.remove(newConfig.name()); + } else if (!newConfig.equals(oldConfig)) { + modifications.add(new RicConfigUpdate(newConfig, RicConfigUpdate.Type.CHANGED)); + newRicConfigs.put(newConfig.name(), newConfig); + this.ricConfigs.remove(newConfig.name()); + } else { + newRicConfigs.put(oldConfig.name(), oldConfig); } - for (RicConfig deletedConfig : this.ricConfigs.values()) { - notifications.add(new Notification(deletedConfig, RicConfigUpdate.REMOVED)); - } - this.ricConfigs = newRicConfigs; } - - notifyObservers(notifications); - } - - private void notifyObservers(Collection notifications) { - for (Observer observer : this.observers) { - for (Notification notif : notifications) { - observer.onRicConfigUpdate(notif.ric, notif.event); - } + for (RicConfig deletedConfig : this.ricConfigs.values()) { + modifications.add(new RicConfigUpdate(deletedConfig, RicConfigUpdate.Type.REMOVED)); } + this.ricConfigs = newRicConfigs; + + return Flux.fromIterable(modifications); } } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java index b23595a9..cbe38002 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java @@ -22,8 +22,10 @@ package org.oransc.policyagent.dmaap; import com.google.gson.Gson; import com.google.gson.GsonBuilder; +import com.google.gson.JsonObject; import java.io.IOException; +import java.util.Optional; import org.onap.dmaap.mr.client.MRBatchingPublisher; import org.oransc.policyagent.clients.AsyncRestClient; @@ -94,8 +96,9 @@ public class DmaapMessageHandler { } private String payload(DmaapRequestMessage message) { - if (message.payload().isPresent()) { - return gson.toJson(message.payload().get()); + Optional payload = message.payload(); + if (payload.isPresent()) { + return gson.toJson(payload.get()); } else { logger.warn("Expected payload in message from DMAAP: {}", message); return ""; diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Lock.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Lock.java index bc5d77a3..ff3f3310 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/repository/Lock.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/Lock.java @@ -42,6 +42,7 @@ public class Lock { private boolean isExclusive = false; private int lockCounter = 0; private final List lockRequestQueue = new LinkedList<>(); + private static AsynchCallbackExecutor callbackProcessor = new AsynchCallbackExecutor(); private static class AsynchCallbackExecutor implements Runnable { private List lockRequestQueue = new LinkedList<>(); @@ -72,6 +73,7 @@ public class Lock { return q; } + @SuppressWarnings("java:S2274") private synchronized void waitForNewEntries() { try { if (this.lockRequestQueue.isEmpty()) { @@ -79,13 +81,12 @@ public class Lock { } } catch (InterruptedException e) { logger.warn("waitForUnlock interrupted", e); + Thread.currentThread().interrupt(); } } } - private static AsynchCallbackExecutor callbackProcessor = new AsynchCallbackExecutor(); - - public static enum LockType { + public enum LockType { EXCLUSIVE, SHARED } @@ -116,7 +117,7 @@ public class Lock { synchronized (this) { if (lockCounter <= 0) { lockCounter = -1; // Might as well stop, to make it easier to find the problem - throw new RuntimeException("Number of unlocks must match the number of locks"); + throw new NullPointerException("Number of unlocks must match the number of locks"); } this.lockCounter--; if (lockCounter == 0) { @@ -148,10 +149,6 @@ public class Lock { } } } - - /* - * for (LockRequest request : granted) { request.callback.success(this); } - */ callbackProcessor.addAll(granted); } @@ -171,11 +168,13 @@ public class Lock { lockRequestQueue.add(new LockRequest(callback, lockType, this)); } - private void waitForUnlock() { + @SuppressWarnings("java:S2274") + private synchronized void waitForUnlock() { try { this.wait(); } catch (InterruptedException e) { logger.warn("waitForUnlock interrupted", e); + Thread.currentThread().interrupt(); } } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Ric.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Ric.java index a5de890e..0242b90f 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/repository/Ric.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/Ric.java @@ -20,8 +20,6 @@ package org.oransc.policyagent.repository; -import com.google.common.collect.ImmutableList; - import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -38,9 +36,8 @@ import org.oransc.policyagent.configuration.RicConfig; */ public class Ric { - private final RicConfig ricConfig; - private final ImmutableList managedElementIds; - + @Setter + private RicConfig ricConfig; private RicState state = RicState.UNDEFINED; private Map supportedPolicyTypes = new HashMap<>(); @Getter @@ -57,7 +54,6 @@ public class Ric { */ public Ric(RicConfig ricConfig) { this.ricConfig = ricConfig; - this.managedElementIds = ricConfig.managedElementIds(); } public String name() { @@ -82,7 +78,7 @@ public class Ric { * @return a vector containing the nodes managed by this Ric. */ public synchronized Collection getManagedElementIds() { - return managedElementIds; + return ricConfig.managedElementIds(); } /** @@ -92,7 +88,7 @@ public class Ric { * @return true if the given node is managed by this Ric. */ public synchronized boolean isManaging(String managedElementId) { - return managedElementIds.contains(managedElementId); + return ricConfig.managedElementIds().contains(managedElementId); } /** @@ -138,7 +134,7 @@ public class Ric { @Override public synchronized String toString() { return Ric.class.getSimpleName() + ": " + "name: " + name() + ", state: " + state + ", baseUrl: " - + ricConfig.baseUrl() + ", managedNodes: " + managedElementIds; + + ricConfig.baseUrl() + ", managedNodes: " + ricConfig.managedElementIds(); } /** diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java index 4080b37c..81735d72 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java @@ -28,6 +28,7 @@ import com.google.gson.JsonSyntaxException; import com.google.gson.TypeAdapterFactory; import java.io.BufferedInputStream; +import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; @@ -44,9 +45,17 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; +import org.oransc.policyagent.clients.A1ClientFactory; import org.oransc.policyagent.configuration.ApplicationConfig; +import org.oransc.policyagent.configuration.ApplicationConfig.RicConfigUpdate; import org.oransc.policyagent.configuration.ApplicationConfigParser; +import org.oransc.policyagent.configuration.RicConfig; import org.oransc.policyagent.exceptions.ServiceException; +import org.oransc.policyagent.repository.Policies; +import org.oransc.policyagent.repository.PolicyTypes; +import org.oransc.policyagent.repository.Ric; +import org.oransc.policyagent.repository.Rics; +import org.oransc.policyagent.repository.Services; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -68,22 +77,36 @@ public class RefreshConfigTask { @Value("#{systemEnvironment}") public Properties systemEnvironment; - private final ApplicationConfig appConfig; + final ApplicationConfig appConfig; private Disposable refreshTask = null; + private boolean isConsulUsed = false; + + private final Rics rics; + private final A1ClientFactory a1ClientFactory; + private final Policies policies; + private final Services services; + private final PolicyTypes policyTypes; + private static final Duration FILE_CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1); + private static final Duration CONSUL_CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1); @Autowired - public RefreshConfigTask(ApplicationConfig appConfig) { + public RefreshConfigTask(ApplicationConfig appConfig, Rics rics, Policies policies, Services services, + PolicyTypes policyTypes, A1ClientFactory a1ClientFactory) { this.appConfig = appConfig; + this.rics = rics; + this.policies = policies; + this.services = services; + this.policyTypes = policyTypes; + this.a1ClientFactory = a1ClientFactory; } public void start() { logger.debug("Starting refreshConfigTask"); stop(); - loadConfigurationFromFile(); refreshTask = createRefreshTask() // .subscribe(notUsed -> logger.debug("Refreshed configuration data"), throwable -> logger.error("Configuration refresh terminated due to exception", throwable), - () -> logger.debug("Configuration refresh completed")); + () -> logger.error("Configuration refresh terminated")); } public void stop() { @@ -94,11 +117,28 @@ public class RefreshConfigTask { } Flux createRefreshTask() { - return getEnvironment(systemEnvironment) // + Flux loadFromFile = Flux.interval(Duration.ZERO, FILE_CONFIG_REFRESH_INTERVAL) // + .filter(notUsed -> configFileExists()) // + .filter(notUsed -> !this.isConsulUsed) // + .flatMap(notUsed -> loadConfigurationFromFile()) // + .onErrorResume(this::ignoreError) // + .doOnNext(json -> logger.debug("loadFromFile")) // + .doOnTerminate(() -> logger.error("loadFromFile Terminate")); + + Flux loadFromConsul = getEnvironment(systemEnvironment) // .flatMap(this::createCbsClient) // .flatMapMany(this::periodicConfigurationUpdates) // - .map(this::parseRicConfigurationfromConsul) // - .onErrorResume(this::onErrorResume); + .onErrorResume(this::ignoreError) // + .doOnNext(json -> logger.debug("loadFromConsul")) // + .doOnNext(json -> this.isConsulUsed = true) // + .doOnTerminate(() -> logger.error("loadFromConsul Terminate")); + + return Flux.merge(loadFromFile, loadFromConsul) // + .flatMap(this::parseConfiguration) // + .flatMap(this::updateConfig) // + .doOnNext(this::handleUpdatedRicConfig) // + .flatMap(configUpdate -> Flux.just(this.appConfig)) // + .doOnTerminate(() -> logger.error("Configuration refresh task is terminated")); } Mono getEnvironment(Properties systemEnvironment) { @@ -111,38 +151,75 @@ public class RefreshConfigTask { private Flux periodicConfigurationUpdates(CbsClient cbsClient) { final Duration initialDelay = Duration.ZERO; - final Duration refreshPeriod = Duration.ofMinutes(1); final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create()); - return cbsClient.updates(getConfigRequest, initialDelay, refreshPeriod); + return cbsClient.updates(getConfigRequest, initialDelay, CONSUL_CONFIG_REFRESH_INTERVAL); } - private Mono onErrorResume(Throwable throwable) { + private Mono ignoreError(Throwable throwable) { String errMsg = throwable.toString(); - logger.error("Could not refresh application configuration. {}", errMsg); + logger.warn("Could not refresh application configuration. {}", errMsg); return Mono.empty(); } - private ApplicationConfig parseRicConfigurationfromConsul(JsonObject jsonObject) { + private Mono parseConfiguration(JsonObject jsonObject) { try { ApplicationConfigParser parser = new ApplicationConfigParser(); parser.parse(jsonObject); - this.appConfig.setConfiguration(parser.getRicConfigs(), parser.getDmaapPublisherConfig(), - parser.getDmaapConsumerConfig()); + return Mono.just(parser); } catch (ServiceException e) { logger.error("Could not parse configuration {}", e.toString(), e); + return Mono.error(e); + } + } + + private Flux updateConfig(ApplicationConfigParser config) { + return this.appConfig.setConfiguration(config.getRicConfigs(), config.getDmaapPublisherConfig(), + config.getDmaapConsumerConfig()); + } + + boolean configFileExists() { + String filepath = appConfig.getLocalConfigurationFilePath(); + return (filepath != null && (new File(filepath).exists())); + } + + private void handleUpdatedRicConfig(RicConfigUpdate updatedInfo) { + synchronized (this.rics) { + String ricName = updatedInfo.getRicConfig().name(); + RicConfigUpdate.Type event = updatedInfo.getType(); + if (event == RicConfigUpdate.Type.ADDED) { + addRic(updatedInfo.getRicConfig()); + } else if (event == RicConfigUpdate.Type.REMOVED) { + rics.remove(ricName); + this.policies.removePoliciesForRic(ricName); + } else if (event == RicConfigUpdate.Type.CHANGED) { + Ric ric = this.rics.get(ricName); + if (ric == null) { + // Should not happend,just for robustness + addRic(updatedInfo.getRicConfig()); + } else { + ric.setRicConfig(updatedInfo.getRicConfig()); + } + } } - return this.appConfig; + } + + private void addRic(RicConfig config) { + Ric ric = new Ric(config); + this.rics.put(ric); + runRicSynchronization(ric); + } + + void runRicSynchronization(Ric ric) { + RicSynchronizationTask synchronizationTask = + new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services); + synchronizationTask.run(ric); } /** * Reads the configuration from file. */ - void loadConfigurationFromFile() { + Flux loadConfigurationFromFile() { String filepath = appConfig.getLocalConfigurationFilePath(); - if (filepath == null) { - logger.debug("No localconfiguration file used"); - return; - } GsonBuilder gsonBuilder = new GsonBuilder(); ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); @@ -150,11 +227,11 @@ public class RefreshConfigTask { JsonObject rootObject = getJsonElement(inputStream).getAsJsonObject(); ApplicationConfigParser appParser = new ApplicationConfigParser(); appParser.parse(rootObject); - appConfig.setConfiguration(appParser.getRicConfigs(), appParser.getDmaapPublisherConfig(), - appParser.getDmaapConsumerConfig()); - logger.info("Local configuration file loaded: {}", filepath); + logger.debug("Local configuration file loaded: {}", filepath); + return Flux.just(rootObject); } catch (JsonSyntaxException | ServiceException | IOException e) { - logger.trace("Local configuration file not loaded: {}", filepath, e); + logger.debug("Local configuration file not loaded: {}", filepath, e); + return Flux.empty(); } } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/StartupService.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/StartupService.java deleted file mode 100644 index 590b0107..00000000 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/StartupService.java +++ /dev/null @@ -1,116 +0,0 @@ -/*- - * ========================LICENSE_START================================= - * O-RAN-SC - * %% - * Copyright (C) 2019 Nordix Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================LICENSE_END=================================== - */ - -package org.oransc.policyagent.tasks; - -import org.oransc.policyagent.clients.A1ClientFactory; -import org.oransc.policyagent.configuration.ApplicationConfig; -import org.oransc.policyagent.configuration.ApplicationConfig.RicConfigUpdate; -import org.oransc.policyagent.configuration.RicConfig; -import org.oransc.policyagent.repository.Policies; -import org.oransc.policyagent.repository.PolicyTypes; -import org.oransc.policyagent.repository.Ric; -import org.oransc.policyagent.repository.Rics; -import org.oransc.policyagent.repository.Services; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.core.Ordered; -import org.springframework.core.annotation.Order; -import org.springframework.stereotype.Service; - -/** - * Loads information about RealTime-RICs at startup. - */ -@Service("startupService") -@Order(Ordered.HIGHEST_PRECEDENCE) -public class StartupService implements ApplicationConfig.Observer { - - private static final Logger logger = LoggerFactory.getLogger(StartupService.class); - - @Autowired - ApplicationConfig applicationConfig; - - @Autowired - RefreshConfigTask refreshConfigTask; - - @Autowired - private Rics rics; - - @Autowired - PolicyTypes policyTypes; - - @Autowired - private A1ClientFactory a1ClientFactory; - - @Autowired - private Policies policies; - - @Autowired - private Services services; - - // Only for unit testing - StartupService(ApplicationConfig appConfig, RefreshConfigTask refreshTask, Rics rics, PolicyTypes policyTypes, - A1ClientFactory a1ClientFactory, Policies policies, Services services) { - this.applicationConfig = appConfig; - this.refreshConfigTask = refreshTask; - this.rics = rics; - this.policyTypes = policyTypes; - this.a1ClientFactory = a1ClientFactory; - this.policies = policies; - this.services = services; - } - - @Override - public void onRicConfigUpdate(RicConfig ricConfig, RicConfigUpdate event) { - synchronized (this.rics) { - switch (event) { - case ADDED: - case CHANGED: - Ric ric = new Ric(ricConfig); - rics.put(ric); - RicSynchronizationTask synchronizationTask = createSynchronizationTask(); - synchronizationTask.run(ric); - break; - - case REMOVED: - rics.remove(ricConfig.name()); - policies.removePoliciesForRic(ricConfig.name()); - break; - - default: - logger.error("Unhandled ric event: {}", event); - } - } - } - - /** - * Reads the configured Rics and performs the service discovery. The result is put into the repository. - */ - public void startup() { - logger.debug("Starting up"); - applicationConfig.addObserver(this); - refreshConfigTask.start(); - } - - RicSynchronizationTask createSynchronizationTask() { - return new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services); - } -} diff --git a/policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigTest.java b/policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigTest.java index b03cdc02..17f09974 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigTest.java @@ -23,25 +23,18 @@ package org.oransc.policyagent.configuration; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.verify; import java.util.Arrays; import java.util.Vector; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import org.oransc.policyagent.configuration.ApplicationConfig.Observer; +import org.oransc.policyagent.configuration.ApplicationConfig.RicConfigUpdate; import org.oransc.policyagent.exceptions.ServiceException; @ExtendWith(MockitoExtension.class) public class ApplicationConfigTest { - @Mock - Observer observerMock1; - - @Mock - Observer observerMock2; private static final ImmutableRicConfig RIC_CONFIG_1 = ImmutableRicConfig.builder() // .name("ric1") // @@ -50,17 +43,25 @@ public class ApplicationConfigTest { .build(); @Test - public void addRicShouldNotifyAllObserversOfRicAdded() throws Exception { + public void gettingNotAddedRicShouldThrowException() { ApplicationConfig appConfigUnderTest = new ApplicationConfig(); - appConfigUnderTest.addObserver(observerMock1); - appConfigUnderTest.addObserver(observerMock2); - appConfigUnderTest.setConfiguration(Arrays.asList(RIC_CONFIG_1), null, null); - verify(observerMock1).onRicConfigUpdate(RIC_CONFIG_1, ApplicationConfig.RicConfigUpdate.ADDED); - verify(observerMock2).onRicConfigUpdate(RIC_CONFIG_1, ApplicationConfig.RicConfigUpdate.ADDED); + Exception exception = assertThrows(ServiceException.class, () -> { + appConfigUnderTest.getRic("name"); + }); + + assertEquals("Could not find ric: name", exception.getMessage()); + } + + @Test + public void addRicShouldNotifyAllObserversOfRicAdded() throws Exception { + ApplicationConfig appConfigUnderTest = new ApplicationConfig(); + RicConfigUpdate update = + appConfigUnderTest.setConfiguration(Arrays.asList(RIC_CONFIG_1), null, null).blockFirst(); + assertEquals(RicConfigUpdate.Type.ADDED, update.getType()); assertTrue(appConfigUnderTest.getRicConfigs().contains(RIC_CONFIG_1), "Ric not added to configuraions."); assertEquals(RIC_CONFIG_1, appConfigUnderTest.getRic(RIC_CONFIG_1.name()), @@ -71,8 +72,6 @@ public class ApplicationConfigTest { public void changedRicShouldNotifyAllObserversOfRicChanged() throws Exception { ApplicationConfig appConfigUnderTest = new ApplicationConfig(); - appConfigUnderTest.addObserver(observerMock1); - appConfigUnderTest.setConfiguration(Arrays.asList(RIC_CONFIG_1), null, null); ImmutableRicConfig changedRicConfig = ImmutableRicConfig.builder() // @@ -81,11 +80,10 @@ public class ApplicationConfigTest { .managedElementIds(new Vector<>()) // .build(); - appConfigUnderTest.setConfiguration(Arrays.asList(changedRicConfig), null, null); - - verify(observerMock1).onRicConfigUpdate(RIC_CONFIG_1, ApplicationConfig.RicConfigUpdate.ADDED); - verify(observerMock1).onRicConfigUpdate(changedRicConfig, ApplicationConfig.RicConfigUpdate.CHANGED); + RicConfigUpdate update = + appConfigUnderTest.setConfiguration(Arrays.asList(changedRicConfig), null, null).blockFirst(); + assertEquals(RicConfigUpdate.Type.CHANGED, update.getType()); assertEquals(changedRicConfig, appConfigUnderTest.getRic(RIC_CONFIG_1.name()), "Changed Ric not retrieved from configurations."); } @@ -94,8 +92,6 @@ public class ApplicationConfigTest { public void removedRicShouldNotifyAllObserversOfRicRemoved() { ApplicationConfig appConfigUnderTest = new ApplicationConfig(); - appConfigUnderTest.addObserver(observerMock1); - ImmutableRicConfig ricConfig2 = ImmutableRicConfig.builder() // .name("ric2") // .baseUrl("ric2_url") // @@ -104,24 +100,11 @@ public class ApplicationConfigTest { appConfigUnderTest.setConfiguration(Arrays.asList(RIC_CONFIG_1, ricConfig2), null, null); - appConfigUnderTest.setConfiguration(Arrays.asList(ricConfig2), null, null); - - verify(observerMock1).onRicConfigUpdate(RIC_CONFIG_1, ApplicationConfig.RicConfigUpdate.REMOVED); + RicConfigUpdate update = + appConfigUnderTest.setConfiguration(Arrays.asList(ricConfig2), null, null).blockFirst(); + assertEquals(RicConfigUpdate.Type.REMOVED, update.getType()); assertEquals(1, appConfigUnderTest.getRicConfigs().size(), "Ric not deleted from configurations."); } - @Test - public void gettingNotAddedRicShouldThrowException() { - ApplicationConfig appConfigUnderTest = new ApplicationConfig(); - - appConfigUnderTest.setConfiguration(Arrays.asList(RIC_CONFIG_1), null, null); - - Exception exception = assertThrows(ServiceException.class, () -> { - appConfigUnderTest.getRic("name"); - }); - - assertEquals("Could not find ric: name", exception.getMessage()); - } - } diff --git a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RefreshConfigTaskTest.java b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RefreshConfigTaskTest.java index 7b54e068..6073b26a 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RefreshConfigTaskTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RefreshConfigTaskTest.java @@ -27,7 +27,6 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import ch.qos.logback.classic.Level; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.read.ListAppender; @@ -44,6 +43,7 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.net.URL; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.Arrays; import java.util.Properties; import java.util.Vector; @@ -56,10 +56,15 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties; +import org.oransc.policyagent.clients.A1ClientFactory; import org.oransc.policyagent.configuration.ApplicationConfig; import org.oransc.policyagent.configuration.ApplicationConfigParser; import org.oransc.policyagent.configuration.ImmutableRicConfig; import org.oransc.policyagent.configuration.RicConfig; +import org.oransc.policyagent.repository.Policies; +import org.oransc.policyagent.repository.PolicyTypes; +import org.oransc.policyagent.repository.Rics; +import org.oransc.policyagent.repository.Services; import org.oransc.policyagent.utils.LoggingUtils; import reactor.core.publisher.Flux; @@ -93,14 +98,27 @@ public class RefreshConfigTaskTest { .build(); } + private RefreshConfigTask createTestObject(boolean configFileExists) { + RefreshConfigTask obj = spy(new RefreshConfigTask(appConfig, new Rics(), new Policies(), new Services(), + new PolicyTypes(), new A1ClientFactory(appConfig))); + doReturn(configFileExists).when(obj).configFileExists(); + return obj; + } + @Test public void whenTheConfigurationFits_thenConfiguredRicsArePutInRepository() throws Exception { - refreshTaskUnderTest = spy(new RefreshConfigTask(appConfig)); + refreshTaskUnderTest = this.createTestObject(true); refreshTaskUnderTest.systemEnvironment = new Properties(); // When doReturn(getCorrectJson()).when(refreshTaskUnderTest).createInputStream(any()); doReturn("fileName").when(appConfig).getLocalConfigurationFilePath(); - refreshTaskUnderTest.start(); + + StepVerifier.create(refreshTaskUnderTest.createRefreshTask()) // + .expectSubscription() // + .expectNext(this.appConfig) // + .expectNext(this.appConfig) // + .thenCancel() // + .verify(); // Then verify(refreshTaskUnderTest, times(1)).loadConfigurationFromFile(); @@ -113,54 +131,46 @@ public class RefreshConfigTaskTest { @Test public void whenFileExistsButJsonIsIncorrect_thenNoRicsArePutInRepository() throws Exception { - refreshTaskUnderTest = spy(new RefreshConfigTask(appConfig)); + refreshTaskUnderTest = this.createTestObject(true); refreshTaskUnderTest.systemEnvironment = new Properties(); // When doReturn(getIncorrectJson()).when(refreshTaskUnderTest).createInputStream(any()); doReturn("fileName").when(appConfig).getLocalConfigurationFilePath(); - refreshTaskUnderTest.loadConfigurationFromFile(); + + StepVerifier.create(refreshTaskUnderTest.createRefreshTask()) // + .expectSubscription() // + .expectNoEvent(Duration.ofMillis(100)) // + .thenCancel() // + .verify(); // Then verify(refreshTaskUnderTest, times(1)).loadConfigurationFromFile(); assertThat(appConfig.getRicConfigs().size()).isEqualTo(0); } - @Test - public void whenPeriodicConfigRefreshNoEnvironmentVariables_thenErrorIsLogged() { - refreshTaskUnderTest = spy(new RefreshConfigTask(appConfig)); - refreshTaskUnderTest.systemEnvironment = new Properties(); - - final ListAppender logAppender = LoggingUtils.getLogListAppender(RefreshConfigTask.class); - Flux task = refreshTaskUnderTest.createRefreshTask(); - - StepVerifier.create(task).expectSubscription().verifyComplete(); - - assertThat(logAppender.list.get(0).getLevel()).isEqualTo(Level.ERROR); - assertThat(logAppender.list.toString().contains("$CONSUL_HOST environment has not been defined")).isTrue(); - } - @Test public void whenPeriodicConfigRefreshNoConsul_thenErrorIsLogged() { - refreshTaskUnderTest = spy(new RefreshConfigTask(appConfig)); + refreshTaskUnderTest = this.createTestObject(false); refreshTaskUnderTest.systemEnvironment = new Properties(); EnvProperties props = properties(); doReturn(Mono.just(props)).when(refreshTaskUnderTest).getEnvironment(any()); doReturn(Mono.just(cbsClient)).when(refreshTaskUnderTest).createCbsClient(props); - Flux err = Flux.error(new IOException()); + Flux err = Flux.error(new IOException()); doReturn(err).when(cbsClient).updates(any(), any(), any()); final ListAppender logAppender = LoggingUtils.getLogListAppender(RefreshConfigTask.class); - Flux task = refreshTaskUnderTest.createRefreshTask(); + Flux task = refreshTaskUnderTest.createRefreshTask(); StepVerifier // .create(task) // .expectSubscription() // - .verifyComplete(); + .expectNoEvent(Duration.ofMillis(100)) // + .thenCancel() // + .verify(); - assertThat(logAppender.list.get(0).getLevel()).isEqualTo(Level.ERROR); assertThat( logAppender.list.toString().contains("Could not refresh application configuration. java.io.IOException")) .isTrue(); @@ -168,7 +178,7 @@ public class RefreshConfigTaskTest { @Test public void whenPeriodicConfigRefreshSuccess_thenNewConfigIsCreated() throws Exception { - refreshTaskUnderTest = spy(new RefreshConfigTask(appConfig)); + refreshTaskUnderTest = this.createTestObject(false); refreshTaskUnderTest.systemEnvironment = new Properties(); EnvProperties props = properties(); @@ -187,8 +197,11 @@ public class RefreshConfigTaskTest { .create(task) // .expectSubscription() // .expectNext(appConfig) // - .verifyComplete(); + .expectNext(appConfig) // + .thenCancel() // + .verify(); + verify(refreshTaskUnderTest, times(2)).runRicSynchronization(any()); assertThat(appConfig.getRicConfigs()).isNotNull(); assertThat(appConfig.getRic(RIC_1_NAME).baseUrl()).isEqualTo(newBaseUrl); } diff --git a/policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java b/policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java deleted file mode 100644 index ed4b7ee9..00000000 --- a/policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java +++ /dev/null @@ -1,159 +0,0 @@ -/*- - * ========================LICENSE_START================================= - * O-RAN-SC - * %% - * Copyright (C) 2019 Nordix Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================LICENSE_END=================================== - */ - -package org.oransc.policyagent.tasks; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.util.Collections; -import java.util.List; - -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import org.oransc.policyagent.configuration.ApplicationConfig; -import org.oransc.policyagent.configuration.ImmutableRicConfig; -import org.oransc.policyagent.configuration.RicConfig; -import org.oransc.policyagent.repository.Policies; -import org.oransc.policyagent.repository.Policy; -import org.oransc.policyagent.repository.PolicyType; -import org.oransc.policyagent.repository.Ric; -import org.oransc.policyagent.repository.Rics; - -@ExtendWith(MockitoExtension.class) -public class StartupServiceTest { - private static final String FIRST_RIC_NAME = "first"; - private static final String SECOND_RIC_NAME = "second"; - - @Mock - ApplicationConfig appConfigMock; - @Mock - RefreshConfigTask refreshTaskMock; - @Mock - RicSynchronizationTask synchronizationTaskMock; - - @Test - public void startup_thenServiceIsAddedAsObeserverAndRefreshIsStarted() { - StartupService serviceUnderTest = - new StartupService(appConfigMock, refreshTaskMock, null, null, null, null, null); - - serviceUnderTest.startup(); - - verify(appConfigMock).addObserver(serviceUnderTest); - verify(refreshTaskMock).start(); - } - - @Test - public void twoNewRicsAddedToConfiguration_thenSynchronizationIsStartedAndTwoRicsAreAddedInRepository() { - - Rics rics = new Rics(); - StartupService serviceUnderTest = - spy(new StartupService(appConfigMock, refreshTaskMock, rics, null, null, null, null)); - - doReturn(synchronizationTaskMock).when(serviceUnderTest).createSynchronizationTask(); - - serviceUnderTest.onRicConfigUpdate(getRicConfig(FIRST_RIC_NAME), ApplicationConfig.RicConfigUpdate.ADDED); - serviceUnderTest.onRicConfigUpdate(getRicConfig(SECOND_RIC_NAME), ApplicationConfig.RicConfigUpdate.ADDED); - - Ric firstRic = rics.get(FIRST_RIC_NAME); - assertEquals(FIRST_RIC_NAME, firstRic.name(), FIRST_RIC_NAME + " not added to Rics"); - verify(synchronizationTaskMock, times(1)).run(firstRic); - - Ric secondRic = rics.get(SECOND_RIC_NAME); - assertEquals(SECOND_RIC_NAME, secondRic.name(), SECOND_RIC_NAME + " not added to Rics"); - verify(synchronizationTaskMock).run(secondRic); - } - - @Test - public void oneRicIsChanged_thenSynchronizationIsStartedAndRicIsUpdatedInRepository() { - Rics rics = new Rics(); - Ric originalRic = new Ric(getRicConfig(FIRST_RIC_NAME, "managedElement1")); - rics.put(originalRic); - - StartupService serviceUnderTest = - spy(new StartupService(appConfigMock, refreshTaskMock, rics, null, null, null, null)); - - doReturn(synchronizationTaskMock).when(serviceUnderTest).createSynchronizationTask(); - - String updatedManagedElementName = "managedElement2"; - serviceUnderTest.onRicConfigUpdate(getRicConfig(FIRST_RIC_NAME, updatedManagedElementName), - ApplicationConfig.RicConfigUpdate.CHANGED); - - Ric firstRic = rics.get(FIRST_RIC_NAME); - assertEquals(FIRST_RIC_NAME, firstRic.name(), FIRST_RIC_NAME + " not added to Rics"); - assertTrue(firstRic.getManagedElementIds().contains(updatedManagedElementName), "Ric not updated"); - verify(synchronizationTaskMock).run(firstRic); - } - - @Test - public void oneRicIsRemoved_thenNoSynchronizationIsStartedAndRicAndItsPoliciesAreDeletedFromRepository() { - Rics rics = new Rics(); - RicConfig ricConfig = getRicConfig(FIRST_RIC_NAME); - Ric ric = new Ric(ricConfig); - rics.put(ric); - - Policies policies = addPolicyForRic(ric); - - StartupService serviceUnderTest = - new StartupService(appConfigMock, refreshTaskMock, rics, null, null, policies, null); - - serviceUnderTest.onRicConfigUpdate(ricConfig, ApplicationConfig.RicConfigUpdate.REMOVED); - - assertEquals(0, rics.size(), "Ric not deleted"); - assertEquals(0, policies.size(), "Ric's policies not deleted"); - } - - private Policies addPolicyForRic(Ric ric) { - Policies policies = new Policies(); - Policy policyMock = mock(Policy.class); - when(policyMock.id()).thenReturn("policyId"); - when(policyMock.ric()).thenReturn(ric); - PolicyType policyTypeMock = mock(PolicyType.class); - when(policyTypeMock.name()).thenReturn("typeName"); - when(policyMock.type()).thenReturn(policyTypeMock); - policies.put(policyMock); - return policies; - } - - private RicConfig getRicConfig(String name) { - return getRicConfig(name, null); - } - - private RicConfig getRicConfig(String name, String managedElementName) { - List managedElements = Collections.emptyList(); - if (managedElementName != null) { - managedElements = Collections.singletonList(managedElementName); - } - ImmutableRicConfig ricConfig = ImmutableRicConfig.builder() // - .name(name) // - .managedElementIds(managedElements) // - .baseUrl("baseUrl") // - .build(); - return ricConfig; - } -} -- 2.16.6