Fixed concurrency problems
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / tasks / RepositorySupervision.java
index f75db21..8dcb811 100644 (file)
@@ -79,14 +79,20 @@ public class RepositorySupervision {
         synchronized (this.rics) {
             return Flux.fromIterable(rics.getRics()) //
                 .flatMap(this::createRicData) //
-                .flatMap(this::checkRicState) //
-                .doOnNext(ricData -> ricData.ric.getLock().lockBlocking(LockType.EXCLUSIVE)) //
-                .flatMap(this::checkRicPolicies) //
-                .doOnNext(ricData -> ricData.ric.getLock().unlock()) //
-                .flatMap(this::checkRicPolicyTypes); //
+                .flatMap(this::checkOneRic) //
+                .onErrorResume(throwable -> Mono.empty());
         }
     }
 
+    private Mono<RicData> checkOneRic(RicData ricData) {
+        return checkRicState(ricData) //
+            .flatMap(x -> ricData.ric.getLock().lock(LockType.EXCLUSIVE)) //
+            .flatMap(x -> checkRicPolicies(ricData)) //
+            .flatMap(x -> ricData.ric.getLock().unlock()) //
+            .doOnError(throwable -> ricData.ric.getLock().unlockBlocking()) //
+            .flatMap(x -> checkRicPolicyTypes(ricData)); //
+    }
+
     private static class RicData {
         RicData(Ric ric, A1Client a1Client) {
             this.ric = ric;
@@ -105,7 +111,8 @@ public class RepositorySupervision {
 
     private Mono<RicData> checkRicState(RicData ric) {
         if (ric.ric.getState() == RicState.UNDEFINED) {
-            return startSynchronization(ric);
+            return startSynchronization(ric) //
+                .onErrorResume(t -> Mono.empty());
         } else if (ric.ric.getState() == RicState.SYNCHRONIZING) {
             return Mono.empty();
         } else {
@@ -115,23 +122,17 @@ public class RepositorySupervision {
 
     private Mono<RicData> checkRicPolicies(RicData ric) {
         return ric.a1Client.getPolicyIdentities() //
-            .onErrorResume(t -> {
-                ric.ric.getLock().unlock();
-                return Mono.empty();
-            }) //
             .flatMap(ricP -> validateInstances(ricP, ric));
     }
 
     private Mono<RicData> validateInstances(Collection<String> ricPolicies, RicData ric) {
         synchronized (this.policies) {
             if (ricPolicies.size() != policies.getForRic(ric.ric.name()).size()) {
-                ric.ric.getLock().unlock();
                 return startSynchronization(ric);
             }
 
             for (String policyId : ricPolicies) {
                 if (!policies.containsPolicy(policyId)) {
-                    ric.ric.getLock().unlock();
                     return startSynchronization(ric);
                 }
             }
@@ -141,7 +142,6 @@ public class RepositorySupervision {
 
     private Mono<RicData> checkRicPolicyTypes(RicData ric) {
         return ric.a1Client.getPolicyTypeIdentities() //
-            .onErrorResume(notUsed -> Mono.empty()) //
             .flatMap(ricTypes -> validateTypes(ricTypes, ric));
     }
 
@@ -160,7 +160,7 @@ public class RepositorySupervision {
     private Mono<RicData> startSynchronization(RicData ric) {
         RicSynchronizationTask recovery = createSynchronizationTask();
         recovery.run(ric.ric);
-        return Mono.empty();
+        return Mono.error(new Exception("Syncronization started"));
     }
 
     @SuppressWarnings("squid:S2629")