X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=policy-agent%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fpolicyagent%2Ftasks%2FRicRecoveryTask.java;h=543fdf8220bd0bcc60858a8aff1c6b7390dcff10;hb=d1623c5066ebb6152c6a2ba0fe889e32c75d8890;hp=3883585ba1bbab6a48a984d96ea18e91af569a98;hpb=637540bc28fbf337e0c4c58c051a6b4f7ceb321d;p=nonrtric.git diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java index 3883585b..543fdf82 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java @@ -20,10 +20,11 @@ package org.oransc.policyagent.tasks; -import java.util.Collection; import java.util.Vector; import org.oransc.policyagent.clients.A1Client; +import org.oransc.policyagent.clients.A1ClientFactory; +import org.oransc.policyagent.clients.AsyncRestClient; import org.oransc.policyagent.exceptions.ServiceException; import org.oransc.policyagent.repository.ImmutablePolicyType; import org.oransc.policyagent.repository.Policies; @@ -31,6 +32,8 @@ import org.oransc.policyagent.repository.Policy; import org.oransc.policyagent.repository.PolicyType; import org.oransc.policyagent.repository.PolicyTypes; import org.oransc.policyagent.repository.Ric; +import org.oransc.policyagent.repository.Service; +import org.oransc.policyagent.repository.Services; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,26 +41,27 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** - * Loads information about RealTime-RICs at startup. + * Recovery handling of RIC, which means: + * - load all policy types + * - send all policy instances to the RIC + * --- if that fails remove all policy instances + * - Notify subscribing services */ public class RicRecoveryTask { private static final Logger logger = LoggerFactory.getLogger(RicRecoveryTask.class); - private final A1Client a1Client; + private final A1ClientFactory a1ClientFactory; private final PolicyTypes policyTypes; private final Policies policies; + private final Services services; - public RicRecoveryTask(A1Client a1Client, PolicyTypes policyTypes, Policies policies) { - this.a1Client = a1Client; + public RicRecoveryTask(A1ClientFactory a1ClientFactory, PolicyTypes policyTypes, Policies policies, + Services services) { + this.a1ClientFactory = a1ClientFactory; this.policyTypes = policyTypes; this.policies = policies; - } - - public void run(Collection rics) { - for (Ric ric : rics) { - run(ric); - } + this.services = services; } public void run(Ric ric) { @@ -69,36 +73,76 @@ public class RicRecoveryTask { } ric.setState(Ric.RicState.RECOVERING); } - Flux recoveredTypes = recoverPolicyTypes(ric); - Flux deletedPolicies = deletePolicies(ric); - - Flux.merge(recoveredTypes, deletedPolicies) // + this.a1ClientFactory.createA1Client(ric)// + .flatMapMany(client -> startRecover(ric, client)) // .subscribe(x -> logger.debug("Recover: " + x), // - throwable -> onError(ric, throwable), // - () -> onComplete(ric)); + throwable -> onRecoveryError(ric, throwable), // + () -> onRecoveryComplete(ric)); } - private void onComplete(Ric ric) { + private Flux startRecover(Ric ric, A1Client a1Client) { + Flux recoverTypes = recoverPolicyTypes(ric, a1Client); + Flux deletePoliciesInRic = a1Client.deleteAllPolicies(); + Flux recreatePoliciesInRic = recreateAllPoliciesInRic(ric, a1Client); + + return Flux.concat(recoverTypes, deletePoliciesInRic, recreatePoliciesInRic); + } + + private void onRecoveryComplete(Ric ric) { logger.debug("Recovery completed for:" + ric.name()); - ric.setState(Ric.RicState.ACTIVE); + ric.setState(Ric.RicState.IDLE); + notifyAllServices("Recovery completed for:" + ric.name()); + } + + private void notifyAllServices(String body) { + synchronized (services) { + for (Service service : services.getAll()) { + String url = service.getCallbackUrl(); + if (service.getCallbackUrl().length() > 0) { + createClient(url) // + .put("", body) // + .subscribe(rsp -> logger.debug("Service called"), + throwable -> logger.warn("Service called failed", throwable), + () -> logger.debug("Service called complete")); + } + } + } + } + + private void onRecoveryError(Ric ric, Throwable t) { + logger.warn("Recovery failed for: {}, reason: {}", ric.name(), t.getMessage()); + // If recovery fails, try to remove all instances + deleteAllPolicies(ric); + Flux recoverTypes = this.a1ClientFactory.createA1Client(ric) // + .flatMapMany(a1Client -> recoverPolicyTypes(ric, a1Client)); + Flux deletePoliciesInRic = this.a1ClientFactory.createA1Client(ric) // + .flatMapMany(a1Client -> a1Client.deleteAllPolicies()); + + Flux.merge(recoverTypes, deletePoliciesInRic) // + .subscribe(x -> logger.debug("Brute recover: " + x), // + throwable -> onRemoveAllError(ric, throwable), // + () -> onRecoveryComplete(ric)); + } + private void onRemoveAllError(Ric ric, Throwable t) { + logger.warn("Remove all failed for: {}, reason: {}", ric.name(), t.getMessage()); + ric.setState(Ric.RicState.UNDEFINED); } - private void onError(Ric ric, Throwable t) { - logger.debug("Recovery failed for: {}, reason: {}", ric.name(), t.getMessage()); - ric.setState(Ric.RicState.NOT_REACHABLE); + private AsyncRestClient createClient(final String url) { + return new AsyncRestClient(url); } - private Flux recoverPolicyTypes(Ric ric) { + private Flux recoverPolicyTypes(Ric ric, A1Client a1Client) { ric.clearSupportedPolicyTypes(); - return a1Client.getPolicyTypeIdentities(ric.getConfig().baseUrl()) // + return a1Client.getPolicyTypeIdentities() // .flatMapMany(types -> Flux.fromIterable(types)) // - .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId)) - .flatMap((policyTypeId) -> getPolicyType(ric, policyTypeId)) // + .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId)) // + .flatMap((policyTypeId) -> getPolicyType(ric, policyTypeId, a1Client)) // .doOnNext(policyType -> ric.addSupportedPolicyType(policyType)); // } - private Mono getPolicyType(Ric ric, String policyTypeId) { + private Mono getPolicyType(Ric ric, String policyTypeId, A1Client a1Client) { if (policyTypes.contains(policyTypeId)) { try { return Mono.just(policyTypes.getType(policyTypeId)); @@ -106,7 +150,7 @@ public class RicRecoveryTask { return Mono.error(e); } } - return a1Client.getPolicyType(ric.getConfig().baseUrl(), policyTypeId) // + return a1Client.getPolicyTypeSchema(policyTypeId) // .flatMap(schema -> createPolicyType(policyTypeId, schema)); } @@ -116,15 +160,21 @@ public class RicRecoveryTask { return Mono.just(pt); } - private Flux deletePolicies(Ric ric) { - Collection ricPolicies = new Vector<>(policies.getForRic(ric.name())); - for (Policy policy : ricPolicies) { - this.policies.remove(policy); + private void deleteAllPolicies(Ric ric) { + synchronized (policies) { + for (Policy policy : policies.getForRic(ric.name())) { + this.policies.remove(policy); + } } + } - return a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) // - .flatMapMany(policyIds -> Flux.fromIterable(policyIds)) // - .doOnNext(policyId -> logger.debug("Deleting policy: {}, for ric: {}", policyId, ric.getConfig().name())) - .flatMap(policyId -> a1Client.deletePolicy(ric.getConfig().baseUrl(), policyId)); // + private Flux recreateAllPoliciesInRic(Ric ric, A1Client a1Client) { + synchronized (policies) { + return Flux.fromIterable(new Vector<>(policies.getForRic(ric.name()))) // + .doOnNext( + policy -> logger.debug("Recreating policy: {}, for ric: {}", policy.id(), ric.getConfig().name())) + .flatMap(policy -> a1Client.putPolicy(policy)); + } } + }