X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=policy-agent%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fpolicyagent%2Ftasks%2FRicRecoveryTask.java;h=73c94e25474314d6933e438e0f57717fe76125e4;hb=7a4a590fb0ebf8772169625cdda327da43c79c6d;hp=3883585ba1bbab6a48a984d96ea18e91af569a98;hpb=637540bc28fbf337e0c4c58c051a6b4f7ceb321d;p=nonrtric.git diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java index 3883585b..73c94e25 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.Vector; import org.oransc.policyagent.clients.A1Client; +import org.oransc.policyagent.clients.AsyncRestClient; import org.oransc.policyagent.exceptions.ServiceException; import org.oransc.policyagent.repository.ImmutablePolicyType; import org.oransc.policyagent.repository.Policies; @@ -31,6 +32,8 @@ import org.oransc.policyagent.repository.Policy; import org.oransc.policyagent.repository.PolicyType; import org.oransc.policyagent.repository.PolicyTypes; import org.oransc.policyagent.repository.Ric; +import org.oransc.policyagent.repository.Service; +import org.oransc.policyagent.repository.Services; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,11 +50,13 @@ public class RicRecoveryTask { private final A1Client a1Client; private final PolicyTypes policyTypes; private final Policies policies; + private final Services services; - public RicRecoveryTask(A1Client a1Client, PolicyTypes policyTypes, Policies policies) { + public RicRecoveryTask(A1Client a1Client, PolicyTypes policyTypes, Policies policies, Services services) { this.a1Client = a1Client; this.policyTypes = policyTypes; this.policies = policies; + this.services = services; } public void run(Collection rics) { @@ -81,14 +86,31 @@ public class RicRecoveryTask { private void onComplete(Ric ric) { logger.debug("Recovery completed for:" + ric.name()); ric.setState(Ric.RicState.ACTIVE); + notifyAllServices("Recovery completed for:" + ric.name()); + } + private void notifyAllServices(String body) { + for (Service service : services.getAll()) { + String url = service.getCallbackUrl(); + if (service.getCallbackUrl().length() > 0) { + createClient(url) // + .put("", body) // + .subscribe(rsp -> logger.debug("Service called"), + throwable -> logger.warn("Service called failed", throwable), + () -> logger.debug("Service called complete")); + } + } } private void onError(Ric ric, Throwable t) { - logger.debug("Recovery failed for: {}, reason: {}", ric.name(), t.getMessage()); + logger.warn("Recovery failed for: {}, reason: {}", ric.name(), t.getMessage()); ric.setState(Ric.RicState.NOT_REACHABLE); } + private AsyncRestClient createClient(final String url) { + return new AsyncRestClient(url); + } + private Flux recoverPolicyTypes(Ric ric) { ric.clearSupportedPolicyTypes(); return a1Client.getPolicyTypeIdentities(ric.getConfig().baseUrl()) //