Merge "Added STD sim 2.0.0 tests"
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / tasks / RicSupervision.java
index c2dd18e..228038a 100644 (file)
@@ -24,6 +24,7 @@ import java.util.Collection;
 
 import org.oransc.policyagent.clients.A1Client;
 import org.oransc.policyagent.clients.A1ClientFactory;
+import org.oransc.policyagent.exceptions.ServiceException;
 import org.oransc.policyagent.repository.Lock.LockType;
 import org.oransc.policyagent.repository.Policies;
 import org.oransc.policyagent.repository.PolicyTypes;
@@ -58,6 +59,28 @@ public class RicSupervision {
     private final A1ClientFactory a1ClientFactory;
     private final Services services;
 
+    private static class SynchStartedException extends ServiceException {
+        private static final long serialVersionUID = 1L;
+
+        public SynchStartedException(String message) {
+            super(message);
+        }
+    }
+
+    private static class RicData {
+        RicData(Ric ric, A1Client a1Client) {
+            this.ric = ric;
+            this.a1Client = a1Client;
+        }
+
+        A1Client getClient() {
+            return a1Client;
+        }
+
+        final Ric ric;
+        private final A1Client a1Client;
+    }
+
     @Autowired
     public RicSupervision(Rics rics, Policies policies, A1ClientFactory a1ClientFactory, PolicyTypes policyTypes,
         Services services) {
@@ -74,38 +97,53 @@ public class RicSupervision {
     @Scheduled(fixedRate = 1000 * 60)
     public void checkAllRics() {
         logger.debug("Checking Rics starting");
-        createTask().subscribe( //
-            ric -> logger.debug("Ric: {} checked", ric.ric.name()), //
-            null, //
-            () -> logger.debug("Checking Rics completed") //
-        );
+        createTask().subscribe(null, null, () -> logger.debug("Checking all RICs completed"));
     }
 
     private Flux<RicData> createTask() {
         return Flux.fromIterable(rics.getRics()) //
             .flatMap(this::createRicData) //
-            .flatMap(this::checkOneRic) //
-            .onErrorResume(throwable -> Mono.empty());
-
+            .flatMap(this::checkOneRic);
     }
 
     private Mono<RicData> checkOneRic(RicData ricData) {
         return checkRicState(ricData) //
             .flatMap(x -> ricData.ric.getLock().lock(LockType.EXCLUSIVE)) //
+            .flatMap(notUsed -> setRicState(ricData)) //
             .flatMap(x -> checkRicPolicies(ricData)) //
-            .flatMap(x -> ricData.ric.getLock().unlock()) //
-            .doOnError(throwable -> ricData.ric.getLock().unlockBlocking()) //
-            .flatMap(x -> checkRicPolicyTypes(ricData)); //
+            .flatMap(x -> checkRicPolicyTypes(ricData)) //
+            .doOnNext(x -> onRicCheckedOk(ricData)) //
+            .doOnError(t -> onRicCheckedError(t, ricData)) //
+            .onErrorResume(throwable -> Mono.empty());
     }
 
-    private static class RicData {
-        RicData(Ric ric, A1Client a1Client) {
-            this.ric = ric;
-            this.a1Client = a1Client;
+    private void onRicCheckedError(Throwable t, RicData ricData) {
+        logger.debug("Ric: {} check stopped, exception: {}", ricData.ric.name(), t.getMessage());
+        if (t instanceof SynchStartedException) {
+            // this is just a temporary state,
+            ricData.ric.setState(RicState.AVAILABLE);
+        } else {
+            ricData.ric.setState(RicState.UNAVAILABLE);
         }
+        ricData.ric.getLock().unlockBlocking();
+    }
 
-        final Ric ric;
-        final A1Client a1Client;
+    private void onRicCheckedOk(RicData ricData) {
+        logger.debug("Ric: {} checked OK", ricData.ric.name());
+        ricData.ric.setState(RicState.AVAILABLE);
+        ricData.ric.getLock().unlockBlocking();
+    }
+
+    @SuppressWarnings("squid:S2445") // Blocks should be synchronized on "private final" fields
+    private Mono<RicData> setRicState(RicData ric) {
+        synchronized (ric) {
+            if (ric.ric.getState() == RicState.CONSISTENCY_CHECK) {
+                logger.debug("Ric: {} is already being checked", ric.ric.getConfig().name());
+                return Mono.empty();
+            }
+            ric.ric.setState(RicState.CONSISTENCY_CHECK);
+            return Mono.just(ric);
+        }
     }
 
     private Mono<RicData> createRicData(Ric ric) {
@@ -118,7 +156,7 @@ public class RicSupervision {
         if (ric.ric.getState() == RicState.UNAVAILABLE) {
             return startSynchronization(ric) //
                 .onErrorResume(t -> Mono.empty());
-        } else if (ric.ric.getState() == RicState.SYNCHRONIZING) {
+        } else if (ric.ric.getState() == RicState.SYNCHRONIZING || ric.ric.getState() == RicState.CONSISTENCY_CHECK) {
             return Mono.empty();
         } else {
             return Mono.just(ric);
@@ -126,7 +164,7 @@ public class RicSupervision {
     }
 
     private Mono<RicData> checkRicPolicies(RicData ric) {
-        return ric.a1Client.getPolicyIdentities() //
+        return ric.getClient().getPolicyIdentities() //
             .flatMap(ricP -> validateInstances(ricP, ric));
     }
 
@@ -146,7 +184,7 @@ public class RicSupervision {
     }
 
     private Mono<RicData> checkRicPolicyTypes(RicData ric) {
-        return ric.a1Client.getPolicyTypeIdentities() //
+        return ric.getClient().getPolicyTypeIdentities() //
             .flatMap(ricTypes -> validateTypes(ricTypes, ric));
     }
 
@@ -165,7 +203,7 @@ public class RicSupervision {
     private Mono<RicData> startSynchronization(RicData ric) {
         RicSynchronizationTask synchronizationTask = createSynchronizationTask();
         synchronizationTask.run(ric.ric);
-        return Mono.error(new Exception("Syncronization started"));
+        return Mono.error(new SynchStartedException("Syncronization started"));
     }
 
     RicSynchronizationTask createSynchronizationTask() {