import reactor.core.publisher.Mono;
/**
- * Recovery handling of RIC, which means:
+ * Recovery handling of RIC.
+ * This means:
* - load all policy types
* - send all policy instances to the RIC
* --- if that fails remove all policy instances
logger.debug("Handling ric: {}", ric.getConfig().name());
synchronized (ric) {
- if (ric.state().equals(Ric.RicState.RECOVERING)) {
+ if (ric.getState() == Ric.RicState.RECOVERING) {
return; // Already running
}
ric.setState(Ric.RicState.RECOVERING);
private Flux<Object> startRecover(Ric ric, A1Client a1Client) {
Flux<PolicyType> recoverTypes = recoverPolicyTypes(ric, a1Client);
- Flux<?> deletePoliciesInRic = deleteAllPoliciesInRic(ric, a1Client);
+ Flux<?> deletePoliciesInRic = a1Client.deleteAllPolicies();
Flux<?> recreatePoliciesInRic = recreateAllPoliciesInRic(ric, a1Client);
return Flux.concat(recoverTypes, deletePoliciesInRic, recreatePoliciesInRic);
Flux<PolicyType> recoverTypes = this.a1ClientFactory.createA1Client(ric) //
.flatMapMany(a1Client -> recoverPolicyTypes(ric, a1Client));
Flux<?> deletePoliciesInRic = this.a1ClientFactory.createA1Client(ric) //
- .flatMapMany(a1Client -> deleteAllPoliciesInRic(ric, a1Client));
+ .flatMapMany(a1Client -> a1Client.deleteAllPolicies());
Flux.merge(recoverTypes, deletePoliciesInRic) //
.subscribe(x -> logger.debug("Brute recover: " + x), //
}
}
- private Flux<String> deleteAllPoliciesInRic(Ric ric, A1Client a1Client) {
- return a1Client.getPolicyIdentities() //
- .flatMapMany(policyIds -> Flux.fromIterable(policyIds)) //
- .doOnNext(policyId -> logger.debug("Deleting policy: {}, for ric: {}", policyId, ric.getConfig().name()))
- .flatMap(policyId -> a1Client.deletePolicy(policyId)); //
- }
-
private Flux<String> recreateAllPoliciesInRic(Ric ric, A1Client a1Client) {
synchronized (policies) {
return Flux.fromIterable(new Vector<>(policies.getForRic(ric.name()))) //