X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=policy-agent%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fpolicyagent%2Ftasks%2FRicRecoveryTask.java;h=fb41e2607c19f5353312ba5b2861baa60aa4c818;hb=refs%2Fchanges%2F43%2F2243%2F3;hp=73c94e25474314d6933e438e0f57717fe76125e4;hpb=592ce20ec359928373de2e7f06214c8f8ad73c20;p=nonrtric.git diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java index 73c94e25..fb41e260 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java @@ -32,6 +32,7 @@ import org.oransc.policyagent.repository.Policy; import org.oransc.policyagent.repository.PolicyType; import org.oransc.policyagent.repository.PolicyTypes; import org.oransc.policyagent.repository.Ric; +import org.oransc.policyagent.repository.Rics; import org.oransc.policyagent.repository.Service; import org.oransc.policyagent.repository.Services; import org.slf4j.Logger; @@ -59,9 +60,11 @@ public class RicRecoveryTask { this.services = services; } - public void run(Collection rics) { - for (Ric ric : rics) { - run(ric); + public void run(Rics rics) { + synchronized (rics) { + for (Ric ric : rics.getRics()) { + run(ric); + } } } @@ -85,26 +88,28 @@ public class RicRecoveryTask { private void onComplete(Ric ric) { logger.debug("Recovery completed for:" + ric.name()); - ric.setState(Ric.RicState.ACTIVE); + ric.setState(Ric.RicState.IDLE); notifyAllServices("Recovery completed for:" + ric.name()); } private void notifyAllServices(String body) { - for (Service service : services.getAll()) { - String url = service.getCallbackUrl(); - if (service.getCallbackUrl().length() > 0) { - createClient(url) // - .put("", body) // - .subscribe(rsp -> logger.debug("Service called"), - throwable -> logger.warn("Service called failed", throwable), - () -> logger.debug("Service called complete")); + synchronized (services) { + for (Service service : services.getAll()) { + String url = service.getCallbackUrl(); + if (service.getCallbackUrl().length() > 0) { + createClient(url) // + .put("", body) // + .subscribe(rsp -> logger.debug("Service called"), + throwable -> logger.warn("Service called failed", throwable), + () -> logger.debug("Service called complete")); + } } } } private void onError(Ric ric, Throwable t) { logger.warn("Recovery failed for: {}, reason: {}", ric.name(), t.getMessage()); - ric.setState(Ric.RicState.NOT_REACHABLE); + ric.setState(Ric.RicState.UNDEFINED); } private AsyncRestClient createClient(final String url) { @@ -115,7 +120,7 @@ public class RicRecoveryTask { ric.clearSupportedPolicyTypes(); return a1Client.getPolicyTypeIdentities(ric.getConfig().baseUrl()) // .flatMapMany(types -> Flux.fromIterable(types)) // - .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId)) + .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId)) // .flatMap((policyTypeId) -> getPolicyType(ric, policyTypeId)) // .doOnNext(policyType -> ric.addSupportedPolicyType(policyType)); // } @@ -139,9 +144,11 @@ public class RicRecoveryTask { } private Flux deletePolicies(Ric ric) { - Collection ricPolicies = new Vector<>(policies.getForRic(ric.name())); - for (Policy policy : ricPolicies) { - this.policies.remove(policy); + synchronized (policies) { + Collection ricPolicies = new Vector<>(policies.getForRic(ric.name())); + for (Policy policy : ricPolicies) { + this.policies.remove(policy); + } } return a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) //