Running Dmaap consumer in a seprate thread
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / tasks / RicRecoveryTask.java
index d7e8551..c88eb6c 100644 (file)
@@ -41,7 +41,8 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 /**
- * Recovery handling of RIC, which means:
+ * Recovery handling of RIC.
+ * This means:
  * - load all policy types
  * - send all policy instances to the RIC
  * --- if that fails remove all policy instances
@@ -68,8 +69,9 @@ public class RicRecoveryTask {
         logger.debug("Handling ric: {}", ric.getConfig().name());
 
         synchronized (ric) {
-            if (ric.state().equals(Ric.RicState.RECOVERING)) {
-                return; // Already running
+            if (ric.getState() == Ric.RicState.RECOVERING) {
+                logger.debug("Recovery ric: {} is already running", ric.getConfig().name());
+                return;
             }
             ric.setState(Ric.RicState.RECOVERING);
         }
@@ -82,7 +84,7 @@ public class RicRecoveryTask {
 
     private Flux<Object> startRecover(Ric ric, A1Client a1Client) {
         Flux<PolicyType> recoverTypes = recoverPolicyTypes(ric, a1Client);
-        Flux<?> deletePoliciesInRic = deleteAllPoliciesInRic(ric, a1Client);
+        Flux<?> deletePoliciesInRic = a1Client.deleteAllPolicies();
         Flux<?> recreatePoliciesInRic = recreateAllPoliciesInRic(ric, a1Client);
 
         return Flux.concat(recoverTypes, deletePoliciesInRic, recreatePoliciesInRic);
@@ -116,7 +118,7 @@ public class RicRecoveryTask {
         Flux<PolicyType> recoverTypes = this.a1ClientFactory.createA1Client(ric) //
             .flatMapMany(a1Client -> recoverPolicyTypes(ric, a1Client));
         Flux<?> deletePoliciesInRic = this.a1ClientFactory.createA1Client(ric) //
-            .flatMapMany(a1Client -> deleteAllPoliciesInRic(ric, a1Client));
+            .flatMapMany(a1Client -> a1Client.deleteAllPolicies());
 
         Flux.merge(recoverTypes, deletePoliciesInRic) //
             .subscribe(x -> logger.debug("Brute recover: " + x), //
@@ -168,13 +170,6 @@ public class RicRecoveryTask {
         }
     }
 
-    private Flux<String> deleteAllPoliciesInRic(Ric ric, A1Client a1Client) {
-        return a1Client.getPolicyIdentities() //
-            .flatMapMany(policyIds -> Flux.fromIterable(policyIds)) //
-            .doOnNext(policyId -> logger.debug("Deleting policy: {}, for ric: {}", policyId, ric.getConfig().name()))
-            .flatMap(policyId -> a1Client.deletePolicy(policyId)); //
-    }
-
     private Flux<String> recreateAllPoliciesInRic(Ric ric, A1Client a1Client) {
         synchronized (policies) {
             return Flux.fromIterable(new Vector<>(policies.getForRic(ric.name()))) //