Simplified configuration handling.
Fixed sonar issues.
Change-Id: I34873e3a8e8a276df9316cae3750f9090d3ba3cb
Issue-ID: NONRTRIC-149
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
package org.oransc.policyagent;
-import org.oransc.policyagent.tasks.StartupService;
+import org.oransc.policyagent.tasks.RefreshConfigTask;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
public class Application {
@Autowired
- private StartupService startupService;
+ private RefreshConfigTask configRefresh;
public static void main(String[] args) {
SpringApplication.run(Application.class);
*/
@Bean
public CommandLineRunner commandLineRunner(ApplicationContext ctx) {
- return args -> startupService.startup();
+ return args -> configRefresh.start();
}
}
@Value.Immutable
@Gson.TypeAdapters
-public interface SdncOnapAdapterInput {
+interface SdncOnapAdapterInput {
public String nearRtRicId();
public Optional<String> policyTypeId();
@Value.Immutable
@Gson.TypeAdapters
-public interface SdncOscAdapterInput {
+interface SdncOscAdapterInput {
public String nearRtRicUrl();
public Optional<String> policyTypeId();
import org.oransc.policyagent.exceptions.ServiceException;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import reactor.core.publisher.Flux;
@EnableConfigurationProperties
@ConfigurationProperties("app")
@NotEmpty
private String a1ControllerPassword;
- private Collection<Observer> observers = new ArrayList<>();
private Map<String, RicConfig> ricConfigs = new HashMap<>();
@Getter
private Properties dmaapPublisherConfig;
return this.filepath;
}
- public String getA1ControllerBaseUrl() {
+ public synchronized String getA1ControllerBaseUrl() {
return this.a1ControllerBaseUrl;
}
- public String getA1ControllerUsername() {
+ public synchronized String getA1ControllerUsername() {
return this.a1ControllerUsername;
}
- public String getA1ControllerPassword() {
+ public synchronized String getA1ControllerPassword() {
return this.a1ControllerPassword;
}
throw new ServiceException("Could not find ric: " + ricName);
}
- public enum RicConfigUpdate {
- ADDED, CHANGED, REMOVED
- }
-
- public interface Observer {
- void onRicConfigUpdate(RicConfig ric, RicConfigUpdate event);
- }
-
- public void addObserver(Observer o) {
- this.observers.add(o);
- }
+ public static class RicConfigUpdate {
+ public enum Type {
+ ADDED, CHANGED, REMOVED
+ }
- private class Notification {
- final RicConfig ric;
- final RicConfigUpdate event;
+ @Getter
+ private final RicConfig ricConfig;
+ @Getter
+ private final Type type;
- Notification(RicConfig ric, RicConfigUpdate event) {
- this.ric = ric;
- this.event = event;
+ RicConfigUpdate(RicConfig ric, Type event) {
+ this.ricConfig = ric;
+ this.type = event;
}
}
- public void setConfiguration(@NotNull Collection<RicConfig> ricConfigs, Properties dmaapPublisherConfig,
- Properties dmaapConsumerConfig) {
-
- Collection<Notification> notifications = new ArrayList<>();
- synchronized (this) {
- this.dmaapPublisherConfig = dmaapPublisherConfig;
- this.dmaapConsumerConfig = dmaapConsumerConfig;
-
- Map<String, RicConfig> newRicConfigs = new HashMap<>();
- for (RicConfig newConfig : ricConfigs) {
- RicConfig oldConfig = this.ricConfigs.get(newConfig.name());
- if (oldConfig == null) {
- newRicConfigs.put(newConfig.name(), newConfig);
- notifications.add(new Notification(newConfig, RicConfigUpdate.ADDED));
- this.ricConfigs.remove(newConfig.name());
- } else if (!newConfig.equals(oldConfig)) {
- notifications.add(new Notification(newConfig, RicConfigUpdate.CHANGED));
- newRicConfigs.put(newConfig.name(), newConfig);
- this.ricConfigs.remove(newConfig.name());
- } else {
- newRicConfigs.put(oldConfig.name(), oldConfig);
- }
+ public synchronized Flux<RicConfigUpdate> setConfiguration(@NotNull Collection<RicConfig> ricConfigs,
+ Properties dmaapPublisherConfig, Properties dmaapConsumerConfig) {
+
+ Collection<RicConfigUpdate> modifications = new ArrayList<>();
+ this.dmaapPublisherConfig = dmaapPublisherConfig;
+ this.dmaapConsumerConfig = dmaapConsumerConfig;
+
+ Map<String, RicConfig> newRicConfigs = new HashMap<>();
+ for (RicConfig newConfig : ricConfigs) {
+ RicConfig oldConfig = this.ricConfigs.get(newConfig.name());
+ if (oldConfig == null) {
+ newRicConfigs.put(newConfig.name(), newConfig);
+ modifications.add(new RicConfigUpdate(newConfig, RicConfigUpdate.Type.ADDED));
+ this.ricConfigs.remove(newConfig.name());
+ } else if (!newConfig.equals(oldConfig)) {
+ modifications.add(new RicConfigUpdate(newConfig, RicConfigUpdate.Type.CHANGED));
+ newRicConfigs.put(newConfig.name(), newConfig);
+ this.ricConfigs.remove(newConfig.name());
+ } else {
+ newRicConfigs.put(oldConfig.name(), oldConfig);
}
- for (RicConfig deletedConfig : this.ricConfigs.values()) {
- notifications.add(new Notification(deletedConfig, RicConfigUpdate.REMOVED));
- }
- this.ricConfigs = newRicConfigs;
}
-
- notifyObservers(notifications);
- }
-
- private void notifyObservers(Collection<Notification> notifications) {
- for (Observer observer : this.observers) {
- for (Notification notif : notifications) {
- observer.onRicConfigUpdate(notif.ric, notif.event);
- }
+ for (RicConfig deletedConfig : this.ricConfigs.values()) {
+ modifications.add(new RicConfigUpdate(deletedConfig, RicConfigUpdate.Type.REMOVED));
}
+ this.ricConfigs = newRicConfigs;
+
+ return Flux.fromIterable(modifications);
}
}
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
+import com.google.gson.JsonObject;
import java.io.IOException;
+import java.util.Optional;
import org.onap.dmaap.mr.client.MRBatchingPublisher;
import org.oransc.policyagent.clients.AsyncRestClient;
}
private String payload(DmaapRequestMessage message) {
- if (message.payload().isPresent()) {
- return gson.toJson(message.payload().get());
+ Optional<JsonObject> payload = message.payload();
+ if (payload.isPresent()) {
+ return gson.toJson(payload.get());
} else {
logger.warn("Expected payload in message from DMAAP: {}", message);
return "";
private boolean isExclusive = false;
private int lockCounter = 0;
private final List<LockRequest> lockRequestQueue = new LinkedList<>();
+ private static AsynchCallbackExecutor callbackProcessor = new AsynchCallbackExecutor();
private static class AsynchCallbackExecutor implements Runnable {
private List<LockRequest> lockRequestQueue = new LinkedList<>();
return q;
}
+ @SuppressWarnings("java:S2274")
private synchronized void waitForNewEntries() {
try {
if (this.lockRequestQueue.isEmpty()) {
}
} catch (InterruptedException e) {
logger.warn("waitForUnlock interrupted", e);
+ Thread.currentThread().interrupt();
}
}
}
- private static AsynchCallbackExecutor callbackProcessor = new AsynchCallbackExecutor();
-
- public static enum LockType {
+ public enum LockType {
EXCLUSIVE, SHARED
}
synchronized (this) {
if (lockCounter <= 0) {
lockCounter = -1; // Might as well stop, to make it easier to find the problem
- throw new RuntimeException("Number of unlocks must match the number of locks");
+ throw new NullPointerException("Number of unlocks must match the number of locks");
}
this.lockCounter--;
if (lockCounter == 0) {
}
}
}
-
- /*
- * for (LockRequest request : granted) { request.callback.success(this); }
- */
callbackProcessor.addAll(granted);
}
lockRequestQueue.add(new LockRequest(callback, lockType, this));
}
- private void waitForUnlock() {
+ @SuppressWarnings("java:S2274")
+ private synchronized void waitForUnlock() {
try {
this.wait();
} catch (InterruptedException e) {
logger.warn("waitForUnlock interrupted", e);
+ Thread.currentThread().interrupt();
}
}
package org.oransc.policyagent.repository;
-import com.google.common.collect.ImmutableList;
-
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
*/
public class Ric {
- private final RicConfig ricConfig;
- private final ImmutableList<String> managedElementIds;
-
+ @Setter
+ private RicConfig ricConfig;
private RicState state = RicState.UNDEFINED;
private Map<String, PolicyType> supportedPolicyTypes = new HashMap<>();
@Getter
*/
public Ric(RicConfig ricConfig) {
this.ricConfig = ricConfig;
- this.managedElementIds = ricConfig.managedElementIds();
}
public String name() {
* @return a vector containing the nodes managed by this Ric.
*/
public synchronized Collection<String> getManagedElementIds() {
- return managedElementIds;
+ return ricConfig.managedElementIds();
}
/**
* @return true if the given node is managed by this Ric.
*/
public synchronized boolean isManaging(String managedElementId) {
- return managedElementIds.contains(managedElementId);
+ return ricConfig.managedElementIds().contains(managedElementId);
}
/**
@Override
public synchronized String toString() {
return Ric.class.getSimpleName() + ": " + "name: " + name() + ", state: " + state + ", baseUrl: "
- + ricConfig.baseUrl() + ", managedNodes: " + managedElementIds;
+ + ricConfig.baseUrl() + ", managedNodes: " + ricConfig.managedElementIds();
}
/**
import com.google.gson.TypeAdapterFactory;
import java.io.BufferedInputStream;
+import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
+import org.oransc.policyagent.clients.A1ClientFactory;
import org.oransc.policyagent.configuration.ApplicationConfig;
+import org.oransc.policyagent.configuration.ApplicationConfig.RicConfigUpdate;
import org.oransc.policyagent.configuration.ApplicationConfigParser;
+import org.oransc.policyagent.configuration.RicConfig;
import org.oransc.policyagent.exceptions.ServiceException;
+import org.oransc.policyagent.repository.Policies;
+import org.oransc.policyagent.repository.PolicyTypes;
+import org.oransc.policyagent.repository.Ric;
+import org.oransc.policyagent.repository.Rics;
+import org.oransc.policyagent.repository.Services;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@Value("#{systemEnvironment}")
public Properties systemEnvironment;
- private final ApplicationConfig appConfig;
+ final ApplicationConfig appConfig;
private Disposable refreshTask = null;
+ private boolean isConsulUsed = false;
+
+ private final Rics rics;
+ private final A1ClientFactory a1ClientFactory;
+ private final Policies policies;
+ private final Services services;
+ private final PolicyTypes policyTypes;
+ private static final Duration FILE_CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1);
+ private static final Duration CONSUL_CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1);
@Autowired
- public RefreshConfigTask(ApplicationConfig appConfig) {
+ public RefreshConfigTask(ApplicationConfig appConfig, Rics rics, Policies policies, Services services,
+ PolicyTypes policyTypes, A1ClientFactory a1ClientFactory) {
this.appConfig = appConfig;
+ this.rics = rics;
+ this.policies = policies;
+ this.services = services;
+ this.policyTypes = policyTypes;
+ this.a1ClientFactory = a1ClientFactory;
}
public void start() {
logger.debug("Starting refreshConfigTask");
stop();
- loadConfigurationFromFile();
refreshTask = createRefreshTask() //
.subscribe(notUsed -> logger.debug("Refreshed configuration data"),
throwable -> logger.error("Configuration refresh terminated due to exception", throwable),
- () -> logger.debug("Configuration refresh completed"));
+ () -> logger.error("Configuration refresh terminated"));
}
public void stop() {
}
Flux<ApplicationConfig> createRefreshTask() {
- return getEnvironment(systemEnvironment) //
+ Flux<JsonObject> loadFromFile = Flux.interval(Duration.ZERO, FILE_CONFIG_REFRESH_INTERVAL) //
+ .filter(notUsed -> configFileExists()) //
+ .filter(notUsed -> !this.isConsulUsed) //
+ .flatMap(notUsed -> loadConfigurationFromFile()) //
+ .onErrorResume(this::ignoreError) //
+ .doOnNext(json -> logger.debug("loadFromFile")) //
+ .doOnTerminate(() -> logger.error("loadFromFile Terminate"));
+
+ Flux<JsonObject> loadFromConsul = getEnvironment(systemEnvironment) //
.flatMap(this::createCbsClient) //
.flatMapMany(this::periodicConfigurationUpdates) //
- .map(this::parseRicConfigurationfromConsul) //
- .onErrorResume(this::onErrorResume);
+ .onErrorResume(this::ignoreError) //
+ .doOnNext(json -> logger.debug("loadFromConsul")) //
+ .doOnNext(json -> this.isConsulUsed = true) //
+ .doOnTerminate(() -> logger.error("loadFromConsul Terminate"));
+
+ return Flux.merge(loadFromFile, loadFromConsul) //
+ .flatMap(this::parseConfiguration) //
+ .flatMap(this::updateConfig) //
+ .doOnNext(this::handleUpdatedRicConfig) //
+ .flatMap(configUpdate -> Flux.just(this.appConfig)) //
+ .doOnTerminate(() -> logger.error("Configuration refresh task is terminated"));
}
Mono<EnvProperties> getEnvironment(Properties systemEnvironment) {
private Flux<JsonObject> periodicConfigurationUpdates(CbsClient cbsClient) {
final Duration initialDelay = Duration.ZERO;
- final Duration refreshPeriod = Duration.ofMinutes(1);
final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create());
- return cbsClient.updates(getConfigRequest, initialDelay, refreshPeriod);
+ return cbsClient.updates(getConfigRequest, initialDelay, CONSUL_CONFIG_REFRESH_INTERVAL);
}
- private <R> Mono<R> onErrorResume(Throwable throwable) {
+ private <R> Mono<R> ignoreError(Throwable throwable) {
String errMsg = throwable.toString();
- logger.error("Could not refresh application configuration. {}", errMsg);
+ logger.warn("Could not refresh application configuration. {}", errMsg);
return Mono.empty();
}
- private ApplicationConfig parseRicConfigurationfromConsul(JsonObject jsonObject) {
+ private Mono<ApplicationConfigParser> parseConfiguration(JsonObject jsonObject) {
try {
ApplicationConfigParser parser = new ApplicationConfigParser();
parser.parse(jsonObject);
- this.appConfig.setConfiguration(parser.getRicConfigs(), parser.getDmaapPublisherConfig(),
- parser.getDmaapConsumerConfig());
+ return Mono.just(parser);
} catch (ServiceException e) {
logger.error("Could not parse configuration {}", e.toString(), e);
+ return Mono.error(e);
+ }
+ }
+
+ private Flux<RicConfigUpdate> updateConfig(ApplicationConfigParser config) {
+ return this.appConfig.setConfiguration(config.getRicConfigs(), config.getDmaapPublisherConfig(),
+ config.getDmaapConsumerConfig());
+ }
+
+ boolean configFileExists() {
+ String filepath = appConfig.getLocalConfigurationFilePath();
+ return (filepath != null && (new File(filepath).exists()));
+ }
+
+ private void handleUpdatedRicConfig(RicConfigUpdate updatedInfo) {
+ synchronized (this.rics) {
+ String ricName = updatedInfo.getRicConfig().name();
+ RicConfigUpdate.Type event = updatedInfo.getType();
+ if (event == RicConfigUpdate.Type.ADDED) {
+ addRic(updatedInfo.getRicConfig());
+ } else if (event == RicConfigUpdate.Type.REMOVED) {
+ rics.remove(ricName);
+ this.policies.removePoliciesForRic(ricName);
+ } else if (event == RicConfigUpdate.Type.CHANGED) {
+ Ric ric = this.rics.get(ricName);
+ if (ric == null) {
+ // Should not happend,just for robustness
+ addRic(updatedInfo.getRicConfig());
+ } else {
+ ric.setRicConfig(updatedInfo.getRicConfig());
+ }
+ }
}
- return this.appConfig;
+ }
+
+ private void addRic(RicConfig config) {
+ Ric ric = new Ric(config);
+ this.rics.put(ric);
+ runRicSynchronization(ric);
+ }
+
+ void runRicSynchronization(Ric ric) {
+ RicSynchronizationTask synchronizationTask =
+ new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services);
+ synchronizationTask.run(ric);
}
/**
* Reads the configuration from file.
*/
- void loadConfigurationFromFile() {
+ Flux<JsonObject> loadConfigurationFromFile() {
String filepath = appConfig.getLocalConfigurationFilePath();
- if (filepath == null) {
- logger.debug("No localconfiguration file used");
- return;
- }
GsonBuilder gsonBuilder = new GsonBuilder();
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
JsonObject rootObject = getJsonElement(inputStream).getAsJsonObject();
ApplicationConfigParser appParser = new ApplicationConfigParser();
appParser.parse(rootObject);
- appConfig.setConfiguration(appParser.getRicConfigs(), appParser.getDmaapPublisherConfig(),
- appParser.getDmaapConsumerConfig());
- logger.info("Local configuration file loaded: {}", filepath);
+ logger.debug("Local configuration file loaded: {}", filepath);
+ return Flux.just(rootObject);
} catch (JsonSyntaxException | ServiceException | IOException e) {
- logger.trace("Local configuration file not loaded: {}", filepath, e);
+ logger.debug("Local configuration file not loaded: {}", filepath, e);
+ return Flux.empty();
}
}
+++ /dev/null
-/*-
- * ========================LICENSE_START=================================
- * O-RAN-SC
- * %%
- * Copyright (C) 2019 Nordix Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================LICENSE_END===================================
- */
-
-package org.oransc.policyagent.tasks;
-
-import org.oransc.policyagent.clients.A1ClientFactory;
-import org.oransc.policyagent.configuration.ApplicationConfig;
-import org.oransc.policyagent.configuration.ApplicationConfig.RicConfigUpdate;
-import org.oransc.policyagent.configuration.RicConfig;
-import org.oransc.policyagent.repository.Policies;
-import org.oransc.policyagent.repository.PolicyTypes;
-import org.oransc.policyagent.repository.Ric;
-import org.oransc.policyagent.repository.Rics;
-import org.oransc.policyagent.repository.Services;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.core.Ordered;
-import org.springframework.core.annotation.Order;
-import org.springframework.stereotype.Service;
-
-/**
- * Loads information about RealTime-RICs at startup.
- */
-@Service("startupService")
-@Order(Ordered.HIGHEST_PRECEDENCE)
-public class StartupService implements ApplicationConfig.Observer {
-
- private static final Logger logger = LoggerFactory.getLogger(StartupService.class);
-
- @Autowired
- ApplicationConfig applicationConfig;
-
- @Autowired
- RefreshConfigTask refreshConfigTask;
-
- @Autowired
- private Rics rics;
-
- @Autowired
- PolicyTypes policyTypes;
-
- @Autowired
- private A1ClientFactory a1ClientFactory;
-
- @Autowired
- private Policies policies;
-
- @Autowired
- private Services services;
-
- // Only for unit testing
- StartupService(ApplicationConfig appConfig, RefreshConfigTask refreshTask, Rics rics, PolicyTypes policyTypes,
- A1ClientFactory a1ClientFactory, Policies policies, Services services) {
- this.applicationConfig = appConfig;
- this.refreshConfigTask = refreshTask;
- this.rics = rics;
- this.policyTypes = policyTypes;
- this.a1ClientFactory = a1ClientFactory;
- this.policies = policies;
- this.services = services;
- }
-
- @Override
- public void onRicConfigUpdate(RicConfig ricConfig, RicConfigUpdate event) {
- synchronized (this.rics) {
- switch (event) {
- case ADDED:
- case CHANGED:
- Ric ric = new Ric(ricConfig);
- rics.put(ric);
- RicSynchronizationTask synchronizationTask = createSynchronizationTask();
- synchronizationTask.run(ric);
- break;
-
- case REMOVED:
- rics.remove(ricConfig.name());
- policies.removePoliciesForRic(ricConfig.name());
- break;
-
- default:
- logger.error("Unhandled ric event: {}", event);
- }
- }
- }
-
- /**
- * Reads the configured Rics and performs the service discovery. The result is put into the repository.
- */
- public void startup() {
- logger.debug("Starting up");
- applicationConfig.addObserver(this);
- refreshConfigTask.start();
- }
-
- RicSynchronizationTask createSynchronizationTask() {
- return new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services);
- }
-}
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.verify;
import java.util.Arrays;
import java.util.Vector;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
-import org.oransc.policyagent.configuration.ApplicationConfig.Observer;
+import org.oransc.policyagent.configuration.ApplicationConfig.RicConfigUpdate;
import org.oransc.policyagent.exceptions.ServiceException;
@ExtendWith(MockitoExtension.class)
public class ApplicationConfigTest {
- @Mock
- Observer observerMock1;
-
- @Mock
- Observer observerMock2;
private static final ImmutableRicConfig RIC_CONFIG_1 = ImmutableRicConfig.builder() //
.name("ric1") //
.build();
@Test
- public void addRicShouldNotifyAllObserversOfRicAdded() throws Exception {
+ public void gettingNotAddedRicShouldThrowException() {
ApplicationConfig appConfigUnderTest = new ApplicationConfig();
- appConfigUnderTest.addObserver(observerMock1);
- appConfigUnderTest.addObserver(observerMock2);
-
appConfigUnderTest.setConfiguration(Arrays.asList(RIC_CONFIG_1), null, null);
- verify(observerMock1).onRicConfigUpdate(RIC_CONFIG_1, ApplicationConfig.RicConfigUpdate.ADDED);
- verify(observerMock2).onRicConfigUpdate(RIC_CONFIG_1, ApplicationConfig.RicConfigUpdate.ADDED);
+ Exception exception = assertThrows(ServiceException.class, () -> {
+ appConfigUnderTest.getRic("name");
+ });
+
+ assertEquals("Could not find ric: name", exception.getMessage());
+ }
+
+ @Test
+ public void addRicShouldNotifyAllObserversOfRicAdded() throws Exception {
+ ApplicationConfig appConfigUnderTest = new ApplicationConfig();
+ RicConfigUpdate update =
+ appConfigUnderTest.setConfiguration(Arrays.asList(RIC_CONFIG_1), null, null).blockFirst();
+ assertEquals(RicConfigUpdate.Type.ADDED, update.getType());
assertTrue(appConfigUnderTest.getRicConfigs().contains(RIC_CONFIG_1), "Ric not added to configuraions.");
assertEquals(RIC_CONFIG_1, appConfigUnderTest.getRic(RIC_CONFIG_1.name()),
public void changedRicShouldNotifyAllObserversOfRicChanged() throws Exception {
ApplicationConfig appConfigUnderTest = new ApplicationConfig();
- appConfigUnderTest.addObserver(observerMock1);
-
appConfigUnderTest.setConfiguration(Arrays.asList(RIC_CONFIG_1), null, null);
ImmutableRicConfig changedRicConfig = ImmutableRicConfig.builder() //
.managedElementIds(new Vector<>()) //
.build();
- appConfigUnderTest.setConfiguration(Arrays.asList(changedRicConfig), null, null);
-
- verify(observerMock1).onRicConfigUpdate(RIC_CONFIG_1, ApplicationConfig.RicConfigUpdate.ADDED);
- verify(observerMock1).onRicConfigUpdate(changedRicConfig, ApplicationConfig.RicConfigUpdate.CHANGED);
+ RicConfigUpdate update =
+ appConfigUnderTest.setConfiguration(Arrays.asList(changedRicConfig), null, null).blockFirst();
+ assertEquals(RicConfigUpdate.Type.CHANGED, update.getType());
assertEquals(changedRicConfig, appConfigUnderTest.getRic(RIC_CONFIG_1.name()),
"Changed Ric not retrieved from configurations.");
}
public void removedRicShouldNotifyAllObserversOfRicRemoved() {
ApplicationConfig appConfigUnderTest = new ApplicationConfig();
- appConfigUnderTest.addObserver(observerMock1);
-
ImmutableRicConfig ricConfig2 = ImmutableRicConfig.builder() //
.name("ric2") //
.baseUrl("ric2_url") //
appConfigUnderTest.setConfiguration(Arrays.asList(RIC_CONFIG_1, ricConfig2), null, null);
- appConfigUnderTest.setConfiguration(Arrays.asList(ricConfig2), null, null);
-
- verify(observerMock1).onRicConfigUpdate(RIC_CONFIG_1, ApplicationConfig.RicConfigUpdate.REMOVED);
+ RicConfigUpdate update =
+ appConfigUnderTest.setConfiguration(Arrays.asList(ricConfig2), null, null).blockFirst();
+ assertEquals(RicConfigUpdate.Type.REMOVED, update.getType());
assertEquals(1, appConfigUnderTest.getRicConfigs().size(), "Ric not deleted from configurations.");
}
- @Test
- public void gettingNotAddedRicShouldThrowException() {
- ApplicationConfig appConfigUnderTest = new ApplicationConfig();
-
- appConfigUnderTest.setConfiguration(Arrays.asList(RIC_CONFIG_1), null, null);
-
- Exception exception = assertThrows(ServiceException.class, () -> {
- appConfigUnderTest.getRic("name");
- });
-
- assertEquals("Could not find ric: name", exception.getMessage());
- }
-
}
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.Vector;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
+import org.oransc.policyagent.clients.A1ClientFactory;
import org.oransc.policyagent.configuration.ApplicationConfig;
import org.oransc.policyagent.configuration.ApplicationConfigParser;
import org.oransc.policyagent.configuration.ImmutableRicConfig;
import org.oransc.policyagent.configuration.RicConfig;
+import org.oransc.policyagent.repository.Policies;
+import org.oransc.policyagent.repository.PolicyTypes;
+import org.oransc.policyagent.repository.Rics;
+import org.oransc.policyagent.repository.Services;
import org.oransc.policyagent.utils.LoggingUtils;
import reactor.core.publisher.Flux;
.build();
}
+ private RefreshConfigTask createTestObject(boolean configFileExists) {
+ RefreshConfigTask obj = spy(new RefreshConfigTask(appConfig, new Rics(), new Policies(), new Services(),
+ new PolicyTypes(), new A1ClientFactory(appConfig)));
+ doReturn(configFileExists).when(obj).configFileExists();
+ return obj;
+ }
+
@Test
public void whenTheConfigurationFits_thenConfiguredRicsArePutInRepository() throws Exception {
- refreshTaskUnderTest = spy(new RefreshConfigTask(appConfig));
+ refreshTaskUnderTest = this.createTestObject(true);
refreshTaskUnderTest.systemEnvironment = new Properties();
// When
doReturn(getCorrectJson()).when(refreshTaskUnderTest).createInputStream(any());
doReturn("fileName").when(appConfig).getLocalConfigurationFilePath();
- refreshTaskUnderTest.start();
+
+ StepVerifier.create(refreshTaskUnderTest.createRefreshTask()) //
+ .expectSubscription() //
+ .expectNext(this.appConfig) //
+ .expectNext(this.appConfig) //
+ .thenCancel() //
+ .verify();
// Then
verify(refreshTaskUnderTest, times(1)).loadConfigurationFromFile();
@Test
public void whenFileExistsButJsonIsIncorrect_thenNoRicsArePutInRepository() throws Exception {
- refreshTaskUnderTest = spy(new RefreshConfigTask(appConfig));
+ refreshTaskUnderTest = this.createTestObject(true);
refreshTaskUnderTest.systemEnvironment = new Properties();
// When
doReturn(getIncorrectJson()).when(refreshTaskUnderTest).createInputStream(any());
doReturn("fileName").when(appConfig).getLocalConfigurationFilePath();
- refreshTaskUnderTest.loadConfigurationFromFile();
+
+ StepVerifier.create(refreshTaskUnderTest.createRefreshTask()) //
+ .expectSubscription() //
+ .expectNoEvent(Duration.ofMillis(100)) //
+ .thenCancel() //
+ .verify();
// Then
verify(refreshTaskUnderTest, times(1)).loadConfigurationFromFile();
assertThat(appConfig.getRicConfigs().size()).isEqualTo(0);
}
- @Test
- public void whenPeriodicConfigRefreshNoEnvironmentVariables_thenErrorIsLogged() {
- refreshTaskUnderTest = spy(new RefreshConfigTask(appConfig));
- refreshTaskUnderTest.systemEnvironment = new Properties();
-
- final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(RefreshConfigTask.class);
- Flux<ApplicationConfig> task = refreshTaskUnderTest.createRefreshTask();
-
- StepVerifier.create(task).expectSubscription().verifyComplete();
-
- assertThat(logAppender.list.get(0).getLevel()).isEqualTo(Level.ERROR);
- assertThat(logAppender.list.toString().contains("$CONSUL_HOST environment has not been defined")).isTrue();
- }
-
@Test
public void whenPeriodicConfigRefreshNoConsul_thenErrorIsLogged() {
- refreshTaskUnderTest = spy(new RefreshConfigTask(appConfig));
+ refreshTaskUnderTest = this.createTestObject(false);
refreshTaskUnderTest.systemEnvironment = new Properties();
EnvProperties props = properties();
doReturn(Mono.just(props)).when(refreshTaskUnderTest).getEnvironment(any());
doReturn(Mono.just(cbsClient)).when(refreshTaskUnderTest).createCbsClient(props);
- Flux<JsonObject> err = Flux.error(new IOException());
+ Flux<?> err = Flux.error(new IOException());
doReturn(err).when(cbsClient).updates(any(), any(), any());
final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(RefreshConfigTask.class);
- Flux<ApplicationConfig> task = refreshTaskUnderTest.createRefreshTask();
+ Flux<?> task = refreshTaskUnderTest.createRefreshTask();
StepVerifier //
.create(task) //
.expectSubscription() //
- .verifyComplete();
+ .expectNoEvent(Duration.ofMillis(100)) //
+ .thenCancel() //
+ .verify();
- assertThat(logAppender.list.get(0).getLevel()).isEqualTo(Level.ERROR);
assertThat(
logAppender.list.toString().contains("Could not refresh application configuration. java.io.IOException"))
.isTrue();
@Test
public void whenPeriodicConfigRefreshSuccess_thenNewConfigIsCreated() throws Exception {
- refreshTaskUnderTest = spy(new RefreshConfigTask(appConfig));
+ refreshTaskUnderTest = this.createTestObject(false);
refreshTaskUnderTest.systemEnvironment = new Properties();
EnvProperties props = properties();
.create(task) //
.expectSubscription() //
.expectNext(appConfig) //
- .verifyComplete();
+ .expectNext(appConfig) //
+ .thenCancel() //
+ .verify();
+ verify(refreshTaskUnderTest, times(2)).runRicSynchronization(any());
assertThat(appConfig.getRicConfigs()).isNotNull();
assertThat(appConfig.getRic(RIC_1_NAME).baseUrl()).isEqualTo(newBaseUrl);
}
+++ /dev/null
-/*-
- * ========================LICENSE_START=================================
- * O-RAN-SC
- * %%
- * Copyright (C) 2019 Nordix Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================LICENSE_END===================================
- */
-
-package org.oransc.policyagent.tasks;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.oransc.policyagent.configuration.ApplicationConfig;
-import org.oransc.policyagent.configuration.ImmutableRicConfig;
-import org.oransc.policyagent.configuration.RicConfig;
-import org.oransc.policyagent.repository.Policies;
-import org.oransc.policyagent.repository.Policy;
-import org.oransc.policyagent.repository.PolicyType;
-import org.oransc.policyagent.repository.Ric;
-import org.oransc.policyagent.repository.Rics;
-
-@ExtendWith(MockitoExtension.class)
-public class StartupServiceTest {
- private static final String FIRST_RIC_NAME = "first";
- private static final String SECOND_RIC_NAME = "second";
-
- @Mock
- ApplicationConfig appConfigMock;
- @Mock
- RefreshConfigTask refreshTaskMock;
- @Mock
- RicSynchronizationTask synchronizationTaskMock;
-
- @Test
- public void startup_thenServiceIsAddedAsObeserverAndRefreshIsStarted() {
- StartupService serviceUnderTest =
- new StartupService(appConfigMock, refreshTaskMock, null, null, null, null, null);
-
- serviceUnderTest.startup();
-
- verify(appConfigMock).addObserver(serviceUnderTest);
- verify(refreshTaskMock).start();
- }
-
- @Test
- public void twoNewRicsAddedToConfiguration_thenSynchronizationIsStartedAndTwoRicsAreAddedInRepository() {
-
- Rics rics = new Rics();
- StartupService serviceUnderTest =
- spy(new StartupService(appConfigMock, refreshTaskMock, rics, null, null, null, null));
-
- doReturn(synchronizationTaskMock).when(serviceUnderTest).createSynchronizationTask();
-
- serviceUnderTest.onRicConfigUpdate(getRicConfig(FIRST_RIC_NAME), ApplicationConfig.RicConfigUpdate.ADDED);
- serviceUnderTest.onRicConfigUpdate(getRicConfig(SECOND_RIC_NAME), ApplicationConfig.RicConfigUpdate.ADDED);
-
- Ric firstRic = rics.get(FIRST_RIC_NAME);
- assertEquals(FIRST_RIC_NAME, firstRic.name(), FIRST_RIC_NAME + " not added to Rics");
- verify(synchronizationTaskMock, times(1)).run(firstRic);
-
- Ric secondRic = rics.get(SECOND_RIC_NAME);
- assertEquals(SECOND_RIC_NAME, secondRic.name(), SECOND_RIC_NAME + " not added to Rics");
- verify(synchronizationTaskMock).run(secondRic);
- }
-
- @Test
- public void oneRicIsChanged_thenSynchronizationIsStartedAndRicIsUpdatedInRepository() {
- Rics rics = new Rics();
- Ric originalRic = new Ric(getRicConfig(FIRST_RIC_NAME, "managedElement1"));
- rics.put(originalRic);
-
- StartupService serviceUnderTest =
- spy(new StartupService(appConfigMock, refreshTaskMock, rics, null, null, null, null));
-
- doReturn(synchronizationTaskMock).when(serviceUnderTest).createSynchronizationTask();
-
- String updatedManagedElementName = "managedElement2";
- serviceUnderTest.onRicConfigUpdate(getRicConfig(FIRST_RIC_NAME, updatedManagedElementName),
- ApplicationConfig.RicConfigUpdate.CHANGED);
-
- Ric firstRic = rics.get(FIRST_RIC_NAME);
- assertEquals(FIRST_RIC_NAME, firstRic.name(), FIRST_RIC_NAME + " not added to Rics");
- assertTrue(firstRic.getManagedElementIds().contains(updatedManagedElementName), "Ric not updated");
- verify(synchronizationTaskMock).run(firstRic);
- }
-
- @Test
- public void oneRicIsRemoved_thenNoSynchronizationIsStartedAndRicAndItsPoliciesAreDeletedFromRepository() {
- Rics rics = new Rics();
- RicConfig ricConfig = getRicConfig(FIRST_RIC_NAME);
- Ric ric = new Ric(ricConfig);
- rics.put(ric);
-
- Policies policies = addPolicyForRic(ric);
-
- StartupService serviceUnderTest =
- new StartupService(appConfigMock, refreshTaskMock, rics, null, null, policies, null);
-
- serviceUnderTest.onRicConfigUpdate(ricConfig, ApplicationConfig.RicConfigUpdate.REMOVED);
-
- assertEquals(0, rics.size(), "Ric not deleted");
- assertEquals(0, policies.size(), "Ric's policies not deleted");
- }
-
- private Policies addPolicyForRic(Ric ric) {
- Policies policies = new Policies();
- Policy policyMock = mock(Policy.class);
- when(policyMock.id()).thenReturn("policyId");
- when(policyMock.ric()).thenReturn(ric);
- PolicyType policyTypeMock = mock(PolicyType.class);
- when(policyTypeMock.name()).thenReturn("typeName");
- when(policyMock.type()).thenReturn(policyTypeMock);
- policies.put(policyMock);
- return policies;
- }
-
- private RicConfig getRicConfig(String name) {
- return getRicConfig(name, null);
- }
-
- private RicConfig getRicConfig(String name, String managedElementName) {
- List<String> managedElements = Collections.emptyList();
- if (managedElementName != null) {
- managedElements = Collections.singletonList(managedElementName);
- }
- ImmutableRicConfig ricConfig = ImmutableRicConfig.builder() //
- .name(name) //
- .managedElementIds(managedElements) //
- .baseUrl("baseUrl") //
- .build();
- return ricConfig;
- }
-}