Fixed concurrency problems
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / tasks / RepositorySupervision.java
index ce318dd..f75db21 100644 (file)
@@ -24,6 +24,7 @@ import java.util.Collection;
 
 import org.oransc.policyagent.clients.A1Client;
 import org.oransc.policyagent.clients.A1ClientFactory;
+import org.oransc.policyagent.repository.Lock.LockType;
 import org.oransc.policyagent.repository.Policies;
 import org.oransc.policyagent.repository.PolicyTypes;
 import org.oransc.policyagent.repository.Ric;
@@ -41,7 +42,8 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 /**
- * Regularly checks the existing rics towards the local repository to keep it consistent.
+ * Regularly checks the existing rics towards the local repository to keep it
+ * consistent.
  */
 @Component
 @EnableScheduling
@@ -78,8 +80,10 @@ public class RepositorySupervision {
             return Flux.fromIterable(rics.getRics()) //
                 .flatMap(this::createRicData) //
                 .flatMap(this::checkRicState) //
+                .doOnNext(ricData -> ricData.ric.getLock().lockBlocking(LockType.EXCLUSIVE)) //
                 .flatMap(this::checkRicPolicies) //
-                .flatMap(this::checkRicPolicyTypes);
+                .doOnNext(ricData -> ricData.ric.getLock().unlock()) //
+                .flatMap(this::checkRicPolicyTypes); //
         }
     }
 
@@ -111,22 +115,28 @@ public class RepositorySupervision {
 
     private Mono<RicData> checkRicPolicies(RicData ric) {
         return ric.a1Client.getPolicyIdentities() //
-            .onErrorResume(t -> Mono.empty()) //
+            .onErrorResume(t -> {
+                ric.ric.getLock().unlock();
+                return Mono.empty();
+            }) //
             .flatMap(ricP -> validateInstances(ricP, ric));
     }
 
     private Mono<RicData> validateInstances(Collection<String> ricPolicies, RicData ric) {
         synchronized (this.policies) {
             if (ricPolicies.size() != policies.getForRic(ric.ric.name()).size()) {
+                ric.ric.getLock().unlock();
                 return startSynchronization(ric);
             }
-        }
-        for (String policyId : ricPolicies) {
-            if (!policies.containsPolicy(policyId)) {
-                return startSynchronization(ric);
+
+            for (String policyId : ricPolicies) {
+                if (!policies.containsPolicy(policyId)) {
+                    ric.ric.getLock().unlock();
+                    return startSynchronization(ric);
+                }
             }
+            return Mono.just(ric);
         }
-        return Mono.just(ric);
     }
 
     private Mono<RicData> checkRicPolicyTypes(RicData ric) {
@@ -165,4 +175,4 @@ public class RepositorySupervision {
     RicSynchronizationTask createSynchronizationTask() {
         return new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services);
     }
-}
\ No newline at end of file
+}