--- /dev/null
+# O-RAN-SC NonRT RIC Dashboard Web Application
+
+The O-RAN NonRT RIC PolicyAgent provides a REST API for management of
+policices. It provides support for
+-Policy configuration. This includes
+ -One REST API towards all RICs in the network
+ -Query functions that can find all policies in a RIC, all policies owned by a service (R-APP), all policies of a type etc.
+ -Maps O1 resources (ManagedElement) as defined in O1 to the controlling RIC
+-Supervision of clients (R-APPs) to eliminate stray policies in case of failure
+-Consistency monitoring of the SMO view of policies and the actual situation in the RICs
+-Consistency monitoring of RIC capabilities (policy types)
+
+The agent can be run stand alone in a simulated test mode. Then it
+simulates RICs.
+The REST API is published on port 8081 and it is started by command:
+mvn -Dtest=MockPolicyAgent test
+
+The backend server publishes live API documentation at the
+URL `http://your-host-name-here:8080/swagger-ui.html`
+
+## License
+
+Copyright (C) 2019 Nordix Foundation. All rights reserved.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
<artifactId>swagger-jaxrs2-servlet-initializer</artifactId>
<version>${swagger.version}</version>
</dependency>
- <dependency>
- <groupId>io.springfox</groupId>
- <artifactId>springfox-swagger-ui</artifactId>
- <version>2.9.2</version>
- </dependency>
<dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
package org.oransc.policyagent.clients;
-import reactor.core.publisher.Flux;
+import java.util.Collection;
+
import reactor.core.publisher.Mono;
public interface A1Client {
- public Flux<String> getPolicyTypeIdentities(String nearRtRicUrl);
+ public Mono<Collection<String>> getPolicyTypeIdentities(String nearRtRicUrl);
- public Flux<String> getPolicyIdentities(String nearRtRicUrl);
+ public Mono<Collection<String>> getPolicyIdentities(String nearRtRicUrl);
public Mono<String> getPolicyType(String nearRtRicUrl, String policyTypeId);
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import org.json.JSONArray;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class A1ClientImpl implements A1Client {
}
@Override
- public Flux<String> getPolicyTypeIdentities(String nearRtRicUrl) {
+ public Mono<Collection<String>> getPolicyTypeIdentities(String nearRtRicUrl) {
logger.debug("getPolicyTypeIdentities nearRtRicUrl = {}", nearRtRicUrl);
AsyncRestClient client = createClient(nearRtRicUrl);
- Mono<String> response = client.get("/policytypes/identities");
- return response.flatMapMany(this::createFlux);
+ return client.get("/policytypes/identities") //
+ .flatMap(this::parseJsonArrayOfString);
}
@Override
- public Flux<String> getPolicyIdentities(String nearRtRicUrl) {
+ public Mono<Collection<String>> getPolicyIdentities(String nearRtRicUrl) {
logger.debug("getPolicyIdentities nearRtRicUrl = {}", nearRtRicUrl);
AsyncRestClient client = createClient(nearRtRicUrl);
- Mono<String> response = client.get("/policies/identities");
- return response.flatMapMany(this::createFlux);
+ return client.get("/policies/identities") //
+ .flatMap(this::parseJsonArrayOfString);
}
@Override
return client.delete("/policies/" + policyId);
}
- private Flux<String> createFlux(String inputString) {
+ private Mono<Collection<String>> parseJsonArrayOfString(String inputString) {
try {
List<String> arrayList = new ArrayList<>();
JSONArray jsonArray = new JSONArray(inputString);
arrayList.add(jsonArray.getString(i));
}
logger.debug("A1 client: received list = {}", arrayList);
- return Flux.fromIterable(arrayList);
+ return Mono.just(arrayList);
} catch (JSONException ex) { // invalid json
- return Flux.error(ex);
+ return Mono.error(ex);
}
}
loadConfigurationFromFile(this.filepath);
refreshConfigTask = createRefreshTask() //
- .subscribe(e -> logger.info("Refreshed configuration data"),
+ .subscribe(notUsed -> logger.info("Refreshed configuration data"),
throwable -> logger.error("Configuration refresh terminated due to exception", throwable),
() -> logger.error("Configuration refresh terminated"));
}
import java.util.Collection;
import java.util.Vector;
+import org.oransc.policyagent.clients.A1Client;
import org.oransc.policyagent.configuration.ApplicationConfig;
import org.oransc.policyagent.exceptions.ServiceException;
import org.oransc.policyagent.repository.ImmutablePolicy;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Mono;
@RestController
@Api(value = "Policy Management API")
private final Rics rics;
private final PolicyTypes policyTypes;
private final Policies policies;
+ private final A1Client a1Client;
+
private static Gson gson = new GsonBuilder() //
.serializeNulls() //
.create(); //
@Autowired
- PolicyController(ApplicationConfig config, PolicyTypes types, Policies policies, Rics rics) {
+ PolicyController(ApplicationConfig config, PolicyTypes types, Policies policies, Rics rics, A1Client a1Client) {
this.policyTypes = types;
this.policies = policies;
this.rics = rics;
+ this.a1Client = a1Client;
}
@GetMapping("/policy_schemas")
@DeleteMapping("/policy")
@ApiOperation(value = "Deletes the policy")
@ApiResponses(value = {@ApiResponse(code = 204, message = "Policy deleted")})
- public ResponseEntity<Void> deletePolicy( //
- @RequestParam(name = "instance", required = true) String instance) {
+ public Mono<ResponseEntity<Void>> deletePolicy( //
+ @RequestParam(name = "instance", required = true) String id) {
+ Policy policy = policies.get(id);
+ if (policy != null && policy.ric().state().equals(Ric.RicState.ACTIVE)) {
+ return a1Client.deletePolicy(policy.ric().getConfig().baseUrl(), id) //
+ .doOnEach(notUsed -> policies.removeId(id)) //
+ .flatMap(notUsed -> {
+ return Mono.just(new ResponseEntity<>(HttpStatus.NO_CONTENT));
+ });
+ } else {
+ return Mono.just(new ResponseEntity<>(HttpStatus.NOT_FOUND));
+ }
+ }
- policies.removeId(instance);
- return new ResponseEntity<>(HttpStatus.NO_CONTENT);
+ @PutMapping(path = "/policy")
+ @ApiOperation(value = "Create the policy")
+ @ApiResponses(value = {@ApiResponse(code = 201, message = "Policy created")})
+ public Mono<ResponseEntity<String>> putPolicy( //
+ @RequestParam(name = "type", required = true) String typeName, //
+ @RequestParam(name = "instance", required = true) String instanceId, //
+ @RequestParam(name = "ric", required = true) String ricName, //
+ @RequestParam(name = "service", required = true) String service, //
+ @RequestBody String jsonBody) {
+
+ Ric ric = rics.get(ricName);
+ PolicyType type = policyTypes.get(typeName);
+ if (ric != null && type != null && ric.state().equals(Ric.RicState.ACTIVE)) {
+ Policy policy = ImmutablePolicy.builder() //
+ .id(instanceId) //
+ .json(jsonBody) //
+ .type(type) //
+ .ric(ric) //
+ .ownerServiceName(service) //
+ .lastModified(getTimeStampUTC()) //
+ .build();
+ return a1Client.putPolicy(policy.ric().getConfig().baseUrl(), policy.id(), policy.json()) //
+ .doOnNext(notUsed -> policies.put(policy)) //
+ .flatMap(notUsed -> {
+ return Mono.just(new ResponseEntity<>(HttpStatus.CREATED));
+ });
+ }
+ return Mono.just(new ResponseEntity<>(HttpStatus.NOT_FOUND));
}
@GetMapping("/policies")
return java.time.Instant.now().toString();
}
- @PutMapping(path = "/policy")
- @ApiOperation(value = "Create the policy")
- @ApiResponses(value = {@ApiResponse(code = 201, message = "Policy created")})
- public ResponseEntity<String> putPolicy( //
- @RequestParam(name = "type", required = true) String type, //
- @RequestParam(name = "instance", required = true) String instanceId, //
- @RequestParam(name = "ric", required = true) String ric, //
- @RequestParam(name = "service", required = true) String service, //
- @RequestBody String jsonBody) {
-
- try {
- // services.getService(service).ping();
- Ric ricObj = rics.getRic(ric);
- Policy policy = ImmutablePolicy.builder() //
- .id(instanceId) //
- .json(jsonBody) //
- .type(policyTypes.getType(type)) //
- .ric(ricObj) //
- .ownerServiceName(service) //
- .lastModified(getTimeStampUTC()) //
- .build();
- policies.put(policy);
- return new ResponseEntity<String>(HttpStatus.CREATED);
- } catch (ServiceException e) {
- return new ResponseEntity<String>(e.getMessage(), HttpStatus.NOT_FOUND);
- }
- }
-
}
public String name();
- public Collection<String> nodeNames();
+ public Collection<String> managedElementIds();
public Collection<String> policyTypes();
}
if (supportingPolicyType == null || ric.isSupportingType(supportingPolicyType)) {
result.add(ImmutableRicInfo.builder() //
.name(ric.name()) //
- .nodeNames(ric.getManagedNodes()) //
+ .managedElementIds(ric.getManagedElementIds()) //
.policyTypes(ric.getSupportedPolicyTypeNames()) //
.build());
}
return policiesId.containsKey(id);
}
+ public synchronized Policy get(String id) {
+ return policiesId.get(id);
+ }
+
public synchronized Policy getPolicy(String id) throws ServiceException {
Policy p = policiesId.get(id);
if (p == null) {
return t;
}
+ public synchronized PolicyType get(String name) {
+ return types.get(name);
+ }
+
public synchronized void put(PolicyType type) {
types.put(type.name(), type);
}
*
* @return a vector containing the nodes managed by this Ric.
*/
- public Vector<String> getManagedNodes() {
+ public Vector<String> getManagedElementIds() {
return ricConfig.managedElementIds();
}
/**
* Determines if the given node is managed by this Ric.
*
- * @param nodeName the node name to check.
+ * @param managedElementId the node name to check.
* @return true if the given node is managed by this Ric.
*/
- public boolean isManaging(String nodeName) {
- return ricConfig.managedElementIds().contains(nodeName);
+ public boolean isManaging(String managedElementId) {
+ return ricConfig.managedElementIds().contains(managedElementId);
}
/**
* Adds the given node as managed by this Ric.
*
- * @param nodeName the node to add.
+ * @param managedElementId the node to add.
*/
- public void addManagedNode(String nodeName) {
- if (!ricConfig.managedElementIds().contains(nodeName)) {
- ricConfig.managedElementIds().add(nodeName);
+ public void addManagedElement(String managedElementId) {
+ if (!ricConfig.managedElementIds().contains(managedElementId)) {
+ ricConfig.managedElementIds().add(managedElementId);
}
}
/**
* Removes the given node as managed by this Ric.
*
- * @param nodeName the node to remove.
+ * @param managedElementId the node to remove.
*/
- public void removeManagedNode(String nodeName) {
- ricConfig.managedElementIds().remove(nodeName);
+ public void removeManagedElement(String managedElementId) {
+ ricConfig.managedElementIds().remove(managedElementId);
}
/**
}
/**
- * Adds policy types as supported by this Ric.
- *
- * @param types the policy types to support.
- */
- public void addSupportedPolicyTypes(Collection<PolicyType> types) {
- for (PolicyType type : types) {
- addSupportedPolicyType(type);
- }
- }
-
- /**
- * Removes a policy type as supported by this Ric.
- *
- * @param type the policy type to remove as supported by this Ric.
+ * Removes all policy type as supported by this Ric.
*/
- public void removeSupportedPolicyType(PolicyType type) {
- supportedPolicyTypes.remove(type.name());
+ public void clearSupportedPolicyTypes() {
+ supportedPolicyTypes.clear();
}
/**
/**
* The Ric cannot be contacted.
*/
- NOT_REACHABLE
+ NOT_REACHABLE, RECOVERING
}
}
public class Rics {
Map<String, Ric> rics = new HashMap<>();
- public void put(Ric ric) {
+ public synchronized void put(Ric ric) {
rics.put(ric.name(), ric);
}
ping();
}
- public String getName() {
+ public synchronized String getName() {
return this.name;
}
- public Duration getKeepAliveInterval() {
+ public synchronized Duration getKeepAliveInterval() {
return this.keepAliveInterval;
}
package org.oransc.policyagent.tasks;
+import java.util.Collection;
+
import org.oransc.policyagent.clients.A1Client;
import org.oransc.policyagent.repository.Policies;
+import org.oransc.policyagent.repository.PolicyTypes;
import org.oransc.policyagent.repository.Ric;
-import org.oransc.policyagent.repository.Ric.RicState;
import org.oransc.policyagent.repository.Rics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final Rics rics;
private final Policies policies;
+ private final PolicyTypes policyTypes;
private final A1Client a1Client;
@Autowired
- public RepositorySupervision(Rics rics, Policies policies, A1Client a1Client) {
+ public RepositorySupervision(Rics rics, Policies policies, A1Client a1Client, PolicyTypes policyTypes) {
this.rics = rics;
this.policies = policies;
this.a1Client = a1Client;
+ this.policyTypes = policyTypes;
}
/**
private Flux<Ric> createTask() {
return Flux.fromIterable(rics.getRics()) //
- .groupBy(ric -> ric.state()) //
- .flatMap(fluxGroup -> handleGroup(fluxGroup.key(), fluxGroup));
+ .flatMap(ric -> checkInstances(ric)) //
+ .flatMap(ric -> checkTypes(ric));
}
- private Flux<Ric> handleGroup(Ric.RicState key, Flux<Ric> fluxGroup) {
- logger.debug("Handling group {}", key);
- switch (key) {
- case ACTIVE:
- return fluxGroup.flatMap(this::checkActive);
+ private Mono<Ric> checkInstances(Ric ric) {
- case NOT_REACHABLE:
- return fluxGroup.flatMap(this::checkNotReachable);
+ return a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) //
+ .onErrorResume(t -> Mono.empty()) //
+ .flatMap(ricP -> validateInstances(ricP, ric));
+ }
- default:
- // If not initiated, leave it to the StartupService.
- return Flux.empty();
- }
+ private Flux<Ric> junk() {
+ return Flux.empty();
}
- private Mono<Ric> checkActive(Ric ric) {
- logger.debug("Handling active ric {}", ric);
- a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) //
- .filter(policyId -> !policies.containsPolicy(policyId)) //
- .doOnNext(policyId -> logger.debug("Deleting policy {}", policyId))
- .flatMap(policyId -> a1Client.deletePolicy(ric.getConfig().baseUrl(), policyId)) //
- .subscribe();
+ private Mono<Ric> validateInstances(Collection<String> ricPolicies, Ric ric) {
+ if (ricPolicies.size() != policies.getForRic(ric.name()).size()) {
+ return startRecovery(ric);
+ }
+ for (String policyId : ricPolicies) {
+ if (!policies.containsPolicy(policyId)) {
+ return startRecovery(ric);
+ }
+ }
return Mono.just(ric);
}
- private Mono<Ric> checkNotReachable(Ric ric) {
- logger.debug("Handling not reachable ric {}", ric);
- a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) //
- .filter(policyId -> !policies.containsPolicy(policyId)) //
- .doOnNext(policyId -> logger.debug("Deleting policy {}", policyId))
- .flatMap(policyId -> a1Client.deletePolicy(ric.getConfig().baseUrl(), policyId)) //
- .subscribe(null, null, () -> ric.setState(RicState.ACTIVE));
+ private Mono<Ric> checkTypes(Ric ric) {
+ return a1Client.getPolicyTypeIdentities(ric.getConfig().baseUrl()) //
+ .onErrorResume(t -> {
+ return Mono.empty();
+ }) //
+ .flatMap(ricTypes -> validateTypes(ricTypes, ric));
+ }
+
+ private Mono<Ric> validateTypes(Collection<String> ricTypes, Ric ric) {
+ if (ricTypes.size() != ric.getSupportedPolicyTypes().size()) {
+ return startRecovery(ric);
+ }
+ for (String typeName : ricTypes) {
+ if (!ric.isSupportingType(typeName)) {
+ return startRecovery(ric);
+ }
+ }
return Mono.just(ric);
}
+ private Mono<Ric> startRecovery(Ric ric) {
+ RicRecoveryTask recovery = new RicRecoveryTask(a1Client, policyTypes, policies);
+ recovery.run(ric);
+ return Mono.empty();
+ }
+
private void onRicChecked(Ric ric) {
logger.info("Ric: " + ric.name() + " checked");
}
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2019 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.policyagent.tasks;
+
+import java.util.Collection;
+import java.util.Vector;
+
+import org.oransc.policyagent.clients.A1Client;
+import org.oransc.policyagent.exceptions.ServiceException;
+import org.oransc.policyagent.repository.ImmutablePolicyType;
+import org.oransc.policyagent.repository.Policies;
+import org.oransc.policyagent.repository.Policy;
+import org.oransc.policyagent.repository.PolicyType;
+import org.oransc.policyagent.repository.PolicyTypes;
+import org.oransc.policyagent.repository.Ric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * Loads information about RealTime-RICs at startup.
+ */
+public class RicRecoveryTask {
+
+ private static final Logger logger = LoggerFactory.getLogger(RicRecoveryTask.class);
+
+ private final A1Client a1Client;
+ private final PolicyTypes policyTypes;
+ private final Policies policies;
+
+ public RicRecoveryTask(A1Client a1Client, PolicyTypes policyTypes, Policies policies) {
+ this.a1Client = a1Client;
+ this.policyTypes = policyTypes;
+ this.policies = policies;
+ }
+
+ public void run(Collection<Ric> rics) {
+ for (Ric ric : rics) {
+ run(ric);
+ }
+ }
+
+ public void run(Ric ric) {
+ logger.debug("Handling ric: {}", ric.getConfig().name());
+
+ synchronized (ric) {
+ if (ric.state().equals(Ric.RicState.RECOVERING)) {
+ return; // Already running
+ }
+ ric.setState(Ric.RicState.RECOVERING);
+ }
+ Flux<PolicyType> recoveredTypes = recoverPolicyTypes(ric);
+ Flux<?> deletedPolicies = deletePolicies(ric);
+
+ Flux.merge(recoveredTypes, deletedPolicies) //
+ .subscribe(x -> logger.debug("Recover: " + x), //
+ throwable -> onError(ric, throwable), //
+ () -> onComplete(ric));
+ }
+
+ private void onComplete(Ric ric) {
+ logger.debug("Recovery completed for:" + ric.name());
+ ric.setState(Ric.RicState.ACTIVE);
+
+ }
+
+ private void onError(Ric ric, Throwable t) {
+ logger.debug("Recovery failed for: {}, reason: {}", ric.name(), t.getMessage());
+ ric.setState(Ric.RicState.NOT_REACHABLE);
+ }
+
+ private Flux<PolicyType> recoverPolicyTypes(Ric ric) {
+ ric.clearSupportedPolicyTypes();
+ return a1Client.getPolicyTypeIdentities(ric.getConfig().baseUrl()) //
+ .flatMapMany(types -> Flux.fromIterable(types)) //
+ .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId))
+ .flatMap((policyTypeId) -> getPolicyType(ric, policyTypeId)) //
+ .doOnNext(policyType -> ric.addSupportedPolicyType(policyType)); //
+ }
+
+ private Mono<PolicyType> getPolicyType(Ric ric, String policyTypeId) {
+ if (policyTypes.contains(policyTypeId)) {
+ try {
+ return Mono.just(policyTypes.getType(policyTypeId));
+ } catch (ServiceException e) {
+ return Mono.error(e);
+ }
+ }
+ return a1Client.getPolicyType(ric.getConfig().baseUrl(), policyTypeId) //
+ .flatMap(schema -> createPolicyType(policyTypeId, schema));
+ }
+
+ private Mono<PolicyType> createPolicyType(String policyTypeId, String schema) {
+ PolicyType pt = ImmutablePolicyType.builder().name(policyTypeId).schema(schema).build();
+ policyTypes.put(pt);
+ return Mono.just(pt);
+ }
+
+ private Flux<String> deletePolicies(Ric ric) {
+ Collection<Policy> ricPolicies = new Vector<>(policies.getForRic(ric.name()));
+ for (Policy policy : ricPolicies) {
+ this.policies.remove(policy);
+ }
+
+ return a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) //
+ .flatMapMany(policyIds -> Flux.fromIterable(policyIds)) //
+ .doOnNext(policyId -> logger.debug("Deleting policy: {}, for ric: {}", policyId, ric.getConfig().name()))
+ .flatMap(policyId -> a1Client.deletePolicy(ric.getConfig().baseUrl(), policyId)); //
+ }
+}
import org.oransc.policyagent.clients.A1Client;
import org.oransc.policyagent.configuration.ApplicationConfig;
-import org.oransc.policyagent.exceptions.ServiceException;
-import org.oransc.policyagent.repository.ImmutablePolicyType;
-import org.oransc.policyagent.repository.PolicyType;
+import org.oransc.policyagent.configuration.RicConfig;
+import org.oransc.policyagent.repository.Policies;
import org.oransc.policyagent.repository.PolicyTypes;
import org.oransc.policyagent.repository.Ric;
-import org.oransc.policyagent.repository.Ric.RicState;
import org.oransc.policyagent.repository.Rics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
/**
* Loads information about RealTime-RICs at startup.
*/
@Autowired
private A1Client a1Client;
- StartupService(ApplicationConfig appConfig, Rics rics, PolicyTypes policyTypes, A1Client a1Client) {
+ @Autowired
+ private Policies policies;
+
+ StartupService(ApplicationConfig appConfig, Rics rics, PolicyTypes policyTypes, A1Client a1Client,
+ Policies policies) {
this.applicationConfig = appConfig;
this.rics = rics;
this.policyTypes = policyTypes;
this.a1Client = a1Client;
+ this.policies = policies;
}
/**
* Reads the configured Rics and performs the service discovery. The result is put into the repository.
*/
public void startup() {
+ logger.debug("Starting up");
applicationConfig.initialize();
- Flux.fromIterable(applicationConfig.getRicConfigs()) //
- .map(ricConfig -> new Ric(ricConfig)) //
- .doOnNext(ric -> logger.debug("Handling ric: {}", ric.getConfig().name())) //
- .flatMap(this::addPolicyTypesForRic) //
- .flatMap(this::deletePoliciesForRic) //
- .doOnNext(rics::put) //
- .subscribe();
- }
-
- private Mono<Ric> addPolicyTypesForRic(Ric ric) {
- a1Client.getPolicyTypeIdentities(ric.getConfig().baseUrl()) //
- .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId))
- .flatMap((policyTypeId) -> addTypeToRepo(ric, policyTypeId)) //
- .flatMap(type -> addTypeToRic(ric, type)) //
- .subscribe(null, cause -> setRicToNotReachable(ric, cause), () -> setRicToActive(ric));
- return Mono.just(ric);
- }
-
- private Mono<PolicyType> addTypeToRepo(Ric ric, String policyTypeId) {
- if (policyTypes.contains(policyTypeId)) {
- try {
- return Mono.just(policyTypes.getType(policyTypeId));
- } catch (ServiceException e) {
- return Mono.error(e);
- }
+ for (RicConfig ricConfig : applicationConfig.getRicConfigs()) {
+ rics.put(new Ric(ricConfig));
}
- return a1Client.getPolicyType(ric.getConfig().baseUrl(), policyTypeId) //
- .flatMap(schema -> createPolicyType(policyTypeId, schema));
- }
-
- private Mono<PolicyType> createPolicyType(String policyTypeId, String schema) {
- PolicyType pt = ImmutablePolicyType.builder().name(policyTypeId).schema(schema).build();
- policyTypes.put(pt);
- return Mono.just(pt);
- }
-
- private Mono<PolicyType> addTypeToRic(Ric ric, PolicyType policyType) {
- ric.addSupportedPolicyType(policyType);
- return Mono.just(policyType);
- }
-
- private Mono<Ric> deletePoliciesForRic(Ric ric) {
- if (!Ric.RicState.NOT_REACHABLE.equals(ric.state())) {
- a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) //
- .doOnNext(
- policyId -> logger.debug("Deleting policy: {}, for ric: {}", policyId, ric.getConfig().name()))
- .flatMap(policyId -> a1Client.deletePolicy(ric.getConfig().baseUrl(), policyId)) //
- .subscribe(null, cause -> setRicToNotReachable(ric, cause), null);
- }
-
- return Mono.just(ric);
- }
-
- private void setRicToNotReachable(Ric ric, Throwable t) {
- ric.setState(Ric.RicState.NOT_REACHABLE);
- logger.info("Unable to connect to ric {}. Cause: {}", ric.name(), t.getMessage());
- }
-
- private void setRicToActive(Ric ric) {
- ric.setState(RicState.ACTIVE);
+ RicRecoveryTask recoveryTask = new RicRecoveryTask(a1Client, policyTypes, policies);
+ recoveryTask.run(rics.getRics()); // recover all Rics
}
}
import com.google.gson.reflect.TypeToken;
import java.net.URL;
+import java.util.Collection;
import java.util.List;
import java.util.Vector;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.oransc.policyagent.clients.A1Client;
import org.oransc.policyagent.configuration.ApplicationConfig;
import org.oransc.policyagent.configuration.ImmutableRicConfig;
import org.oransc.policyagent.configuration.RicConfig;
import org.springframework.http.ResponseEntity;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.web.client.RestTemplate;
+import reactor.core.publisher.Mono;
@ExtendWith(SpringExtension.class)
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
}
}
+ static class A1ClientMock implements A1Client {
+ private final Policies policies;
+ private final PolicyTypes policyTypes;
+
+ A1ClientMock(Policies policies, PolicyTypes policyTypes) {
+ this.policies = policies;
+ this.policyTypes = policyTypes;
+ }
+
+ @Override
+ public Mono<Collection<String>> getPolicyTypeIdentities(String nearRtRicUrl) {
+ Vector<String> result = new Vector<>();
+ for (PolicyType p : this.policyTypes.getAll()) {
+ result.add(p.name());
+ }
+ return Mono.just(result);
+ }
+
+ @Override
+ public Mono<String> getPolicyType(String nearRtRicUrl, String policyTypeId) {
+ try {
+ return Mono.just(this.policies.get(policyTypeId).json());
+ } catch (Exception e) {
+ return Mono.error(e);
+ }
+ }
+
+ @Override
+ public Mono<String> putPolicy(String nearRtRicUrl, String policyId, String policyString) {
+ return Mono.just("OK");
+ }
+
+ @Override
+ public Mono<String> deletePolicy(String nearRtRicUrl, String policyId) {
+ return Mono.just("OK");
+ }
+
+ @Override
+ public Mono<Collection<String>> getPolicyIdentities(String nearRtRicUrl) {
+ return Mono.empty(); // problem is that a recovery will start
+ }
+ }
+
/**
* Overrides the BeanFactory.
*/
@TestConfiguration
static class TestBeanFactory {
+ private final Rics rics = new Rics();
+ private final Policies policies = new Policies();
+ private final PolicyTypes policyTypes = new PolicyTypes();
@Bean
public ApplicationConfig getApplicationConfig() {
return new MockApplicationConfig();
}
+ @Bean
+ A1Client getA1Client() {
+ return new A1ClientMock(this.policies, this.policyTypes);
+ }
+
+ @Bean
+ public Policies getPolicies() {
+ return this.policies;
+ }
+
+ @Bean
+ public PolicyTypes getPolicyTypes() {
+ return this.policyTypes;
+ }
+
@Bean
public Rics getRics() {
- Rics rics = new Rics();
- rics.put(new Ric(ImmutableRicConfig.builder().name("kista_1").baseUrl("kista_url")
- .managedElementIds(new Vector<>()).build()));
- rics.put(new Ric(ImmutableRicConfig.builder().name("ric1").baseUrl("ric_url")
- .managedElementIds(new Vector<>()).build()));
- return rics;
+ return this.rics;
}
}
assertThat(rsp).isEqualTo("ric1");
}
- // managedElmentId -> nodeName
-
@Test
public void testPutPolicy() throws Exception {
putService("service1");
String url = baseUrl() + "/policy?type=type1&instance=instance1&ric=ric1&service=service1";
String json = "{}";
addPolicyType("type1", "ric1");
+ this.rics.getRic("ric1").setState(Ric.RicState.ACTIVE);
this.restTemplate.put(url, json);
public void testDeletePolicy() throws Exception {
reset();
String url = baseUrl() + "/policy?instance=id";
- addPolicy("id", "typeName", "service1", "ric1");
+ Policy policy = addPolicy("id", "typeName", "service1", "ric1");
+ policy.ric().setState(Ric.RicState.ACTIVE);
assertThat(policies.size()).isEqualTo(1);
this.restTemplate.delete(url);
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Vector;
-import org.junit.Test;
-import org.junit.runner.RunWith;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.oransc.policyagent.clients.A1Client;
import org.oransc.policyagent.configuration.ApplicationConfig;
import org.oransc.policyagent.repository.ImmutablePolicyType;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.context.annotation.Bean;
-import org.springframework.test.context.junit4.SpringRunner;
-
-import reactor.core.publisher.Flux;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
import reactor.core.publisher.Mono;
-@RunWith(SpringRunner.class)
+@ExtendWith(SpringExtension.class)
@SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
public class MockPolicyAgent {
getPolicies(nearRtRicUrl).put(policyId, policyString);
}
- public Iterable<String> getPolicyIdentities(String nearRtRicUrl) {
+ public Collection<String> getPolicyIdentities(String nearRtRicUrl) {
return getPolicies(nearRtRicUrl).keySet();
}
}
@Override
- public Flux<String> getPolicyTypeIdentities(String nearRtRicUrl) {
+ public Mono<Collection<String>> getPolicyTypeIdentities(String nearRtRicUrl) {
Vector<String> result = new Vector<>();
for (PolicyType p : this.policyTypes.getAll()) {
result.add(p.name());
}
- return Flux.fromIterable(result);
+ return Mono.just(result);
}
@Override
- public Flux<String> getPolicyIdentities(String nearRtRicUrl) {
- Iterable<String> result = policies.getPolicyIdentities(nearRtRicUrl);
- return Flux.fromIterable(result);
+ public Mono<Collection<String>> getPolicyIdentities(String nearRtRicUrl) {
+ Collection<String> result = policies.getPolicyIdentities(nearRtRicUrl);
+ return Mono.just(result);
}
@Override
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.junit.jupiter.MockitoExtension;
-import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
Mono<String> policyTypeIds = Mono.just(Arrays.toString(new String[] {POLICY_TYPE_1_NAME, POLICY_TYPE_2_NAME}));
when(asyncRestClientMock.get(POLICYTYPES_IDENTITIES_URL)).thenReturn(policyTypeIds);
- Flux<String> policyTypeIdsFlux = a1Client.getPolicyTypeIdentities(RIC_URL);
+ Mono<?> policyTypeIdsFlux = a1Client.getPolicyTypeIdentities(RIC_URL);
verify(asyncRestClientMock).get(POLICYTYPES_IDENTITIES_URL);
- StepVerifier.create(policyTypeIdsFlux).expectNext(POLICY_TYPE_1_NAME, POLICY_TYPE_2_NAME).expectComplete()
- .verify();
+ StepVerifier.create(policyTypeIdsFlux).expectNextCount(1).expectComplete().verify();
}
@Test
Mono<String> policyIds = Mono.just(Arrays.toString(new String[] {POLICY_1_ID, POLICY_2_ID}));
when(asyncRestClientMock.get(POLICIES_IDENTITIES_URL)).thenReturn(policyIds);
- Flux<String> policyIdsFlux = a1Client.getPolicyIdentities(RIC_URL);
+ Mono<?> policyIdsFlux = a1Client.getPolicyIdentities(RIC_URL);
verify(asyncRestClientMock).get(POLICIES_IDENTITIES_URL);
- StepVerifier.create(policyIdsFlux).expectNext(POLICY_1_ID, POLICY_2_ID).expectComplete().verify();
+ StepVerifier.create(policyIdsFlux).expectNextCount(1).expectComplete().verify();
}
@Test
import static org.mockito.Mockito.when;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Vector;
import org.junit.jupiter.api.Test;
import org.oransc.policyagent.repository.Policies;
import org.oransc.policyagent.repository.Policy;
import org.oransc.policyagent.repository.PolicyType;
+import org.oransc.policyagent.repository.PolicyTypes;
import org.oransc.policyagent.repository.Ric;
import org.oransc.policyagent.repository.Ric.RicState;
import org.oransc.policyagent.repository.Rics;
-
-import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ExtendWith(MockitoExtension.class)
.build();
Policies policies = new Policies();
policies.put(policy1);
+ PolicyTypes types = new PolicyTypes();
- RepositorySupervision supervisorUnderTest = new RepositorySupervision(rics, policies, a1ClientMock);
+ RepositorySupervision supervisorUnderTest = new RepositorySupervision(rics, policies, a1ClientMock, types);
- when(a1ClientMock.getPolicyIdentities(anyString())).thenReturn(Flux.just("policyId1", "policyId2"));
+ Mono<Collection<String>> policyIds = Mono.just(Arrays.asList("policyId1", "policyId2"));
+ when(a1ClientMock.getPolicyIdentities(anyString())).thenReturn(policyIds);
when(a1ClientMock.deletePolicy(anyString(), anyString())).thenReturn(Mono.empty());
+ when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(policyIds);
+ when(a1ClientMock.getPolicyType(anyString(), anyString())).thenReturn(Mono.just("schema"));
supervisorUnderTest.checkAllRics();
- await().untilAsserted(() -> RicState.ACTIVE.equals(rics.getRic("ric2").state()));
+ await().untilAsserted(() -> RicState.ACTIVE.equals(ric1.state()));
+ await().untilAsserted(() -> RicState.ACTIVE.equals(ric2.state()));
+ await().untilAsserted(() -> RicState.ACTIVE.equals(ric3.state()));
- verify(a1ClientMock).getPolicyIdentities("baseUrl1");
verify(a1ClientMock).deletePolicy("baseUrl1", "policyId2");
- verify(a1ClientMock).getPolicyIdentities("baseUrl2");
verify(a1ClientMock).deletePolicy("baseUrl2", "policyId2");
verifyNoMoreInteractions(a1ClientMock);
}
import static org.oransc.policyagent.repository.Ric.RicState.ACTIVE;
import static org.oransc.policyagent.repository.Ric.RicState.NOT_REACHABLE;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Vector;
import org.junit.jupiter.api.Test;
import org.oransc.policyagent.configuration.ApplicationConfig;
import org.oransc.policyagent.configuration.ImmutableRicConfig;
import org.oransc.policyagent.configuration.RicConfig;
+import org.oransc.policyagent.repository.Policies;
import org.oransc.policyagent.repository.PolicyTypes;
import org.oransc.policyagent.repository.Ric;
import org.oransc.policyagent.repository.Rics;
-
-import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ExtendWith(MockitoExtension.class)
ricConfigs.add(getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C));
when(appConfigMock.getRicConfigs()).thenReturn(ricConfigs);
- Flux<String> fluxType1 = Flux.just(POLICY_TYPE_1_NAME);
- Flux<String> fluxType2 = Flux.just(POLICY_TYPE_2_NAME);
- when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(fluxType1)
- .thenReturn(fluxType1.concatWith(fluxType2));
- Flux<String> policies = Flux.just(new String[] {POLICY_ID_1, POLICY_ID_2});
+ Mono<Collection<String>> policyTypes1 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME));
+ Mono<Collection<String>> policyTypes2 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME, POLICY_TYPE_2_NAME));
+ when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(policyTypes1).thenReturn(policyTypes2);
+ Mono<Collection<String>> policies = Mono.just(Arrays.asList(POLICY_ID_1, POLICY_ID_2));
when(a1ClientMock.getPolicyIdentities(anyString())).thenReturn(policies);
when(a1ClientMock.getPolicyType(anyString(), anyString())).thenReturn(Mono.just("Schema"));
when(a1ClientMock.deletePolicy(anyString(), anyString())).thenReturn(Mono.just("OK"));
Rics rics = new Rics();
PolicyTypes policyTypes = new PolicyTypes();
- StartupService serviceUnderTest = new StartupService(appConfigMock, rics, policyTypes, a1ClientMock);
+ StartupService serviceUnderTest =
+ new StartupService(appConfigMock, rics, policyTypes, a1ClientMock, new Policies());
serviceUnderTest.startup();
"Not correct no of types supported for ric " + FIRST_RIC_NAME);
assertTrue(firstRic.isSupportingType(POLICY_TYPE_1_NAME),
POLICY_TYPE_1_NAME + " not supported by ric " + FIRST_RIC_NAME);
- assertEquals(1, firstRic.getManagedNodes().size(), "Not correct no of managed nodes for ric " + FIRST_RIC_NAME);
+ assertEquals(1, firstRic.getManagedElementIds().size(),
+ "Not correct no of managed nodes for ric " + FIRST_RIC_NAME);
assertTrue(firstRic.isManaging(MANAGED_NODE_A), MANAGED_NODE_A + " not managed by ric " + FIRST_RIC_NAME);
Ric secondRic = rics.get(SECOND_RIC_NAME);
POLICY_TYPE_1_NAME + " not supported by ric " + SECOND_RIC_NAME);
assertTrue(secondRic.isSupportingType(POLICY_TYPE_2_NAME),
POLICY_TYPE_2_NAME + " not supported by ric " + SECOND_RIC_NAME);
- assertEquals(2, secondRic.getManagedNodes().size(),
+ assertEquals(2, secondRic.getManagedElementIds().size(),
"Not correct no of managed nodes for ric " + SECOND_RIC_NAME);
assertTrue(secondRic.isManaging(MANAGED_NODE_B), MANAGED_NODE_B + " not managed by ric " + SECOND_RIC_NAME);
assertTrue(secondRic.isManaging(MANAGED_NODE_C), MANAGED_NODE_C + " not managed by ric " + SECOND_RIC_NAME);
ricConfigs.add(getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C));
when(appConfigMock.getRicConfigs()).thenReturn(ricConfigs);
- Flux<String> fluxType1 = Flux.just(POLICY_TYPE_1_NAME);
- doReturn(Flux.error(new Exception("Unable to contact ric.")), fluxType1).when(a1ClientMock)
- .getPolicyTypeIdentities(anyString());
+ Mono<Collection<String>> policyIdentities = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME));
+ Mono<?> error = Mono.error(new Exception("Unable to contact ric."));
+ doReturn(error, policyIdentities).when(a1ClientMock).getPolicyTypeIdentities(anyString());
- Flux<String> policies = Flux.just(new String[] {POLICY_ID_1, POLICY_ID_2});
+ Mono<Collection<String>> policies = Mono.just(Arrays.asList(POLICY_ID_1, POLICY_ID_2));
doReturn(policies).when(a1ClientMock).getPolicyIdentities(anyString());
when(a1ClientMock.getPolicyType(anyString(), anyString())).thenReturn(Mono.just("Schema"));
when(a1ClientMock.deletePolicy(anyString(), anyString())).thenReturn(Mono.just("OK"));
Rics rics = new Rics();
PolicyTypes policyTypes = new PolicyTypes();
- StartupService serviceUnderTest = new StartupService(appConfigMock, rics, policyTypes, a1ClientMock);
+ StartupService serviceUnderTest =
+ new StartupService(appConfigMock, rics, policyTypes, a1ClientMock, new Policies());
serviceUnderTest.startup();
ricConfigs.add(getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C));
when(appConfigMock.getRicConfigs()).thenReturn(ricConfigs);
- Flux<String> fluxType1 = Flux.just(POLICY_TYPE_1_NAME);
- Flux<String> fluxType2 = Flux.just(POLICY_TYPE_2_NAME);
- when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(fluxType1)
- .thenReturn(fluxType1.concatWith(fluxType2));
+ Mono<Collection<String>> policyTypes1 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME));
+ Mono<Collection<String>> policyTypes2 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME, POLICY_TYPE_2_NAME));
+ when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(policyTypes1).thenReturn(policyTypes2);
when(a1ClientMock.getPolicyType(anyString(), anyString())).thenReturn(Mono.just("Schema"));
- Flux<String> policies = Flux.just(new String[] {POLICY_ID_1, POLICY_ID_2});
- doReturn(Flux.error(new Exception("Unable to contact ric.")), policies).when(a1ClientMock)
+ Mono<Collection<String>> policies = Mono.just(Arrays.asList(POLICY_ID_1, POLICY_ID_2));
+ doReturn(Mono.error(new Exception("Unable to contact ric.")), policies).when(a1ClientMock)
.getPolicyIdentities(anyString());
when(a1ClientMock.deletePolicy(anyString(), anyString())).thenReturn(Mono.just("OK"));
Rics rics = new Rics();
PolicyTypes policyTypes = new PolicyTypes();
- StartupService serviceUnderTest = new StartupService(appConfigMock, rics, policyTypes, a1ClientMock);
+ StartupService serviceUnderTest =
+ new StartupService(appConfigMock, rics, policyTypes, a1ClientMock, new Policies());
serviceUnderTest.startup();
assertEquals(ACTIVE, rics.get(SECOND_RIC_NAME).state(), "Not correct state for " + SECOND_RIC_NAME);
}
- private RicConfig getRicConfig(String name, String baseUrl, String... nodeNames) {
- Vector<String> managedNodes = new Vector<String>(1);
- for (String nodeName : nodeNames) {
- managedNodes.add(nodeName);
+ @SafeVarargs
+ private <T> Vector<T> toVector(T... objs) {
+ Vector<T> result = new Vector<>();
+ for (T o : objs) {
+ result.add(o);
}
+ return result;
+ }
+
+ private RicConfig getRicConfig(String name, String baseUrl, String... managedElementIds) {
ImmutableRicConfig ricConfig = ImmutableRicConfig.builder() //
.name(name) //
- .managedElementIds(managedNodes) //
+ .managedElementIds(toVector(managedElementIds)) //
.baseUrl(baseUrl) //
.build();
return ricConfig;