Changed in config will add and recover Rics
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / tasks / RicRecoveryTask.java
index 73c94e2..fb41e26 100644 (file)
@@ -32,6 +32,7 @@ import org.oransc.policyagent.repository.Policy;
 import org.oransc.policyagent.repository.PolicyType;
 import org.oransc.policyagent.repository.PolicyTypes;
 import org.oransc.policyagent.repository.Ric;
+import org.oransc.policyagent.repository.Rics;
 import org.oransc.policyagent.repository.Service;
 import org.oransc.policyagent.repository.Services;
 import org.slf4j.Logger;
@@ -59,9 +60,11 @@ public class RicRecoveryTask {
         this.services = services;
     }
 
-    public void run(Collection<Ric> rics) {
-        for (Ric ric : rics) {
-            run(ric);
+    public void run(Rics rics) {
+        synchronized (rics) {
+            for (Ric ric : rics.getRics()) {
+                run(ric);
+            }
         }
     }
 
@@ -85,26 +88,28 @@ public class RicRecoveryTask {
 
     private void onComplete(Ric ric) {
         logger.debug("Recovery completed for:" + ric.name());
-        ric.setState(Ric.RicState.ACTIVE);
+        ric.setState(Ric.RicState.IDLE);
         notifyAllServices("Recovery completed for:" + ric.name());
     }
 
     private void notifyAllServices(String body) {
-        for (Service service : services.getAll()) {
-            String url = service.getCallbackUrl();
-            if (service.getCallbackUrl().length() > 0) {
-                createClient(url) //
-                    .put("", body) //
-                    .subscribe(rsp -> logger.debug("Service called"),
-                        throwable -> logger.warn("Service called failed", throwable),
-                        () -> logger.debug("Service called complete"));
+        synchronized (services) {
+            for (Service service : services.getAll()) {
+                String url = service.getCallbackUrl();
+                if (service.getCallbackUrl().length() > 0) {
+                    createClient(url) //
+                        .put("", body) //
+                        .subscribe(rsp -> logger.debug("Service called"),
+                            throwable -> logger.warn("Service called failed", throwable),
+                            () -> logger.debug("Service called complete"));
+                }
             }
         }
     }
 
     private void onError(Ric ric, Throwable t) {
         logger.warn("Recovery failed for: {}, reason: {}", ric.name(), t.getMessage());
-        ric.setState(Ric.RicState.NOT_REACHABLE);
+        ric.setState(Ric.RicState.UNDEFINED);
     }
 
     private AsyncRestClient createClient(final String url) {
@@ -115,7 +120,7 @@ public class RicRecoveryTask {
         ric.clearSupportedPolicyTypes();
         return a1Client.getPolicyTypeIdentities(ric.getConfig().baseUrl()) //
             .flatMapMany(types -> Flux.fromIterable(types)) //
-            .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId))
+            .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId)) //
             .flatMap((policyTypeId) -> getPolicyType(ric, policyTypeId)) //
             .doOnNext(policyType -> ric.addSupportedPolicyType(policyType)); //
     }
@@ -139,9 +144,11 @@ public class RicRecoveryTask {
     }
 
     private Flux<String> deletePolicies(Ric ric) {
-        Collection<Policy> ricPolicies = new Vector<>(policies.getForRic(ric.name()));
-        for (Policy policy : ricPolicies) {
-            this.policies.remove(policy);
+        synchronized (policies) {
+            Collection<Policy> ricPolicies = new Vector<>(policies.getForRic(ric.name()));
+            for (Policy policy : ricPolicies) {
+                this.policies.remove(policy);
+            }
         }
 
         return a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) //