Supervision of NearRT RIC policy types and instances.
When there is a difference between the view in the NonRT RIC
and the NearRT RIC, all policies will be deleted and the
types will be refreshed.
Introduced that policies will be created and deleted in the NonRT RIC
when created/deleted from the agent NBI.
Added Mock mode, run by command:
mvn -Dtest=MockPolicyAgent test
Issue-ID: NONRTRIC-84
Change-Id: Ica621c990f352cd69efa0dca51451d5f3c755b68
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
--- /dev/null
+# O-RAN-SC NonRT RIC Dashboard Web Application
+
+The O-RAN NonRT RIC PolicyAgent provides a REST API for management of
+policices. It provides support for
+-Policy configuration. This includes
+ -One REST API towards all RICs in the network
+ -Query functions that can find all policies in a RIC, all policies owned by a service (R-APP), all policies of a type etc.
+ -Maps O1 resources (ManagedElement) as defined in O1 to the controlling RIC
+-Supervision of clients (R-APPs) to eliminate stray policies in case of failure
+-Consistency monitoring of the SMO view of policies and the actual situation in the RICs
+-Consistency monitoring of RIC capabilities (policy types)
+
+The agent can be run stand alone in a simulated test mode. Then it
+simulates RICs.
+The REST API is published on port 8081 and it is started by command:
+mvn -Dtest=MockPolicyAgent test
+
+The backend server publishes live API documentation at the
+URL `http://your-host-name-here:8080/swagger-ui.html`
+
+## License
+
+Copyright (C) 2019 Nordix Foundation. All rights reserved.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
package org.oransc.policyagent.clients;
-import reactor.core.publisher.Flux;
+import java.util.Collection;
+
import reactor.core.publisher.Mono;
public interface A1Client {
- public Flux<String> getPolicyTypeIdentities(String nearRtRicUrl);
+ public Mono<Collection<String>> getPolicyTypeIdentities(String nearRtRicUrl);
- public Flux<String> getPolicyIdentities(String nearRtRicUrl);
+ public Mono<Collection<String>> getPolicyIdentities(String nearRtRicUrl);
public Mono<String> getPolicyType(String nearRtRicUrl, String policyTypeId);
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import org.json.JSONArray;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class A1ClientImpl implements A1Client {
}
@Override
- public Flux<String> getPolicyTypeIdentities(String nearRtRicUrl) {
+ public Mono<Collection<String>> getPolicyTypeIdentities(String nearRtRicUrl) {
logger.debug("getPolicyTypeIdentities nearRtRicUrl = {}", nearRtRicUrl);
AsyncRestClient client = createClient(nearRtRicUrl);
- Mono<String> response = client.get("/policytypes/identities");
- return response.flatMapMany(this::createFlux);
+ return client.get("/policytypes/identities") //
+ .flatMap(this::parseJsonArrayOfString);
}
@Override
- public Flux<String> getPolicyIdentities(String nearRtRicUrl) {
+ public Mono<Collection<String>> getPolicyIdentities(String nearRtRicUrl) {
logger.debug("getPolicyIdentities nearRtRicUrl = {}", nearRtRicUrl);
AsyncRestClient client = createClient(nearRtRicUrl);
- Mono<String> response = client.get("/policies/identities");
- return response.flatMapMany(this::createFlux);
+ return client.get("/policies/identities") //
+ .flatMap(this::parseJsonArrayOfString);
}
@Override
return client.delete("/policies/" + policyId);
}
- private Flux<String> createFlux(String inputString) {
+ private Mono<Collection<String>> parseJsonArrayOfString(String inputString) {
try {
List<String> arrayList = new ArrayList<>();
JSONArray jsonArray = new JSONArray(inputString);
arrayList.add(jsonArray.getString(i));
}
logger.debug("A1 client: received list = {}", arrayList);
- return Flux.fromIterable(arrayList);
+ return Mono.just(arrayList);
} catch (JSONException ex) { // invalid json
- return Flux.error(ex);
+ return Mono.error(ex);
}
}
loadConfigurationFromFile(this.filepath);
refreshConfigTask = createRefreshTask() //
- .subscribe(e -> logger.info("Refreshed configuration data"),
+ .subscribe(notUsed -> logger.info("Refreshed configuration data"),
throwable -> logger.error("Configuration refresh terminated due to exception", throwable),
() -> logger.error("Configuration refresh terminated"));
}
import java.util.Collection;
import java.util.Vector;
+import org.oransc.policyagent.clients.A1Client;
import org.oransc.policyagent.configuration.ApplicationConfig;
import org.oransc.policyagent.exceptions.ServiceException;
import org.oransc.policyagent.repository.ImmutablePolicy;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Mono;
@RestController
@Api(value = "Policy Management API")
private final Rics rics;
private final PolicyTypes policyTypes;
private final Policies policies;
+ private final A1Client a1Client;
+
private static Gson gson = new GsonBuilder() //
.serializeNulls() //
.create(); //
@Autowired
- PolicyController(ApplicationConfig config, PolicyTypes types, Policies policies, Rics rics) {
+ PolicyController(ApplicationConfig config, PolicyTypes types, Policies policies, Rics rics, A1Client a1Client) {
this.policyTypes = types;
this.policies = policies;
this.rics = rics;
+ this.a1Client = a1Client;
}
@GetMapping("/policy_schemas")
@DeleteMapping("/policy")
@ApiOperation(value = "Deletes the policy")
@ApiResponses(value = {@ApiResponse(code = 204, message = "Policy deleted")})
- public ResponseEntity<Void> deletePolicy( //
- @RequestParam(name = "instance", required = true) String instance) {
+ public Mono<ResponseEntity<Void>> deletePolicy( //
+ @RequestParam(name = "instance", required = true) String id) {
+ Policy policy = policies.get(id);
+ if (policy != null && policy.ric().state().equals(Ric.RicState.ACTIVE)) {
+ return a1Client.deletePolicy(policy.ric().getConfig().baseUrl(), id) //
+ .doOnEach(notUsed -> policies.removeId(id)) //
+ .flatMap(notUsed -> {
+ return Mono.just(new ResponseEntity<>(HttpStatus.NO_CONTENT));
+ });
+ } else {
+ return Mono.just(new ResponseEntity<>(HttpStatus.NOT_FOUND));
+ }
+ }
- policies.removeId(instance);
- return new ResponseEntity<>(HttpStatus.NO_CONTENT);
+ @PutMapping(path = "/policy")
+ @ApiOperation(value = "Create the policy")
+ @ApiResponses(value = {@ApiResponse(code = 201, message = "Policy created")})
+ public Mono<ResponseEntity<String>> putPolicy( //
+ @RequestParam(name = "type", required = true) String typeName, //
+ @RequestParam(name = "instance", required = true) String instanceId, //
+ @RequestParam(name = "ric", required = true) String ricName, //
+ @RequestParam(name = "service", required = true) String service, //
+ @RequestBody String jsonBody) {
+
+ Ric ric = rics.get(ricName);
+ PolicyType type = policyTypes.get(typeName);
+ if (ric != null && type != null && ric.state().equals(Ric.RicState.ACTIVE)) {
+ Policy policy = ImmutablePolicy.builder() //
+ .id(instanceId) //
+ .json(jsonBody) //
+ .type(type) //
+ .ric(ric) //
+ .ownerServiceName(service) //
+ .lastModified(getTimeStampUTC()) //
+ .build();
+ return a1Client.putPolicy(policy.ric().getConfig().baseUrl(), policy.id(), policy.json()) //
+ .doOnNext(notUsed -> policies.put(policy)) //
+ .flatMap(notUsed -> {
+ return Mono.just(new ResponseEntity<>(HttpStatus.CREATED));
+ });
+ }
+ return Mono.just(new ResponseEntity<>(HttpStatus.NOT_FOUND));
}
@GetMapping("/policies")
return java.time.Instant.now().toString();
}
- @PutMapping(path = "/policy")
- @ApiOperation(value = "Create the policy")
- @ApiResponses(value = {@ApiResponse(code = 201, message = "Policy created")})
- public ResponseEntity<String> putPolicy( //
- @RequestParam(name = "type", required = true) String type, //
- @RequestParam(name = "instance", required = true) String instanceId, //
- @RequestParam(name = "ric", required = true) String ric, //
- @RequestParam(name = "service", required = true) String service, //
- @RequestBody String jsonBody) {
-
- try {
- // services.getService(service).ping();
- Ric ricObj = rics.getRic(ric);
- Policy policy = ImmutablePolicy.builder() //
- .id(instanceId) //
- .json(jsonBody) //
- .type(policyTypes.getType(type)) //
- .ric(ricObj) //
- .ownerServiceName(service) //
- .lastModified(getTimeStampUTC()) //
- .build();
- policies.put(policy);
- return new ResponseEntity<String>(HttpStatus.CREATED);
- } catch (ServiceException e) {
- return new ResponseEntity<String>(e.getMessage(), HttpStatus.NOT_FOUND);
- }
- }
-
}
public String name();
- public Collection<String> nodeNames();
+ public Collection<String> managedElementIds();
public Collection<String> policyTypes();
}
if (supportingPolicyType == null || ric.isSupportingType(supportingPolicyType)) {
result.add(ImmutableRicInfo.builder() //
.name(ric.name()) //
- .nodeNames(ric.getManagedNodes()) //
+ .managedElementIds(ric.getManagedElementIds()) //
.policyTypes(ric.getSupportedPolicyTypeNames()) //
.build());
}
return policiesId.containsKey(id);
}
+ public synchronized Policy get(String id) {
+ return policiesId.get(id);
+ }
+
public synchronized Policy getPolicy(String id) throws ServiceException {
Policy p = policiesId.get(id);
if (p == null) {
return t;
}
+ public synchronized PolicyType get(String name) {
+ return types.get(name);
+ }
+
public synchronized void put(PolicyType type) {
types.put(type.name(), type);
}
*
* @return a vector containing the nodes managed by this Ric.
*/
- public Vector<String> getManagedNodes() {
+ public Vector<String> getManagedElementIds() {
return ricConfig.managedElementIds();
}
/**
* Determines if the given node is managed by this Ric.
*
- * @param nodeName the node name to check.
+ * @param managedElementId the node name to check.
* @return true if the given node is managed by this Ric.
*/
- public boolean isManaging(String nodeName) {
- return ricConfig.managedElementIds().contains(nodeName);
+ public boolean isManaging(String managedElementId) {
+ return ricConfig.managedElementIds().contains(managedElementId);
}
/**
* Adds the given node as managed by this Ric.
*
- * @param nodeName the node to add.
+ * @param managedElementId the node to add.
*/
- public void addManagedNode(String nodeName) {
- if (!ricConfig.managedElementIds().contains(nodeName)) {
- ricConfig.managedElementIds().add(nodeName);
+ public void addManagedElement(String managedElementId) {
+ if (!ricConfig.managedElementIds().contains(managedElementId)) {
+ ricConfig.managedElementIds().add(managedElementId);
}
}
/**
* Removes the given node as managed by this Ric.
*
- * @param nodeName the node to remove.
+ * @param managedElementId the node to remove.
*/
- public void removeManagedNode(String nodeName) {
- ricConfig.managedElementIds().remove(nodeName);
+ public void removeManagedElement(String managedElementId) {
+ ricConfig.managedElementIds().remove(managedElementId);
}
/**
}
/**
- * Adds policy types as supported by this Ric.
- *
- * @param types the policy types to support.
- */
- public void addSupportedPolicyTypes(Collection<PolicyType> types) {
- for (PolicyType type : types) {
- addSupportedPolicyType(type);
- }
- }
-
- /**
- * Removes a policy type as supported by this Ric.
- *
- * @param type the policy type to remove as supported by this Ric.
+ * Removes all policy type as supported by this Ric.
*/
- public void removeSupportedPolicyType(PolicyType type) {
- supportedPolicyTypes.remove(type.name());
+ public void clearSupportedPolicyTypes() {
+ supportedPolicyTypes.clear();
}
/**
/**
* The Ric cannot be contacted.
*/
- NOT_REACHABLE
+ NOT_REACHABLE, RECOVERING
}
}
public class Rics {
Map<String, Ric> rics = new HashMap<>();
- public void put(Ric ric) {
+ public synchronized void put(Ric ric) {
rics.put(ric.name(), ric);
}
ping();
}
- public String getName() {
+ public synchronized String getName() {
return this.name;
}
- public Duration getKeepAliveInterval() {
+ public synchronized Duration getKeepAliveInterval() {
return this.keepAliveInterval;
}
package org.oransc.policyagent.tasks;
+import java.util.Collection;
+
import org.oransc.policyagent.clients.A1Client;
import org.oransc.policyagent.repository.Policies;
+import org.oransc.policyagent.repository.PolicyTypes;
import org.oransc.policyagent.repository.Ric;
-import org.oransc.policyagent.repository.Ric.RicState;
import org.oransc.policyagent.repository.Rics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final Rics rics;
private final Policies policies;
+ private final PolicyTypes policyTypes;
private final A1Client a1Client;
@Autowired
- public RepositorySupervision(Rics rics, Policies policies, A1Client a1Client) {
+ public RepositorySupervision(Rics rics, Policies policies, A1Client a1Client, PolicyTypes policyTypes) {
this.rics = rics;
this.policies = policies;
this.a1Client = a1Client;
+ this.policyTypes = policyTypes;
}
/**
private Flux<Ric> createTask() {
return Flux.fromIterable(rics.getRics()) //
- .groupBy(ric -> ric.state()) //
- .flatMap(fluxGroup -> handleGroup(fluxGroup.key(), fluxGroup));
+ .flatMap(ric -> checkInstances(ric)) //
+ .flatMap(ric -> checkTypes(ric));
}
- private Flux<Ric> handleGroup(Ric.RicState key, Flux<Ric> fluxGroup) {
- logger.debug("Handling group {}", key);
- switch (key) {
- case ACTIVE:
- return fluxGroup.flatMap(this::checkActive);
+ private Mono<Ric> checkInstances(Ric ric) {
- case NOT_REACHABLE:
- return fluxGroup.flatMap(this::checkNotReachable);
+ return a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) //
+ .onErrorResume(t -> Mono.empty()) //
+ .flatMap(ricP -> validateInstances(ricP, ric));
+ }
- default:
- // If not initiated, leave it to the StartupService.
- return Flux.empty();
- }
+ private Flux<Ric> junk() {
+ return Flux.empty();
}
- private Mono<Ric> checkActive(Ric ric) {
- logger.debug("Handling active ric {}", ric);
- a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) //
- .filter(policyId -> !policies.containsPolicy(policyId)) //
- .doOnNext(policyId -> logger.debug("Deleting policy {}", policyId))
- .flatMap(policyId -> a1Client.deletePolicy(ric.getConfig().baseUrl(), policyId)) //
- .subscribe();
+ private Mono<Ric> validateInstances(Collection<String> ricPolicies, Ric ric) {
+ if (ricPolicies.size() != policies.getForRic(ric.name()).size()) {
+ return startRecovery(ric);
+ }
+ for (String policyId : ricPolicies) {
+ if (!policies.containsPolicy(policyId)) {
+ return startRecovery(ric);
+ }
+ }
return Mono.just(ric);
}
- private Mono<Ric> checkNotReachable(Ric ric) {
- logger.debug("Handling not reachable ric {}", ric);
- a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) //
- .filter(policyId -> !policies.containsPolicy(policyId)) //
- .doOnNext(policyId -> logger.debug("Deleting policy {}", policyId))
- .flatMap(policyId -> a1Client.deletePolicy(ric.getConfig().baseUrl(), policyId)) //
- .subscribe(null, null, () -> ric.setState(RicState.ACTIVE));
+ private Mono<Ric> checkTypes(Ric ric) {
+ return a1Client.getPolicyTypeIdentities(ric.getConfig().baseUrl()) //
+ .onErrorResume(t -> {
+ return Mono.empty();
+ }) //
+ .flatMap(ricTypes -> validateTypes(ricTypes, ric));
+ }
+
+ private Mono<Ric> validateTypes(Collection<String> ricTypes, Ric ric) {
+ if (ricTypes.size() != ric.getSupportedPolicyTypes().size()) {
+ return startRecovery(ric);
+ }
+ for (String typeName : ricTypes) {
+ if (!ric.isSupportingType(typeName)) {
+ return startRecovery(ric);
+ }
+ }
return Mono.just(ric);
}
+ private Mono<Ric> startRecovery(Ric ric) {
+ RicRecoveryTask recovery = new RicRecoveryTask(a1Client, policyTypes, policies);
+ recovery.run(ric);
+ return Mono.empty();
+ }
+
private void onRicChecked(Ric ric) {
logger.info("Ric: " + ric.name() + " checked");
}
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2019 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.policyagent.tasks;
+
+import java.util.Collection;
+import java.util.Vector;
+
+import org.oransc.policyagent.clients.A1Client;
+import org.oransc.policyagent.exceptions.ServiceException;
+import org.oransc.policyagent.repository.ImmutablePolicyType;
+import org.oransc.policyagent.repository.Policies;
+import org.oransc.policyagent.repository.Policy;
+import org.oransc.policyagent.repository.PolicyType;
+import org.oransc.policyagent.repository.PolicyTypes;
+import org.oransc.policyagent.repository.Ric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * Loads information about RealTime-RICs at startup.
+ */
+public class RicRecoveryTask {
+
+ private static final Logger logger = LoggerFactory.getLogger(RicRecoveryTask.class);
+
+ private final A1Client a1Client;
+ private final PolicyTypes policyTypes;
+ private final Policies policies;
+
+ public RicRecoveryTask(A1Client a1Client, PolicyTypes policyTypes, Policies policies) {
+ this.a1Client = a1Client;
+ this.policyTypes = policyTypes;
+ this.policies = policies;
+ }
+
+ public void run(Collection<Ric> rics) {
+ for (Ric ric : rics) {
+ run(ric);
+ }
+ }
+
+ public void run(Ric ric) {
+ logger.debug("Handling ric: {}", ric.getConfig().name());
+
+ synchronized (ric) {
+ if (ric.state().equals(Ric.RicState.RECOVERING)) {
+ return; // Already running
+ }
+ ric.setState(Ric.RicState.RECOVERING);
+ }
+ Flux<PolicyType> recoveredTypes = recoverPolicyTypes(ric);
+ Flux<?> deletedPolicies = deletePolicies(ric);
+
+ Flux.merge(recoveredTypes, deletedPolicies) //
+ .subscribe(x -> logger.debug("Recover: " + x), //
+ throwable -> onError(ric, throwable), //
+ () -> onComplete(ric));
+ }
+
+ private void onComplete(Ric ric) {
+ logger.debug("Recovery completed for:" + ric.name());
+ ric.setState(Ric.RicState.ACTIVE);
+
+ }
+
+ private void onError(Ric ric, Throwable t) {
+ logger.debug("Recovery failed for: {}, reason: {}", ric.name(), t.getMessage());
+ ric.setState(Ric.RicState.NOT_REACHABLE);
+ }
+
+ private Flux<PolicyType> recoverPolicyTypes(Ric ric) {
+ ric.clearSupportedPolicyTypes();
+ return a1Client.getPolicyTypeIdentities(ric.getConfig().baseUrl()) //
+ .flatMapMany(types -> Flux.fromIterable(types)) //
+ .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId))
+ .flatMap((policyTypeId) -> getPolicyType(ric, policyTypeId)) //
+ .doOnNext(policyType -> ric.addSupportedPolicyType(policyType)); //
+ }
+
+ private Mono<PolicyType> getPolicyType(Ric ric, String policyTypeId) {
+ if (policyTypes.contains(policyTypeId)) {
+ try {
+ return Mono.just(policyTypes.getType(policyTypeId));
+ } catch (ServiceException e) {
+ return Mono.error(e);
+ }
+ }
+ return a1Client.getPolicyType(ric.getConfig().baseUrl(), policyTypeId) //
+ .flatMap(schema -> createPolicyType(policyTypeId, schema));
+ }
+
+ private Mono<PolicyType> createPolicyType(String policyTypeId, String schema) {
+ PolicyType pt = ImmutablePolicyType.builder().name(policyTypeId).schema(schema).build();
+ policyTypes.put(pt);
+ return Mono.just(pt);
+ }
+
+ private Flux<String> deletePolicies(Ric ric) {
+ Collection<Policy> ricPolicies = new Vector<>(policies.getForRic(ric.name()));
+ for (Policy policy : ricPolicies) {
+ this.policies.remove(policy);
+ }
+
+ return a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) //
+ .flatMapMany(policyIds -> Flux.fromIterable(policyIds)) //
+ .doOnNext(policyId -> logger.debug("Deleting policy: {}, for ric: {}", policyId, ric.getConfig().name()))
+ .flatMap(policyId -> a1Client.deletePolicy(ric.getConfig().baseUrl(), policyId)); //
+ }
+}
import org.oransc.policyagent.clients.A1Client;
import org.oransc.policyagent.configuration.ApplicationConfig;
-import org.oransc.policyagent.exceptions.ServiceException;
-import org.oransc.policyagent.repository.ImmutablePolicyType;
-import org.oransc.policyagent.repository.PolicyType;
+import org.oransc.policyagent.configuration.RicConfig;
+import org.oransc.policyagent.repository.Policies;
import org.oransc.policyagent.repository.PolicyTypes;
import org.oransc.policyagent.repository.Ric;
-import org.oransc.policyagent.repository.Ric.RicState;
import org.oransc.policyagent.repository.Rics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
/**
* Loads information about RealTime-RICs at startup.
*/
@Autowired
private A1Client a1Client;
- StartupService(ApplicationConfig appConfig, Rics rics, PolicyTypes policyTypes, A1Client a1Client) {
+ @Autowired
+ private Policies policies;
+
+ StartupService(ApplicationConfig appConfig, Rics rics, PolicyTypes policyTypes, A1Client a1Client,
+ Policies policies) {
this.applicationConfig = appConfig;
this.rics = rics;
this.policyTypes = policyTypes;
this.a1Client = a1Client;
+ this.policies = policies;
}
/**
* Reads the configured Rics and performs the service discovery. The result is put into the repository.
*/
public void startup() {
+ logger.debug("Starting up");
applicationConfig.initialize();
- Flux.fromIterable(applicationConfig.getRicConfigs()) //
- .map(ricConfig -> new Ric(ricConfig)) //
- .doOnNext(ric -> logger.debug("Handling ric: {}", ric.getConfig().name())) //
- .flatMap(this::addPolicyTypesForRic) //
- .flatMap(this::deletePoliciesForRic) //
- .doOnNext(rics::put) //
- .subscribe();
- }
-
- private Mono<Ric> addPolicyTypesForRic(Ric ric) {
- a1Client.getPolicyTypeIdentities(ric.getConfig().baseUrl()) //
- .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId))
- .flatMap((policyTypeId) -> addTypeToRepo(ric, policyTypeId)) //
- .flatMap(type -> addTypeToRic(ric, type)) //
- .subscribe(null, cause -> setRicToNotReachable(ric, cause), () -> setRicToActive(ric));
- return Mono.just(ric);
- }
-
- private Mono<PolicyType> addTypeToRepo(Ric ric, String policyTypeId) {
- if (policyTypes.contains(policyTypeId)) {
- try {
- return Mono.just(policyTypes.getType(policyTypeId));
- } catch (ServiceException e) {
- return Mono.error(e);
- }
+ for (RicConfig ricConfig : applicationConfig.getRicConfigs()) {
+ rics.put(new Ric(ricConfig));
}
- return a1Client.getPolicyType(ric.getConfig().baseUrl(), policyTypeId) //
- .flatMap(schema -> createPolicyType(policyTypeId, schema));
- }
-
- private Mono<PolicyType> createPolicyType(String policyTypeId, String schema) {
- PolicyType pt = ImmutablePolicyType.builder().name(policyTypeId).schema(schema).build();
- policyTypes.put(pt);
- return Mono.just(pt);
- }
-
- private Mono<PolicyType> addTypeToRic(Ric ric, PolicyType policyType) {
- ric.addSupportedPolicyType(policyType);
- return Mono.just(policyType);
- }
-
- private Mono<Ric> deletePoliciesForRic(Ric ric) {
- if (!Ric.RicState.NOT_REACHABLE.equals(ric.state())) {
- a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) //
- .doOnNext(
- policyId -> logger.debug("Deleting policy: {}, for ric: {}", policyId, ric.getConfig().name()))
- .flatMap(policyId -> a1Client.deletePolicy(ric.getConfig().baseUrl(), policyId)) //
- .subscribe(null, cause -> setRicToNotReachable(ric, cause), null);
- }
-
- return Mono.just(ric);
- }
-
- private void setRicToNotReachable(Ric ric, Throwable t) {
- ric.setState(Ric.RicState.NOT_REACHABLE);
- logger.info("Unable to connect to ric {}. Cause: {}", ric.name(), t.getMessage());
- }
-
- private void setRicToActive(Ric ric) {
- ric.setState(RicState.ACTIVE);
+ RicRecoveryTask recoveryTask = new RicRecoveryTask(a1Client, policyTypes, policies);
+ recoveryTask.run(rics.getRics()); // recover all Rics
}
}
import com.google.gson.reflect.TypeToken;
import java.net.URL;
+import java.util.Collection;
import java.util.List;
import java.util.Vector;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.oransc.policyagent.clients.A1Client;
import org.oransc.policyagent.configuration.ApplicationConfig;
import org.oransc.policyagent.configuration.ImmutableRicConfig;
import org.oransc.policyagent.configuration.RicConfig;
import org.springframework.http.ResponseEntity;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.web.client.RestTemplate;
+import reactor.core.publisher.Mono;
@ExtendWith(SpringExtension.class)
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
}
}
+ static class A1ClientMock implements A1Client {
+ private final Policies policies;
+ private final PolicyTypes policyTypes;
+
+ A1ClientMock(Policies policies, PolicyTypes policyTypes) {
+ this.policies = policies;
+ this.policyTypes = policyTypes;
+ }
+
+ @Override
+ public Mono<Collection<String>> getPolicyTypeIdentities(String nearRtRicUrl) {
+ Vector<String> result = new Vector<>();
+ for (PolicyType p : this.policyTypes.getAll()) {
+ result.add(p.name());
+ }
+ return Mono.just(result);
+ }
+
+ @Override
+ public Mono<String> getPolicyType(String nearRtRicUrl, String policyTypeId) {
+ try {
+ return Mono.just(this.policies.get(policyTypeId).json());
+ } catch (Exception e) {
+ return Mono.error(e);
+ }
+ }
+
+ @Override
+ public Mono<String> putPolicy(String nearRtRicUrl, String policyId, String policyString) {
+ return Mono.just("OK");
+ }
+
+ @Override
+ public Mono<String> deletePolicy(String nearRtRicUrl, String policyId) {
+ return Mono.just("OK");
+ }
+
+ @Override
+ public Mono<Collection<String>> getPolicyIdentities(String nearRtRicUrl) {
+ return Mono.empty(); // problem is that a recovery will start
+ }
+ }
+
/**
* Overrides the BeanFactory.
*/
@TestConfiguration
static class TestBeanFactory {
+ private final Rics rics = new Rics();
+ private final Policies policies = new Policies();
+ private final PolicyTypes policyTypes = new PolicyTypes();
@Bean
public ApplicationConfig getApplicationConfig() {
return new MockApplicationConfig();
}
+ @Bean
+ A1Client getA1Client() {
+ return new A1ClientMock(this.policies, this.policyTypes);
+ }
+
+ @Bean
+ public Policies getPolicies() {
+ return this.policies;
+ }
+
+ @Bean
+ public PolicyTypes getPolicyTypes() {
+ return this.policyTypes;
+ }
+
@Bean
public Rics getRics() {
- Rics rics = new Rics();
- rics.put(new Ric(ImmutableRicConfig.builder().name("kista_1").baseUrl("kista_url")
- .managedElementIds(new Vector<>()).build()));
- rics.put(new Ric(ImmutableRicConfig.builder().name("ric1").baseUrl("ric_url")
- .managedElementIds(new Vector<>()).build()));
- return rics;
+ return this.rics;
}
}
assertThat(rsp).isEqualTo("ric1");
}
- // managedElmentId -> nodeName
-
@Test
public void testPutPolicy() throws Exception {
putService("service1");
String url = baseUrl() + "/policy?type=type1&instance=instance1&ric=ric1&service=service1";
String json = "{}";
addPolicyType("type1", "ric1");
+ this.rics.getRic("ric1").setState(Ric.RicState.ACTIVE);
this.restTemplate.put(url, json);
public void testDeletePolicy() throws Exception {
reset();
String url = baseUrl() + "/policy?instance=id";
- addPolicy("id", "typeName", "service1", "ric1");
+ Policy policy = addPolicy("id", "typeName", "service1", "ric1");
+ policy.ric().setState(Ric.RicState.ACTIVE);
assertThat(policies.size()).isEqualTo(1);
this.restTemplate.delete(url);
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Vector;
-import org.junit.Test;
-import org.junit.runner.RunWith;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.oransc.policyagent.clients.A1Client;
import org.oransc.policyagent.configuration.ApplicationConfig;
import org.oransc.policyagent.repository.ImmutablePolicyType;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.context.annotation.Bean;
-import org.springframework.test.context.junit4.SpringRunner;
-
-import reactor.core.publisher.Flux;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
import reactor.core.publisher.Mono;
-@RunWith(SpringRunner.class)
+@ExtendWith(SpringExtension.class)
@SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
public class MockPolicyAgent {
getPolicies(nearRtRicUrl).put(policyId, policyString);
}
- public Iterable<String> getPolicyIdentities(String nearRtRicUrl) {
+ public Collection<String> getPolicyIdentities(String nearRtRicUrl) {
return getPolicies(nearRtRicUrl).keySet();
}
}
@Override
- public Flux<String> getPolicyTypeIdentities(String nearRtRicUrl) {
+ public Mono<Collection<String>> getPolicyTypeIdentities(String nearRtRicUrl) {
Vector<String> result = new Vector<>();
for (PolicyType p : this.policyTypes.getAll()) {
result.add(p.name());
}
- return Flux.fromIterable(result);
+ return Mono.just(result);
}
@Override
- public Flux<String> getPolicyIdentities(String nearRtRicUrl) {
- Iterable<String> result = policies.getPolicyIdentities(nearRtRicUrl);
- return Flux.fromIterable(result);
+ public Mono<Collection<String>> getPolicyIdentities(String nearRtRicUrl) {
+ Collection<String> result = policies.getPolicyIdentities(nearRtRicUrl);
+ return Mono.just(result);
}
@Override
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.junit.jupiter.MockitoExtension;
-import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
Mono<String> policyTypeIds = Mono.just(Arrays.toString(new String[] {POLICY_TYPE_1_NAME, POLICY_TYPE_2_NAME}));
when(asyncRestClientMock.get(POLICYTYPES_IDENTITIES_URL)).thenReturn(policyTypeIds);
- Flux<String> policyTypeIdsFlux = a1Client.getPolicyTypeIdentities(RIC_URL);
+ Mono<?> policyTypeIdsFlux = a1Client.getPolicyTypeIdentities(RIC_URL);
verify(asyncRestClientMock).get(POLICYTYPES_IDENTITIES_URL);
- StepVerifier.create(policyTypeIdsFlux).expectNext(POLICY_TYPE_1_NAME, POLICY_TYPE_2_NAME).expectComplete()
- .verify();
+ StepVerifier.create(policyTypeIdsFlux).expectNextCount(1).expectComplete().verify();
}
@Test
Mono<String> policyIds = Mono.just(Arrays.toString(new String[] {POLICY_1_ID, POLICY_2_ID}));
when(asyncRestClientMock.get(POLICIES_IDENTITIES_URL)).thenReturn(policyIds);
- Flux<String> policyIdsFlux = a1Client.getPolicyIdentities(RIC_URL);
+ Mono<?> policyIdsFlux = a1Client.getPolicyIdentities(RIC_URL);
verify(asyncRestClientMock).get(POLICIES_IDENTITIES_URL);
- StepVerifier.create(policyIdsFlux).expectNext(POLICY_1_ID, POLICY_2_ID).expectComplete().verify();
+ StepVerifier.create(policyIdsFlux).expectNextCount(1).expectComplete().verify();
}
@Test
import static org.mockito.Mockito.when;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Vector;
import org.junit.jupiter.api.Test;
import org.oransc.policyagent.repository.Policies;
import org.oransc.policyagent.repository.Policy;
import org.oransc.policyagent.repository.PolicyType;
+import org.oransc.policyagent.repository.PolicyTypes;
import org.oransc.policyagent.repository.Ric;
import org.oransc.policyagent.repository.Ric.RicState;
import org.oransc.policyagent.repository.Rics;
-
-import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ExtendWith(MockitoExtension.class)
.build();
Policies policies = new Policies();
policies.put(policy1);
+ PolicyTypes types = new PolicyTypes();
- RepositorySupervision supervisorUnderTest = new RepositorySupervision(rics, policies, a1ClientMock);
+ RepositorySupervision supervisorUnderTest = new RepositorySupervision(rics, policies, a1ClientMock, types);
- when(a1ClientMock.getPolicyIdentities(anyString())).thenReturn(Flux.just("policyId1", "policyId2"));
+ Mono<Collection<String>> policyIds = Mono.just(Arrays.asList("policyId1", "policyId2"));
+ when(a1ClientMock.getPolicyIdentities(anyString())).thenReturn(policyIds);
when(a1ClientMock.deletePolicy(anyString(), anyString())).thenReturn(Mono.empty());
+ when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(policyIds);
+ when(a1ClientMock.getPolicyType(anyString(), anyString())).thenReturn(Mono.just("schema"));
supervisorUnderTest.checkAllRics();
- await().untilAsserted(() -> RicState.ACTIVE.equals(rics.getRic("ric2").state()));
+ await().untilAsserted(() -> RicState.ACTIVE.equals(ric1.state()));
+ await().untilAsserted(() -> RicState.ACTIVE.equals(ric2.state()));
+ await().untilAsserted(() -> RicState.ACTIVE.equals(ric3.state()));
- verify(a1ClientMock).getPolicyIdentities("baseUrl1");
verify(a1ClientMock).deletePolicy("baseUrl1", "policyId2");
- verify(a1ClientMock).getPolicyIdentities("baseUrl2");
verify(a1ClientMock).deletePolicy("baseUrl2", "policyId2");
verifyNoMoreInteractions(a1ClientMock);
}
import static org.oransc.policyagent.repository.Ric.RicState.ACTIVE;
import static org.oransc.policyagent.repository.Ric.RicState.NOT_REACHABLE;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Vector;
import org.junit.jupiter.api.Test;
import org.oransc.policyagent.configuration.ApplicationConfig;
import org.oransc.policyagent.configuration.ImmutableRicConfig;
import org.oransc.policyagent.configuration.RicConfig;
+import org.oransc.policyagent.repository.Policies;
import org.oransc.policyagent.repository.PolicyTypes;
import org.oransc.policyagent.repository.Ric;
import org.oransc.policyagent.repository.Rics;
-
-import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ExtendWith(MockitoExtension.class)
ricConfigs.add(getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C));
when(appConfigMock.getRicConfigs()).thenReturn(ricConfigs);
- Flux<String> fluxType1 = Flux.just(POLICY_TYPE_1_NAME);
- Flux<String> fluxType2 = Flux.just(POLICY_TYPE_2_NAME);
- when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(fluxType1)
- .thenReturn(fluxType1.concatWith(fluxType2));
- Flux<String> policies = Flux.just(new String[] {POLICY_ID_1, POLICY_ID_2});
+ Mono<Collection<String>> policyTypes1 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME));
+ Mono<Collection<String>> policyTypes2 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME, POLICY_TYPE_2_NAME));
+ when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(policyTypes1).thenReturn(policyTypes2);
+ Mono<Collection<String>> policies = Mono.just(Arrays.asList(POLICY_ID_1, POLICY_ID_2));
when(a1ClientMock.getPolicyIdentities(anyString())).thenReturn(policies);
when(a1ClientMock.getPolicyType(anyString(), anyString())).thenReturn(Mono.just("Schema"));
when(a1ClientMock.deletePolicy(anyString(), anyString())).thenReturn(Mono.just("OK"));
Rics rics = new Rics();
PolicyTypes policyTypes = new PolicyTypes();
- StartupService serviceUnderTest = new StartupService(appConfigMock, rics, policyTypes, a1ClientMock);
+ StartupService serviceUnderTest =
+ new StartupService(appConfigMock, rics, policyTypes, a1ClientMock, new Policies());
serviceUnderTest.startup();
"Not correct no of types supported for ric " + FIRST_RIC_NAME);
assertTrue(firstRic.isSupportingType(POLICY_TYPE_1_NAME),
POLICY_TYPE_1_NAME + " not supported by ric " + FIRST_RIC_NAME);
- assertEquals(1, firstRic.getManagedNodes().size(), "Not correct no of managed nodes for ric " + FIRST_RIC_NAME);
+ assertEquals(1, firstRic.getManagedElementIds().size(),
+ "Not correct no of managed nodes for ric " + FIRST_RIC_NAME);
assertTrue(firstRic.isManaging(MANAGED_NODE_A), MANAGED_NODE_A + " not managed by ric " + FIRST_RIC_NAME);
Ric secondRic = rics.get(SECOND_RIC_NAME);
POLICY_TYPE_1_NAME + " not supported by ric " + SECOND_RIC_NAME);
assertTrue(secondRic.isSupportingType(POLICY_TYPE_2_NAME),
POLICY_TYPE_2_NAME + " not supported by ric " + SECOND_RIC_NAME);
- assertEquals(2, secondRic.getManagedNodes().size(),
+ assertEquals(2, secondRic.getManagedElementIds().size(),
"Not correct no of managed nodes for ric " + SECOND_RIC_NAME);
assertTrue(secondRic.isManaging(MANAGED_NODE_B), MANAGED_NODE_B + " not managed by ric " + SECOND_RIC_NAME);
assertTrue(secondRic.isManaging(MANAGED_NODE_C), MANAGED_NODE_C + " not managed by ric " + SECOND_RIC_NAME);
ricConfigs.add(getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C));
when(appConfigMock.getRicConfigs()).thenReturn(ricConfigs);
- Flux<String> fluxType1 = Flux.just(POLICY_TYPE_1_NAME);
- doReturn(Flux.error(new Exception("Unable to contact ric.")), fluxType1).when(a1ClientMock)
- .getPolicyTypeIdentities(anyString());
+ Mono<Collection<String>> policyIdentities = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME));
+ Mono<?> error = Mono.error(new Exception("Unable to contact ric."));
+ doReturn(error, policyIdentities).when(a1ClientMock).getPolicyTypeIdentities(anyString());
- Flux<String> policies = Flux.just(new String[] {POLICY_ID_1, POLICY_ID_2});
+ Mono<Collection<String>> policies = Mono.just(Arrays.asList(POLICY_ID_1, POLICY_ID_2));
doReturn(policies).when(a1ClientMock).getPolicyIdentities(anyString());
when(a1ClientMock.getPolicyType(anyString(), anyString())).thenReturn(Mono.just("Schema"));
when(a1ClientMock.deletePolicy(anyString(), anyString())).thenReturn(Mono.just("OK"));
Rics rics = new Rics();
PolicyTypes policyTypes = new PolicyTypes();
- StartupService serviceUnderTest = new StartupService(appConfigMock, rics, policyTypes, a1ClientMock);
+ StartupService serviceUnderTest =
+ new StartupService(appConfigMock, rics, policyTypes, a1ClientMock, new Policies());
serviceUnderTest.startup();
ricConfigs.add(getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C));
when(appConfigMock.getRicConfigs()).thenReturn(ricConfigs);
- Flux<String> fluxType1 = Flux.just(POLICY_TYPE_1_NAME);
- Flux<String> fluxType2 = Flux.just(POLICY_TYPE_2_NAME);
- when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(fluxType1)
- .thenReturn(fluxType1.concatWith(fluxType2));
+ Mono<Collection<String>> policyTypes1 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME));
+ Mono<Collection<String>> policyTypes2 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME, POLICY_TYPE_2_NAME));
+ when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(policyTypes1).thenReturn(policyTypes2);
when(a1ClientMock.getPolicyType(anyString(), anyString())).thenReturn(Mono.just("Schema"));
- Flux<String> policies = Flux.just(new String[] {POLICY_ID_1, POLICY_ID_2});
- doReturn(Flux.error(new Exception("Unable to contact ric.")), policies).when(a1ClientMock)
+ Mono<Collection<String>> policies = Mono.just(Arrays.asList(POLICY_ID_1, POLICY_ID_2));
+ doReturn(Mono.error(new Exception("Unable to contact ric.")), policies).when(a1ClientMock)
.getPolicyIdentities(anyString());
when(a1ClientMock.deletePolicy(anyString(), anyString())).thenReturn(Mono.just("OK"));
Rics rics = new Rics();
PolicyTypes policyTypes = new PolicyTypes();
- StartupService serviceUnderTest = new StartupService(appConfigMock, rics, policyTypes, a1ClientMock);
+ StartupService serviceUnderTest =
+ new StartupService(appConfigMock, rics, policyTypes, a1ClientMock, new Policies());
serviceUnderTest.startup();
assertEquals(ACTIVE, rics.get(SECOND_RIC_NAME).state(), "Not correct state for " + SECOND_RIC_NAME);
}
- private RicConfig getRicConfig(String name, String baseUrl, String... nodeNames) {
- Vector<String> managedNodes = new Vector<String>(1);
- for (String nodeName : nodeNames) {
- managedNodes.add(nodeName);
+ @SafeVarargs
+ private <T> Vector<T> toVector(T... objs) {
+ Vector<T> result = new Vector<>();
+ for (T o : objs) {
+ result.add(o);
}
+ return result;
+ }
+
+ private RicConfig getRicConfig(String name, String baseUrl, String... managedElementIds) {
ImmutableRicConfig ricConfig = ImmutableRicConfig.builder() //
.name(name) //
- .managedElementIds(managedNodes) //
+ .managedElementIds(toVector(managedElementIds)) //
.baseUrl(baseUrl) //
.build();
return ricConfig;