Added callback to R-APPS invoked after RIC recovery
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / tasks / RicRecoveryTask.java
index 3883585..73c94e2 100644 (file)
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.Vector;
 
 import org.oransc.policyagent.clients.A1Client;
+import org.oransc.policyagent.clients.AsyncRestClient;
 import org.oransc.policyagent.exceptions.ServiceException;
 import org.oransc.policyagent.repository.ImmutablePolicyType;
 import org.oransc.policyagent.repository.Policies;
@@ -31,6 +32,8 @@ import org.oransc.policyagent.repository.Policy;
 import org.oransc.policyagent.repository.PolicyType;
 import org.oransc.policyagent.repository.PolicyTypes;
 import org.oransc.policyagent.repository.Ric;
+import org.oransc.policyagent.repository.Service;
+import org.oransc.policyagent.repository.Services;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,11 +50,13 @@ public class RicRecoveryTask {
     private final A1Client a1Client;
     private final PolicyTypes policyTypes;
     private final Policies policies;
+    private final Services services;
 
-    public RicRecoveryTask(A1Client a1Client, PolicyTypes policyTypes, Policies policies) {
+    public RicRecoveryTask(A1Client a1Client, PolicyTypes policyTypes, Policies policies, Services services) {
         this.a1Client = a1Client;
         this.policyTypes = policyTypes;
         this.policies = policies;
+        this.services = services;
     }
 
     public void run(Collection<Ric> rics) {
@@ -81,14 +86,31 @@ public class RicRecoveryTask {
     private void onComplete(Ric ric) {
         logger.debug("Recovery completed for:" + ric.name());
         ric.setState(Ric.RicState.ACTIVE);
+        notifyAllServices("Recovery completed for:" + ric.name());
+    }
 
+    private void notifyAllServices(String body) {
+        for (Service service : services.getAll()) {
+            String url = service.getCallbackUrl();
+            if (service.getCallbackUrl().length() > 0) {
+                createClient(url) //
+                    .put("", body) //
+                    .subscribe(rsp -> logger.debug("Service called"),
+                        throwable -> logger.warn("Service called failed", throwable),
+                        () -> logger.debug("Service called complete"));
+            }
+        }
     }
 
     private void onError(Ric ric, Throwable t) {
-        logger.debug("Recovery failed for: {}, reason: {}", ric.name(), t.getMessage());
+        logger.warn("Recovery failed for: {}, reason: {}", ric.name(), t.getMessage());
         ric.setState(Ric.RicState.NOT_REACHABLE);
     }
 
+    private AsyncRestClient createClient(final String url) {
+        return new AsyncRestClient(url);
+    }
+
     private Flux<PolicyType> recoverPolicyTypes(Ric ric) {
         ric.clearSupportedPolicyTypes();
         return a1Client.getPolicyTypeIdentities(ric.getConfig().baseUrl()) //