Remove new code smells
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / tasks / RepositorySupervision.java
index ce318dd..22905f6 100644 (file)
@@ -24,6 +24,7 @@ import java.util.Collection;
 
 import org.oransc.policyagent.clients.A1Client;
 import org.oransc.policyagent.clients.A1ClientFactory;
+import org.oransc.policyagent.repository.Lock.LockType;
 import org.oransc.policyagent.repository.Policies;
 import org.oransc.policyagent.repository.PolicyTypes;
 import org.oransc.policyagent.repository.Ric;
@@ -41,7 +42,8 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 /**
- * Regularly checks the existing rics towards the local repository to keep it consistent.
+ * Regularly checks the existing rics towards the local repository to keep it
+ * consistent.
  */
 @Component
 @EnableScheduling
@@ -77,12 +79,20 @@ public class RepositorySupervision {
         synchronized (this.rics) {
             return Flux.fromIterable(rics.getRics()) //
                 .flatMap(this::createRicData) //
-                .flatMap(this::checkRicState) //
-                .flatMap(this::checkRicPolicies) //
-                .flatMap(this::checkRicPolicyTypes);
+                .flatMap(this::checkOneRic) //
+                .onErrorResume(throwable -> Mono.empty());
         }
     }
 
+    private Mono<RicData> checkOneRic(RicData ricData) {
+        return checkRicState(ricData) //
+            .flatMap(x -> ricData.ric.getLock().lock(LockType.EXCLUSIVE)) //
+            .flatMap(x -> checkRicPolicies(ricData)) //
+            .flatMap(x -> ricData.ric.getLock().unlock()) //
+            .doOnError(throwable -> ricData.ric.getLock().unlockBlocking()) //
+            .flatMap(x -> checkRicPolicyTypes(ricData)); //
+    }
+
     private static class RicData {
         RicData(Ric ric, A1Client a1Client) {
             this.ric = ric;
@@ -101,7 +111,8 @@ public class RepositorySupervision {
 
     private Mono<RicData> checkRicState(RicData ric) {
         if (ric.ric.getState() == RicState.UNDEFINED) {
-            return startSynchronization(ric);
+            return startSynchronization(ric) //
+                .onErrorResume(t -> Mono.empty());
         } else if (ric.ric.getState() == RicState.SYNCHRONIZING) {
             return Mono.empty();
         } else {
@@ -111,7 +122,6 @@ public class RepositorySupervision {
 
     private Mono<RicData> checkRicPolicies(RicData ric) {
         return ric.a1Client.getPolicyIdentities() //
-            .onErrorResume(t -> Mono.empty()) //
             .flatMap(ricP -> validateInstances(ricP, ric));
     }
 
@@ -120,18 +130,18 @@ public class RepositorySupervision {
             if (ricPolicies.size() != policies.getForRic(ric.ric.name()).size()) {
                 return startSynchronization(ric);
             }
-        }
-        for (String policyId : ricPolicies) {
-            if (!policies.containsPolicy(policyId)) {
-                return startSynchronization(ric);
+
+            for (String policyId : ricPolicies) {
+                if (!policies.containsPolicy(policyId)) {
+                    return startSynchronization(ric);
+                }
             }
+            return Mono.just(ric);
         }
-        return Mono.just(ric);
     }
 
     private Mono<RicData> checkRicPolicyTypes(RicData ric) {
         return ric.a1Client.getPolicyTypeIdentities() //
-            .onErrorResume(notUsed -> Mono.empty()) //
             .flatMap(ricTypes -> validateTypes(ricTypes, ric));
     }
 
@@ -148,12 +158,12 @@ public class RepositorySupervision {
     }
 
     private Mono<RicData> startSynchronization(RicData ric) {
-        RicSynchronizationTask recovery = createSynchronizationTask();
-        recovery.run(ric.ric);
-        return Mono.empty();
+        RicSynchronizationTask synchronizationTask = createSynchronizationTask();
+        synchronizationTask.run(ric.ric);
+        return Mono.error(new Exception("Syncronization started"));
     }
 
-    @SuppressWarnings("squid:S2629")
+    @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
     private void onRicChecked(RicData ric) {
         logger.debug("Ric: {} checked", ric.ric.name());
     }
@@ -165,4 +175,4 @@ public class RepositorySupervision {
     RicSynchronizationTask createSynchronizationTask() {
         return new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services);
     }
-}
\ No newline at end of file
+}