When the application config is changed, Ric:s will be added/removed in the repository and
recovered.
Fixed a lot of thread synchronization issues.
Issue-ID: NONRTRIC-84
Change-Id: I42ff66bc59c585d2d9f65e9e00a0ac88998a44bf
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
import java.io.InputStream;
import java.io.InputStreamReader;
import java.time.Duration;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.ServiceLoader;
Properties systemEnvironment;
private Disposable refreshConfigTask = null;
+ private Collection<Observer> observers = new Vector<>();
+
+ private Map<String, RicConfig> ricConfigs = new HashMap<>();
@NotEmpty
private String filepath;
- private Vector<RicConfig> ricConfigs;
-
@Autowired
public ApplicationConfig() {
}
+ protected String getLocalConfigurationFilePath() {
+ return this.filepath;
+ }
+
public synchronized void setFilepath(String filepath) {
this.filepath = filepath;
}
- public Vector<RicConfig> getRicConfigs() {
- return this.ricConfigs;
+ public synchronized Collection<RicConfig> getRicConfigs() {
+ return this.ricConfigs.values();
}
- public Optional<RicConfig> lookupRicConfigForManagedElement(String managedElementId) {
+ public synchronized Optional<RicConfig> lookupRicConfigForManagedElement(String managedElementId) {
for (RicConfig ricConfig : getRicConfigs()) {
if (ricConfig.managedElementIds().contains(managedElementId)) {
return Optional.of(ricConfig);
public void initialize() {
stop();
- loadConfigurationFromFile(this.filepath);
+ loadConfigurationFromFile();
refreshConfigTask = createRefreshTask() //
.subscribe(notUsed -> logger.info("Refreshed configuration data"),
() -> logger.error("Configuration refresh terminated"));
}
+ public static enum RicConfigUpdate {
+ ADDED, CHANGED, REMOVED
+ }
+
+ public interface Observer {
+ void onRicConfigUpdate(RicConfig ric, RicConfigUpdate event);
+ }
+
+ public void addObserver(Observer o) {
+ this.observers.add(o);
+ }
+
Mono<EnvProperties> getEnvironment(Properties systemEnvironment) {
return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment);
}
return this;
}
- private synchronized void setConfiguration(@NotNull Vector<RicConfig> ricConfigs) {
- this.ricConfigs = ricConfigs;
+ private class Notification {
+ final RicConfig ric;
+ final RicConfigUpdate event;
+
+ Notification(RicConfig ric, RicConfigUpdate event) {
+ this.ric = ric;
+ this.event = event;
+ }
+ }
+
+ private void setConfiguration(@NotNull Collection<RicConfig> ricConfigs) {
+ Collection<Notification> notifications = new Vector<>();
+ synchronized (this) {
+ Map<String, RicConfig> newRicConfigs = new HashMap<>();
+ for (RicConfig newConfig : ricConfigs) {
+ RicConfig oldConfig = this.ricConfigs.get(newConfig.name());
+ if (oldConfig == null) {
+ newRicConfigs.put(newConfig.name(), newConfig);
+ notifications.add(new Notification(newConfig, RicConfigUpdate.ADDED));
+ this.ricConfigs.remove(newConfig.name());
+ } else if (!newConfig.equals(newConfig)) {
+ notifications.add(new Notification(newConfig, RicConfigUpdate.CHANGED));
+ newRicConfigs.put(newConfig.name(), newConfig);
+ this.ricConfigs.remove(newConfig.name());
+ } else {
+ newRicConfigs.put(oldConfig.name(), oldConfig);
+ }
+ }
+ for (RicConfig deletedConfig : this.ricConfigs.values()) {
+ notifications.add(new Notification(deletedConfig, RicConfigUpdate.REMOVED));
+ }
+ this.ricConfigs = newRicConfigs;
+ }
+ notifyObservers(notifications);
+ }
+
+ private void notifyObservers(Collection<Notification> notifications) {
+ for (Observer observer : this.observers) {
+ for (Notification notif : notifications) {
+ observer.onRicConfigUpdate(notif.ric, notif.event);
+ }
+ }
}
public void stop() {
/**
* Reads the configuration from file.
*/
- protected void loadConfigurationFromFile(String filepath) {
+ public void loadConfigurationFromFile() {
+ String filepath = getLocalConfigurationFilePath();
+ if (filepath == null) {
+ logger.debug("No localconfiguration file used");
+ return;
+ }
GsonBuilder gsonBuilder = new GsonBuilder();
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
}
ApplicationConfigParser appParser = new ApplicationConfigParser();
appParser.parse(rootObject);
- this.ricConfigs = appParser.getRicConfigs();
+ setConfiguration(appParser.getRicConfigs());
logger.info("Local configuration file loaded: {}", filepath);
} catch (JsonSyntaxException | ServiceException | IOException e) {
logger.trace("Local configuration file not loaded: {}", filepath, e);
+++ /dev/null
-/*-
- * ========================LICENSE_START=================================
- * O-RAN-SC
- * %%
- * Copyright (C) 2019 Nordix Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================LICENSE_END===================================
- */
-
-package org.oransc.policyagent.configuration;
-
-import io.swagger.annotations.ApiOperation;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ScheduledFuture;
-
-import javax.annotation.PostConstruct;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
-import org.springframework.scheduling.TaskScheduler;
-import org.springframework.scheduling.annotation.EnableScheduling;
-import org.springframework.stereotype.Component;
-import reactor.core.publisher.Mono;
-
-@Component
-@EnableScheduling
-class ApplicationConfigLoader {
-
- private static final Logger logger = LoggerFactory.getLogger(ApplicationConfigLoader.class);
- private static List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
- private static final Duration CONFIGURATION_REFRESH_INTERVAL = Duration.ofSeconds(15);
-
- private final TaskScheduler taskScheduler;
- private final ApplicationConfig configuration;
-
- @Autowired
- public ApplicationConfigLoader(TaskScheduler taskScheduler, ApplicationConfig configuration) {
- this.taskScheduler = taskScheduler;
- this.configuration = configuration;
- }
-
- /**
- * Function which have to stop tasks execution.
- *
- * @return response entity about status of cancellation operation
- */
- @ApiOperation(value = "Get response on stopping task execution")
- public synchronized Mono<ResponseEntity<String>> getResponseFromCancellationOfTasks() {
- scheduledFutureList.forEach(x -> x.cancel(false));
- scheduledFutureList.clear();
- logger.info("Stopped");
- return Mono.just(new ResponseEntity<>("Service has already been stopped!", HttpStatus.CREATED));
- }
-
- @PostConstruct
- @ApiOperation(value = "Start task if possible")
- public synchronized boolean start() {
- logger.info("Start scheduling Datafile workflow");
- configuration.initialize();
-
- if (scheduledFutureList.isEmpty()) {
- scheduledFutureList
- .add(taskScheduler.scheduleWithFixedDelay(this::refreshConfiguration, CONFIGURATION_REFRESH_INTERVAL));
- return true;
- } else {
- return false;
- }
- }
-
- private void refreshConfiguration() {
- // TBD
-
- }
-
-}
@ApiOperation(value = "Returns policy type schema definitions")
@ApiResponses(value = {@ApiResponse(code = 200, message = "Policy Types found")})
public ResponseEntity<String> getPolicySchemas(@RequestParam(name = "ric", required = false) String ricName) {
- if (ricName == null) {
- Collection<PolicyType> types = this.policyTypes.getAll();
- return new ResponseEntity<String>(toPolicyTypeSchemasJson(types), HttpStatus.OK);
- } else {
- try {
- Collection<PolicyType> types = rics.getRic(ricName).getSupportedPolicyTypes();
+ synchronized (this.policyTypes) {
+ if (ricName == null) {
+ Collection<PolicyType> types = this.policyTypes.getAll();
return new ResponseEntity<String>(toPolicyTypeSchemasJson(types), HttpStatus.OK);
- } catch (ServiceException e) {
- return new ResponseEntity<String>(e.toString(), HttpStatus.NOT_FOUND);
+ } else {
+ try {
+ Collection<PolicyType> types = rics.getRic(ricName).getSupportedPolicyTypes();
+ return new ResponseEntity<String>(toPolicyTypeSchemasJson(types), HttpStatus.OK);
+ } catch (ServiceException e) {
+ return new ResponseEntity<String>(e.toString(), HttpStatus.NOT_FOUND);
+ }
}
}
}
@ApiOperation(value = "Returns policy types")
@ApiResponses(value = {@ApiResponse(code = 200, message = "Policy Types found")})
public ResponseEntity<String> getPolicyTypes(@RequestParam(name = "ric", required = false) String ricName) {
- if (ricName == null) {
- Collection<PolicyType> types = this.policyTypes.getAll();
- return new ResponseEntity<String>(toPolicyTypeIdsJson(types), HttpStatus.OK);
- } else {
- try {
- Collection<PolicyType> types = rics.getRic(ricName).getSupportedPolicyTypes();
+ synchronized (this.policyTypes) {
+ if (ricName == null) {
+ Collection<PolicyType> types = this.policyTypes.getAll();
return new ResponseEntity<String>(toPolicyTypeIdsJson(types), HttpStatus.OK);
- } catch (ServiceException e) {
- return new ResponseEntity<String>(e.toString(), HttpStatus.NOT_FOUND);
+ } else {
+ try {
+ Collection<PolicyType> types = rics.getRic(ricName).getSupportedPolicyTypes();
+ return new ResponseEntity<String>(toPolicyTypeIdsJson(types), HttpStatus.OK);
+ } catch (ServiceException e) {
+ return new ResponseEntity<String>(e.toString(), HttpStatus.NOT_FOUND);
+ }
}
}
}
public Mono<ResponseEntity<Void>> deletePolicy( //
@RequestParam(name = "instance", required = true) String id) {
Policy policy = policies.get(id);
- if (policy != null && policy.ric().state().equals(Ric.RicState.ACTIVE)) {
+ if (policy != null && policy.ric().state().equals(Ric.RicState.IDLE)) {
return a1Client.deletePolicy(policy.ric().getConfig().baseUrl(), id) //
.doOnEach(notUsed -> policies.removeId(id)) //
.flatMap(notUsed -> {
Ric ric = rics.get(ricName);
PolicyType type = policyTypes.get(typeName);
- if (ric != null && type != null && ric.state().equals(Ric.RicState.ACTIVE)) {
+ if (ric != null && type != null && ric.state().equals(Ric.RicState.IDLE)) {
Policy policy = ImmutablePolicy.builder() //
.id(instanceId) //
.json(jsonBody) //
@RequestParam(name = "ric", required = false) String ric, //
@RequestParam(name = "service", required = false) String service) //
{
- Collection<Policy> result = null;
+ synchronized (policies) {
+ Collection<Policy> result = null;
- if (type != null) {
- result = policies.getForType(type);
- result = filter(result, null, ric, service);
- } else if (service != null) {
- result = policies.getForService(service);
- result = filter(result, type, ric, null);
- } else if (ric != null) {
- result = policies.getForRic(ric);
- result = filter(result, type, null, service);
- } else {
- result = policies.getAll();
- }
+ if (type != null) {
+ result = policies.getForType(type);
+ result = filter(result, null, ric, service);
+ } else if (service != null) {
+ result = policies.getForService(service);
+ result = filter(result, type, ric, null);
+ } else if (ric != null) {
+ result = policies.getForRic(ric);
+ result = filter(result, type, null, service);
+ } else {
+ result = policies.getAll();
+ }
- return new ResponseEntity<String>(policiesToJson(result), HttpStatus.OK);
+ return new ResponseEntity<String>(policiesToJson(result), HttpStatus.OK);
+ }
}
private boolean include(String filter, String value) {
})
public ResponseEntity<String> getRics(
@RequestParam(name = "policyType", required = false) String supportingPolicyType) {
+
Vector<RicInfo> result = new Vector<>();
- for (Ric ric : rics.getRics()) {
- if (supportingPolicyType == null || ric.isSupportingType(supportingPolicyType)) {
- result.add(ImmutableRicInfo.builder() //
- .name(ric.name()) //
- .managedElementIds(ric.getManagedElementIds()) //
- .policyTypes(ric.getSupportedPolicyTypeNames()) //
- .build());
+ synchronized (rics) {
+ for (Ric ric : rics.getRics()) {
+ if (supportingPolicyType == null || ric.isSupportingType(supportingPolicyType)) {
+ result.add(ImmutableRicInfo.builder() //
+ .name(ric.name()) //
+ .managedElementIds(ric.getManagedElementIds()) //
+ .policyTypes(ric.getSupportedPolicyTypeNames()) //
+ .build());
+ }
}
}
@GetMapping("/services")
public ResponseEntity<?> getServices() {
- Collection<Service> allServices = this.services.getAll();
- Collection<ServiceStatus> result = new Vector<>(allServices.size());
- for (Service s : allServices) {
- result.add(toServiceStatus(s));
+ Collection<ServiceStatus> result = new Vector<>();
+ synchronized (this.services) {
+ for (Service s : this.services.getAll()) {
+ result.add(toServiceStatus(s));
+ }
}
return new ResponseEntity<>(gson.toJson(result), HttpStatus.OK);
}
package org.oransc.policyagent.repository;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
if (map == null) {
return new Vector<Policy>();
}
- return map.values();
+ return Collections.unmodifiableCollection(map.values());
}
public synchronized boolean containsPolicy(String id) {
}
public synchronized Collection<Policy> getAll() {
- return policiesId.values();
+ return Collections.unmodifiableCollection(policiesId.values());
}
public synchronized Collection<Policy> getForService(String service) {
package org.oransc.policyagent.repository;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
types.put(type.name(), type);
}
- public boolean contains(String policyType) {
+ public synchronized boolean contains(String policyType) {
return types.containsKey(policyType);
}
public synchronized Collection<PolicyType> getAll() {
- return types.values();
+ return Collections.unmodifiableCollection(types.values());
}
- public int size() {
+ public synchronized int size() {
return types.size();
}
- public void clear() {
+ public synchronized void clear() {
this.types.clear();
}
}
*/
public class Ric {
private final RicConfig ricConfig;
- private RicState state = RicState.NOT_INITIATED;
+ private RicState state = RicState.UNDEFINED;
private Map<String, PolicyType> supportedPolicyTypes = new HashMap<>();
/**
*/
public static enum RicState {
/**
- * The Ric has not been initiated yet.
+ * The agent view of the agent may be inconsistent
*/
- NOT_INITIATED,
+ UNDEFINED,
/**
- * The Ric is working fine.
+ * The normal state. Policies can be configured.
*/
- ACTIVE,
+ IDLE,
/**
- * The Ric cannot be contacted.
+ * The Ric states are recovered
*/
- NOT_REACHABLE, RECOVERING
+ RECOVERING
}
}
package org.oransc.policyagent.repository;
-import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
rics.put(ric.name(), ric);
}
- public Collection<Ric> getRics() {
+ public synchronized Iterable<Ric> getRics() {
return rics.values();
}
- public Ric getRic(String name) throws ServiceException {
+ public synchronized Ric getRic(String name) throws ServiceException {
Ric ric = rics.get(name);
if (ric == null) {
throw new ServiceException("Could not find ric: " + name);
return ric;
}
- public Ric get(String name) {
+ public synchronized Ric get(String name) {
return rics.get(name);
}
- public int size() {
+ public synchronized void remove(String name) {
+ rics.remove(name);
+ }
+
+ public synchronized int size() {
return rics.size();
}
- public void clear() {
+ public synchronized void clear() {
this.rics.clear();
}
}
package org.oransc.policyagent.repository;
-import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
public synchronized void put(Service service) {
logger.debug("Put service: " + service.getName());
- // TODO a threading problem is that this may happend at the same time as someone is iterating (getAll())
- // This is a generic problem
services.put(service.getName(), service);
}
- // TODO the returned value should be unmodifiable if possible
- public synchronized Collection<Service> getAll() {
+ public synchronized Iterable<Service> getAll() {
return services.values();
}
import org.oransc.policyagent.repository.Policies;
import org.oransc.policyagent.repository.PolicyTypes;
import org.oransc.policyagent.repository.Ric;
+import org.oransc.policyagent.repository.Ric.RicState;
import org.oransc.policyagent.repository.Rics;
import org.oransc.policyagent.repository.Services;
import org.slf4j.Logger;
public void checkAllRics() {
logger.debug("Checking Rics starting");
createTask().subscribe(this::onRicChecked, this::onError, this::onComplete);
-
}
private Flux<Ric> createTask() {
- return Flux.fromIterable(rics.getRics()) //
- .flatMap(ric -> checkInstances(ric)) //
- .flatMap(ric -> checkTypes(ric));
+ synchronized (this.rics) {
+ return Flux.fromIterable(rics.getRics()) //
+ .flatMap(ric -> checkRicState(ric)) //
+ .flatMap(ric -> checkRicPolicies(ric)) //
+ .flatMap(ric -> checkRicPolicyTypes(ric));
+ }
}
- private Mono<Ric> checkInstances(Ric ric) {
+ private Mono<Ric> checkRicState(Ric ric) {
+ if (ric.state() == RicState.UNDEFINED) {
+ return startRecovery(ric);
+ } else if (ric.state() == RicState.RECOVERING) {
+ return Mono.empty();
+ } else {
+ return Mono.just(ric);
+ }
+ }
+ private Mono<Ric> checkRicPolicies(Ric ric) {
return a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) //
.onErrorResume(t -> Mono.empty()) //
.flatMap(ricP -> validateInstances(ricP, ric));
}
private Mono<Ric> validateInstances(Collection<String> ricPolicies, Ric ric) {
- if (ricPolicies.size() != policies.getForRic(ric.name()).size()) {
- return startRecovery(ric);
+ synchronized (this.policies) {
+ if (ricPolicies.size() != policies.getForRic(ric.name()).size()) {
+ return startRecovery(ric);
+ }
}
for (String policyId : ricPolicies) {
if (!policies.containsPolicy(policyId)) {
return Mono.just(ric);
}
- private Mono<Ric> checkTypes(Ric ric) {
+ private Mono<Ric> checkRicPolicyTypes(Ric ric) {
return a1Client.getPolicyTypeIdentities(ric.getConfig().baseUrl()) //
.onErrorResume(t -> {
return Mono.empty();
import org.oransc.policyagent.repository.PolicyType;
import org.oransc.policyagent.repository.PolicyTypes;
import org.oransc.policyagent.repository.Ric;
+import org.oransc.policyagent.repository.Rics;
import org.oransc.policyagent.repository.Service;
import org.oransc.policyagent.repository.Services;
import org.slf4j.Logger;
this.services = services;
}
- public void run(Collection<Ric> rics) {
- for (Ric ric : rics) {
- run(ric);
+ public void run(Rics rics) {
+ synchronized (rics) {
+ for (Ric ric : rics.getRics()) {
+ run(ric);
+ }
}
}
private void onComplete(Ric ric) {
logger.debug("Recovery completed for:" + ric.name());
- ric.setState(Ric.RicState.ACTIVE);
+ ric.setState(Ric.RicState.IDLE);
notifyAllServices("Recovery completed for:" + ric.name());
}
private void notifyAllServices(String body) {
- for (Service service : services.getAll()) {
- String url = service.getCallbackUrl();
- if (service.getCallbackUrl().length() > 0) {
- createClient(url) //
- .put("", body) //
- .subscribe(rsp -> logger.debug("Service called"),
- throwable -> logger.warn("Service called failed", throwable),
- () -> logger.debug("Service called complete"));
+ synchronized (services) {
+ for (Service service : services.getAll()) {
+ String url = service.getCallbackUrl();
+ if (service.getCallbackUrl().length() > 0) {
+ createClient(url) //
+ .put("", body) //
+ .subscribe(rsp -> logger.debug("Service called"),
+ throwable -> logger.warn("Service called failed", throwable),
+ () -> logger.debug("Service called complete"));
+ }
}
}
}
private void onError(Ric ric, Throwable t) {
logger.warn("Recovery failed for: {}, reason: {}", ric.name(), t.getMessage());
- ric.setState(Ric.RicState.NOT_REACHABLE);
+ ric.setState(Ric.RicState.UNDEFINED);
}
private AsyncRestClient createClient(final String url) {
ric.clearSupportedPolicyTypes();
return a1Client.getPolicyTypeIdentities(ric.getConfig().baseUrl()) //
.flatMapMany(types -> Flux.fromIterable(types)) //
- .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId))
+ .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId)) //
.flatMap((policyTypeId) -> getPolicyType(ric, policyTypeId)) //
.doOnNext(policyType -> ric.addSupportedPolicyType(policyType)); //
}
}
private Flux<String> deletePolicies(Ric ric) {
- Collection<Policy> ricPolicies = new Vector<>(policies.getForRic(ric.name()));
- for (Policy policy : ricPolicies) {
- this.policies.remove(policy);
+ synchronized (policies) {
+ Collection<Policy> ricPolicies = new Vector<>(policies.getForRic(ric.name()));
+ for (Policy policy : ricPolicies) {
+ this.policies.remove(policy);
+ }
}
return a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) //
}
private Flux<Policy> createTask() {
- return Flux.fromIterable(services.getAll()) //
- .filter(service -> service.isExpired()) //
- .doOnNext(service -> logger.info("Service is expired:" + service.getName())) //
- .flatMap(service -> getAllPolicies(service)) //
- .doOnNext(policy -> this.policies.remove(policy)) //
- .flatMap(policy -> deletePolicyInRic(policy));
+ synchronized (services) {
+ return Flux.fromIterable(services.getAll()) //
+ .filter(service -> service.isExpired()) //
+ .doOnNext(service -> logger.info("Service is expired:" + service.getName())) //
+ .flatMap(service -> getAllPolicies(service)) //
+ .doOnNext(policy -> this.policies.remove(policy)) //
+ .flatMap(policy -> deletePolicyInRic(policy));
+ }
}
private Flux<Policy> getAllPolicies(Service service) {
- return Flux.fromIterable(policies.getForService(service.getName()));
+ synchronized (policies) {
+ return Flux.fromIterable(policies.getForService(service.getName()));
+ }
}
private Mono<Policy> deletePolicyInRic(Policy policy) {
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.Ordered;
+import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Service;
/**
* Loads information about RealTime-RICs at startup.
*/
@Service("startupService")
-public class StartupService {
+@Order(Ordered.HIGHEST_PRECEDENCE)
+public class StartupService implements ApplicationConfig.Observer {
private static final Logger logger = LoggerFactory.getLogger(StartupService.class);
this.services = services;
}
+ @Override
+ public void onRicConfigUpdate(RicConfig ricConfig, ApplicationConfig.RicConfigUpdate event) {
+ synchronized (this.rics) {
+ if (event.equals(ApplicationConfig.RicConfigUpdate.ADDED)
+ || event.equals(ApplicationConfig.RicConfigUpdate.CHANGED)) {
+ Ric ric = new Ric(ricConfig);
+ rics.put(ric);
+ RicRecoveryTask recoveryTask = new RicRecoveryTask(a1Client, policyTypes, policies, services);
+ recoveryTask.run(ric);
+ } else if (event.equals(ApplicationConfig.RicConfigUpdate.REMOVED)) {
+ rics.remove(ricConfig.name());
+ } else {
+ logger.debug("Unhandled event :" + event);
+ }
+ }
+ }
+
/**
* Reads the configured Rics and performs the service discovery. The result is put into the repository.
*/
public void startup() {
logger.debug("Starting up");
+ applicationConfig.addObserver(this);
applicationConfig.initialize();
- for (RicConfig ricConfig : applicationConfig.getRicConfigs()) {
- rics.put(new Ric(ricConfig));
- }
- RicRecoveryTask recoveryTask = new RicRecoveryTask(a1Client, policyTypes, policies, services);
- recoveryTask.run(rics.getRics()); // recover all Rics
}
}
public static class MockApplicationConfig extends ApplicationConfig {
@Override
- public void initialize() {
+ protected String getLocalConfigurationFilePath() {
URL url = MockApplicationConfig.class.getClassLoader().getResource("test_application_configuration.json");
- loadConfigurationFromFile(url.getFile());
+ return url.getFile();
}
}
String url = baseUrl() + "/policy?type=type1&instance=instance1&ric=ric1&service=service1";
String json = "{}";
addPolicyType("type1", "ric1");
- this.rics.getRic("ric1").setState(Ric.RicState.ACTIVE);
+ this.rics.getRic("ric1").setState(Ric.RicState.IDLE);
this.restTemplate.put(url, json);
reset();
String url = baseUrl() + "/policy?instance=id";
Policy policy = addPolicy("id", "typeName", "service1", "ric1");
- policy.ric().setState(Ric.RicState.ACTIVE);
+ policy.ric().setState(Ric.RicState.IDLE);
assertThat(policies.size()).isEqualTo(1);
this.restTemplate.delete(url);
static class MockApplicationConfig extends ApplicationConfig {
@Override
- public void initialize() {
+ protected String getLocalConfigurationFilePath() {
URL url = MockApplicationConfig.class.getClassLoader().getResource("test_application_configuration.json");
- loadConfigurationFromFile(url.getFile());
+ return url.getFile();
}
+
}
/**
appConfigUnderTest.systemEnvironment = new Properties();
// When
doReturn(getCorrectJson()).when(appConfigUnderTest).createInputStream(any());
+ doReturn("fileName").when(appConfigUnderTest).getLocalConfigurationFilePath();
appConfigUnderTest.initialize();
// Then
- verify(appConfigUnderTest, times(1)).loadConfigurationFromFile(any());
+ verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
- Vector<RicConfig> ricConfigs = appConfigUnderTest.getRicConfigs();
- RicConfig ricConfig = ricConfigs.firstElement();
+ Iterable<RicConfig> ricConfigs = appConfigUnderTest.getRicConfigs();
+ RicConfig ricConfig = ricConfigs.iterator().next();
assertThat(ricConfigs).isNotNull();
assertThat(ricConfig).isEqualTo(CORRECT_RIC_CONIFG);
}
// When
doReturn(getIncorrectJson()).when(appConfigUnderTest).createInputStream(any());
- appConfigUnderTest.loadConfigurationFromFile(any());
+ doReturn("fileName").when(appConfigUnderTest).getLocalConfigurationFilePath();
+ appConfigUnderTest.loadConfigurationFromFile();
// Then
- verify(appConfigUnderTest, times(1)).loadConfigurationFromFile(any());
- Assertions.assertNull(appConfigUnderTest.getRicConfigs());
+ verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
+ Assertions.assertEquals(0, appConfigUnderTest.getRicConfigs().size());
}
@Test
.baseUrl("baseUrl1") //
.managedElementIds(new Vector<String>(Arrays.asList("kista_1", "kista_2"))) //
.build());
- ric1.setState(Ric.RicState.ACTIVE);
+ ric1.setState(Ric.RicState.IDLE);
Ric ric2 = new Ric(ImmutableRicConfig.builder() //
.name("ric2") //
.baseUrl("baseUrl2") //
.managedElementIds(new Vector<String>(Arrays.asList("kista_3", "kista_4"))) //
.build());
- ric2.setState(Ric.RicState.NOT_REACHABLE);
+ ric2.setState(Ric.RicState.UNDEFINED);
Ric ric3 = new Ric(ImmutableRicConfig.builder() //
.name("ric3") //
.baseUrl("baseUrl3") //
supervisorUnderTest.checkAllRics();
- await().untilAsserted(() -> RicState.ACTIVE.equals(ric1.state()));
- await().untilAsserted(() -> RicState.ACTIVE.equals(ric2.state()));
- await().untilAsserted(() -> RicState.ACTIVE.equals(ric3.state()));
+ await().untilAsserted(() -> RicState.IDLE.equals(ric1.state()));
+ await().untilAsserted(() -> RicState.IDLE.equals(ric2.state()));
+ await().untilAsserted(() -> RicState.IDLE.equals(ric3.state()));
verify(a1ClientMock).deletePolicy("baseUrl1", "policyId2");
verify(a1ClientMock).deletePolicy("baseUrl2", "policyId2");
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import static org.oransc.policyagent.repository.Ric.RicState.ACTIVE;
-import static org.oransc.policyagent.repository.Ric.RicState.NOT_REACHABLE;
+import static org.oransc.policyagent.repository.Ric.RicState.IDLE;
import java.util.Arrays;
import java.util.Collection;
import org.oransc.policyagent.repository.Policies;
import org.oransc.policyagent.repository.PolicyTypes;
import org.oransc.policyagent.repository.Ric;
+import org.oransc.policyagent.repository.Ric.RicState;
import org.oransc.policyagent.repository.Rics;
import org.oransc.policyagent.repository.Services;
import reactor.core.publisher.Mono;
@Test
public void startup_allOk() {
- Vector<RicConfig> ricConfigs = new Vector<>(2);
- ricConfigs.add(getRicConfig(FIRST_RIC_NAME, FIRST_RIC_URL, MANAGED_NODE_A));
- ricConfigs.add(getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C));
- when(appConfigMock.getRicConfigs()).thenReturn(ricConfigs);
-
Mono<Collection<String>> policyTypes1 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME));
Mono<Collection<String>> policyTypes2 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME, POLICY_TYPE_2_NAME));
when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(policyTypes1).thenReturn(policyTypes2);
serviceUnderTest.startup();
+ serviceUnderTest.onRicConfigUpdate(getRicConfig(FIRST_RIC_NAME, FIRST_RIC_URL, MANAGED_NODE_A),
+ ApplicationConfig.RicConfigUpdate.ADDED);
+ serviceUnderTest.onRicConfigUpdate(
+ getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C),
+ ApplicationConfig.RicConfigUpdate.ADDED);
+
await().untilAsserted(() -> assertThat(policyTypes.size()).isEqualTo(2));
verify(a1ClientMock).getPolicyTypeIdentities(FIRST_RIC_URL);
Ric firstRic = rics.get(FIRST_RIC_NAME);
assertNotNull(firstRic, "Ric " + FIRST_RIC_NAME + " not added to repository");
assertEquals(FIRST_RIC_NAME, firstRic.name(), FIRST_RIC_NAME + " not added to Rics");
- assertEquals(ACTIVE, firstRic.state(), "Not correct state for ric " + FIRST_RIC_NAME);
+ assertEquals(IDLE, firstRic.state(), "Not correct state for ric " + FIRST_RIC_NAME);
assertEquals(1, firstRic.getSupportedPolicyTypes().size(),
"Not correct no of types supported for ric " + FIRST_RIC_NAME);
assertTrue(firstRic.isSupportingType(POLICY_TYPE_1_NAME),
Ric secondRic = rics.get(SECOND_RIC_NAME);
assertNotNull(secondRic, "Ric " + SECOND_RIC_NAME + " not added to repository");
assertEquals(SECOND_RIC_NAME, secondRic.name(), SECOND_RIC_NAME + " not added to Rics");
- assertEquals(ACTIVE, secondRic.state(), "Not correct state for " + SECOND_RIC_NAME);
+ assertEquals(IDLE, secondRic.state(), "Not correct state for " + SECOND_RIC_NAME);
assertEquals(2, secondRic.getSupportedPolicyTypes().size(),
"Not correct no of types supported for ric " + SECOND_RIC_NAME);
assertTrue(secondRic.isSupportingType(POLICY_TYPE_1_NAME),
@Test
public void startup_unableToConnectToGetTypes() {
- Vector<RicConfig> ricConfigs = new Vector<>(2);
- ricConfigs.add(getRicConfig(FIRST_RIC_NAME, FIRST_RIC_URL, MANAGED_NODE_A));
- ricConfigs.add(getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C));
- when(appConfigMock.getRicConfigs()).thenReturn(ricConfigs);
-
Mono<Collection<String>> policyIdentities = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME));
Mono<?> error = Mono.error(new Exception("Unable to contact ric."));
doReturn(error, policyIdentities).when(a1ClientMock).getPolicyTypeIdentities(anyString());
new StartupService(appConfigMock, rics, policyTypes, a1ClientMock, new Policies(), new Services());
serviceUnderTest.startup();
+ serviceUnderTest.onRicConfigUpdate(getRicConfig(FIRST_RIC_NAME, FIRST_RIC_URL, MANAGED_NODE_A),
+ ApplicationConfig.RicConfigUpdate.ADDED);
+ serviceUnderTest.onRicConfigUpdate(
+ getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C),
+ ApplicationConfig.RicConfigUpdate.ADDED);
verify(a1ClientMock).deletePolicy(SECOND_RIC_URL, POLICY_ID_1);
verify(a1ClientMock).deletePolicy(SECOND_RIC_URL, POLICY_ID_2);
- assertEquals(NOT_REACHABLE, rics.get(FIRST_RIC_NAME).state(), "Not correct state for " + FIRST_RIC_NAME);
+ assertEquals(RicState.UNDEFINED, rics.get(FIRST_RIC_NAME).state(), "Not correct state for " + FIRST_RIC_NAME);
- assertEquals(ACTIVE, rics.get(SECOND_RIC_NAME).state(), "Not correct state for " + SECOND_RIC_NAME);
+ assertEquals(IDLE, rics.get(SECOND_RIC_NAME).state(), "Not correct state for " + SECOND_RIC_NAME);
}
@Test
public void startup_unableToConnectToGetPolicies() {
- Vector<RicConfig> ricConfigs = new Vector<>(2);
- ricConfigs.add(getRicConfig(FIRST_RIC_NAME, FIRST_RIC_URL, MANAGED_NODE_A));
- ricConfigs.add(getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C));
- when(appConfigMock.getRicConfigs()).thenReturn(ricConfigs);
Mono<Collection<String>> policyTypes1 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME));
Mono<Collection<String>> policyTypes2 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME, POLICY_TYPE_2_NAME));
new StartupService(appConfigMock, rics, policyTypes, a1ClientMock, new Policies(), new Services());
serviceUnderTest.startup();
+ serviceUnderTest.onRicConfigUpdate(getRicConfig(FIRST_RIC_NAME, FIRST_RIC_URL, MANAGED_NODE_A),
+ ApplicationConfig.RicConfigUpdate.ADDED);
+ serviceUnderTest.onRicConfigUpdate(
+ getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C),
+ ApplicationConfig.RicConfigUpdate.ADDED);
verify(a1ClientMock).deletePolicy(SECOND_RIC_URL, POLICY_ID_1);
verify(a1ClientMock).deletePolicy(SECOND_RIC_URL, POLICY_ID_2);
- assertEquals(NOT_REACHABLE, rics.get(FIRST_RIC_NAME).state(), "Not correct state for " + FIRST_RIC_NAME);
+ assertEquals(RicState.UNDEFINED, rics.get(FIRST_RIC_NAME).state(), "Not correct state for " + FIRST_RIC_NAME);
- assertEquals(ACTIVE, rics.get(SECOND_RIC_NAME).state(), "Not correct state for " + SECOND_RIC_NAME);
+ assertEquals(IDLE, rics.get(SECOND_RIC_NAME).state(), "Not correct state for " + SECOND_RIC_NAME);
}
@SafeVarargs
@Override
public Mono<Collection<String>> getPolicyTypeIdentities(String nearRtRicUrl) {
- Vector<String> result = new Vector<>();
- for (PolicyType p : this.policyTypes.getAll()) {
- result.add(p.name());
+ synchronized (this.policyTypes) {
+ Vector<String> result = new Vector<>();
+ for (PolicyType p : this.policyTypes.getAll()) {
+ result.add(p.name());
+ }
+ return Mono.just(result);
}
- return Mono.just(result);
}
@Override
public Mono<Collection<String>> getPolicyIdentities(String nearRtRicUrl) {
- Vector<String> result = new Vector<>();
- for (Policy policy : getPolicies(nearRtRicUrl).getAll()) {
- if (policy.ric().getConfig().baseUrl().equals(nearRtRicUrl)) {
- result.add(policy.id());
+ synchronized (this.policies) {
+ Vector<String> result = new Vector<>();
+ for (Policy policy : getPolicies(nearRtRicUrl).getAll()) {
+ if (policy.ric().getConfig().baseUrl().equals(nearRtRicUrl)) {
+ result.add(policy.id());
+ }
}
- }
- return Mono.just(result);
+ return Mono.just(result);
+ }
}
@Override