X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=policy-agent%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fpolicyagent%2Ftasks%2FRicRecoveryTask.java;h=d7e85513d16cdb996c1ce04c41a007a176279e47;hb=95db19e2820102db0255ca57407faa333cbb4085;hp=8b3fadb01f5327652a039277e2cfeda0d82c832b;hpb=eec4647d457ca8c2ebecc86485127bd529919533;p=nonrtric.git diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java index 8b3fadb0..d7e85513 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java @@ -23,6 +23,7 @@ package org.oransc.policyagent.tasks; import java.util.Vector; import org.oransc.policyagent.clients.A1Client; +import org.oransc.policyagent.clients.A1ClientFactory; import org.oransc.policyagent.clients.AsyncRestClient; import org.oransc.policyagent.exceptions.ServiceException; import org.oransc.policyagent.repository.ImmutablePolicyType; @@ -50,13 +51,14 @@ public class RicRecoveryTask { private static final Logger logger = LoggerFactory.getLogger(RicRecoveryTask.class); - private final A1Client a1Client; + private final A1ClientFactory a1ClientFactory; private final PolicyTypes policyTypes; private final Policies policies; private final Services services; - public RicRecoveryTask(A1Client a1Client, PolicyTypes policyTypes, Policies policies, Services services) { - this.a1Client = a1Client; + public RicRecoveryTask(A1ClientFactory a1ClientFactory, PolicyTypes policyTypes, Policies policies, + Services services) { + this.a1ClientFactory = a1ClientFactory; this.policyTypes = policyTypes; this.policies = policies; this.services = services; @@ -71,16 +73,21 @@ public class RicRecoveryTask { } ric.setState(Ric.RicState.RECOVERING); } - Flux recoverTypes = recoverPolicyTypes(ric); - Flux deletePoliciesInRic = deleteAllPoliciesInRic(ric); - Flux recreatePoliciesInRic = recreateAllPoliciesInRic(ric); - - Flux.concat(recoverTypes, deletePoliciesInRic, recreatePoliciesInRic) // + this.a1ClientFactory.createA1Client(ric)// + .flatMapMany(client -> startRecover(ric, client)) // .subscribe(x -> logger.debug("Recover: " + x), // throwable -> onRecoveryError(ric, throwable), // () -> onRecoveryComplete(ric)); } + private Flux startRecover(Ric ric, A1Client a1Client) { + Flux recoverTypes = recoverPolicyTypes(ric, a1Client); + Flux deletePoliciesInRic = deleteAllPoliciesInRic(ric, a1Client); + Flux recreatePoliciesInRic = recreateAllPoliciesInRic(ric, a1Client); + + return Flux.concat(recoverTypes, deletePoliciesInRic, recreatePoliciesInRic); + } + private void onRecoveryComplete(Ric ric) { logger.debug("Recovery completed for:" + ric.name()); ric.setState(Ric.RicState.IDLE); @@ -104,11 +111,12 @@ public class RicRecoveryTask { private void onRecoveryError(Ric ric, Throwable t) { logger.warn("Recovery failed for: {}, reason: {}", ric.name(), t.getMessage()); - // If recovery fails, try to remove all instances deleteAllPolicies(ric); - Flux recoverTypes = recoverPolicyTypes(ric); - Flux deletePoliciesInRic = deleteAllPoliciesInRic(ric); + Flux recoverTypes = this.a1ClientFactory.createA1Client(ric) // + .flatMapMany(a1Client -> recoverPolicyTypes(ric, a1Client)); + Flux deletePoliciesInRic = this.a1ClientFactory.createA1Client(ric) // + .flatMapMany(a1Client -> deleteAllPoliciesInRic(ric, a1Client)); Flux.merge(recoverTypes, deletePoliciesInRic) // .subscribe(x -> logger.debug("Brute recover: " + x), // @@ -125,16 +133,16 @@ public class RicRecoveryTask { return new AsyncRestClient(url); } - private Flux recoverPolicyTypes(Ric ric) { + private Flux recoverPolicyTypes(Ric ric, A1Client a1Client) { ric.clearSupportedPolicyTypes(); - return a1Client.getPolicyTypeIdentities(ric.getConfig().baseUrl()) // + return a1Client.getPolicyTypeIdentities() // .flatMapMany(types -> Flux.fromIterable(types)) // .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId)) // - .flatMap((policyTypeId) -> getPolicyType(ric, policyTypeId)) // + .flatMap((policyTypeId) -> getPolicyType(ric, policyTypeId, a1Client)) // .doOnNext(policyType -> ric.addSupportedPolicyType(policyType)); // } - private Mono getPolicyType(Ric ric, String policyTypeId) { + private Mono getPolicyType(Ric ric, String policyTypeId, A1Client a1Client) { if (policyTypes.contains(policyTypeId)) { try { return Mono.just(policyTypes.getType(policyTypeId)); @@ -142,7 +150,7 @@ public class RicRecoveryTask { return Mono.error(e); } } - return a1Client.getPolicyType(ric.getConfig().baseUrl(), policyTypeId) // + return a1Client.getPolicyTypeSchema(policyTypeId) // .flatMap(schema -> createPolicyType(policyTypeId, schema)); } @@ -160,14 +168,14 @@ public class RicRecoveryTask { } } - private Flux deleteAllPoliciesInRic(Ric ric) { - return a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) // + private Flux deleteAllPoliciesInRic(Ric ric, A1Client a1Client) { + return a1Client.getPolicyIdentities() // .flatMapMany(policyIds -> Flux.fromIterable(policyIds)) // .doOnNext(policyId -> logger.debug("Deleting policy: {}, for ric: {}", policyId, ric.getConfig().name())) - .flatMap(policyId -> a1Client.deletePolicy(ric.getConfig().baseUrl(), policyId)); // + .flatMap(policyId -> a1Client.deletePolicy(policyId)); // } - private Flux recreateAllPoliciesInRic(Ric ric) { + private Flux recreateAllPoliciesInRic(Ric ric, A1Client a1Client) { synchronized (policies) { return Flux.fromIterable(new Vector<>(policies.getForRic(ric.name()))) // .doOnNext(