Bugfix, only one RIC was synched 07/3307/5
authorPatrikBuhr <patrik.buhr@est.tech>
Fri, 17 Apr 2020 07:57:35 +0000 (09:57 +0200)
committerPatrikBuhr <patrik.buhr@est.tech>
Mon, 20 Apr 2020 07:24:22 +0000 (09:24 +0200)
In RicSupervision, when recovery was started, cheching of the other ones was
stopped.

Other improvements such as:
Not starting consistency check when it is already started
Improved tracing

Change-Id: I82d1d48a091de8f24ebfa60b7cc1140e81d959f6
Issue-ID: NONRTRIC-164
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
policy-agent/src/main/java/org/oransc/policyagent/repository/Lock.java
policy-agent/src/main/java/org/oransc/policyagent/repository/Ric.java
policy-agent/src/main/java/org/oransc/policyagent/repository/Rics.java
policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSupervision.java
policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java
policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSupervisionTest.java
policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1ClientFactory.java

index def2b30..ed94492 100644 (file)
@@ -130,7 +130,8 @@ public class Lock {
 
     @Override
     public String toString() {
-        return "Lock cnt: " + this.lockCounter + " exclusive: " + this.isExclusive;
+        return "Lock cnt: " + this.lockCounter + " exclusive: " + this.isExclusive + " queued: "
+            + this.lockRequestQueue.size();
     }
 
     /** returns the current number of granted locks */
index 07d2cda..fb2e4b9 100644 (file)
@@ -152,6 +152,11 @@ public class Ric {
         /**
          * The agent is synchronizing the view of the Ric.
          */
-        SYNCHRONIZING
+        SYNCHRONIZING,
+
+        /**
+         * A consistency check between the agent and the Ric is done
+         */
+        CONSISTENCY_CHECK
     }
 }
index 66cecd3..7faa376 100644 (file)
@@ -20,6 +20,7 @@
 
 package org.oransc.policyagent.repository;
 
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -37,7 +38,7 @@ public class Rics {
         registeredRics.put(ric.name(), ric);
     }
 
-    public synchronized Iterable<Ric> getRics() {
+    public synchronized Collection<Ric> getRics() {
         return new Vector<>(registeredRics.values());
     }
 
index 2666d60..77be8b3 100644 (file)
@@ -24,6 +24,7 @@ import java.util.Collection;
 
 import org.oransc.policyagent.clients.A1Client;
 import org.oransc.policyagent.clients.A1ClientFactory;
+import org.oransc.policyagent.exceptions.ServiceException;
 import org.oransc.policyagent.repository.Lock.LockType;
 import org.oransc.policyagent.repository.Policies;
 import org.oransc.policyagent.repository.PolicyTypes;
@@ -58,6 +59,14 @@ public class RicSupervision {
     private final A1ClientFactory a1ClientFactory;
     private final Services services;
 
+    private static class SynchStartedException extends ServiceException {
+        private static final long serialVersionUID = 1L;
+
+        public SynchStartedException(String message) {
+            super(message);
+        }
+    }
+
     @Autowired
     public RicSupervision(Rics rics, Policies policies, A1ClientFactory a1ClientFactory, PolicyTypes policyTypes,
         Services services) {
@@ -80,20 +89,48 @@ public class RicSupervision {
     private Flux<RicData> createTask() {
         return Flux.fromIterable(rics.getRics()) //
             .flatMap(this::createRicData) //
-            .flatMap(this::checkOneRic) //
-            .onErrorResume(throwable -> Mono.empty());
+            .flatMap(this::checkOneRic);
 
     }
 
     private Mono<RicData> checkOneRic(RicData ricData) {
         return checkRicState(ricData) //
             .flatMap(x -> ricData.ric.getLock().lock(LockType.EXCLUSIVE)) //
+            .flatMap(notUsed -> setRicState(ricData)) //
             .flatMap(x -> checkRicPolicies(ricData)) //
-            .flatMap(x -> ricData.ric.getLock().unlock()) //
-            .doOnError(throwable -> ricData.ric.getLock().unlockBlocking()) //
             .flatMap(x -> checkRicPolicyTypes(ricData)) //
-            .doOnNext(x -> logger.debug("Ric: {} checked OK", ricData.ric.name())) //
-            .doOnError(t -> logger.debug("Ric: {} check Failed, exception: {}", ricData.ric.name(), t.getMessage()));
+            .doOnNext(x -> onRicCheckedOk(ricData)) //
+            .doOnError(t -> onRicCheckedError(t, ricData)) //
+            .onErrorResume(throwable -> Mono.empty());
+    }
+
+    private void onRicCheckedError(Throwable t, RicData ricData) {
+        logger.debug("Ric: {} check stopped, exception: {}", ricData.ric.name(), t.getMessage());
+        if (t instanceof SynchStartedException) {
+            // this is just a temporary state,
+            ricData.ric.setState(RicState.AVAILABLE);
+        } else {
+            ricData.ric.setState(RicState.UNAVAILABLE);
+        }
+        ricData.ric.getLock().unlockBlocking();
+    }
+
+    private void onRicCheckedOk(RicData ricData) {
+        logger.debug("Ric: {} checked OK", ricData.ric.name());
+        ricData.ric.setState(RicState.AVAILABLE);
+        ricData.ric.getLock().unlockBlocking();
+    }
+
+    @SuppressWarnings("squid:S2445") // Blocks should be synchronized on "private final" fields
+    private Mono<RicData> setRicState(RicData ric) {
+        synchronized (ric) {
+            if (ric.ric.getState() == RicState.CONSISTENCY_CHECK) {
+                logger.debug("Ric: {} is already being checked", ric.ric.getConfig().name());
+                return Mono.empty();
+            }
+            ric.ric.setState(RicState.CONSISTENCY_CHECK);
+            return Mono.just(ric);
+        }
     }
 
     private static class RicData {
@@ -102,8 +139,12 @@ public class RicSupervision {
             this.a1Client = a1Client;
         }
 
+        A1Client getClient() {
+            return a1Client;
+        }
+
         final Ric ric;
-        final A1Client a1Client;
+        private final A1Client a1Client;
     }
 
     private Mono<RicData> createRicData(Ric ric) {
@@ -116,7 +157,7 @@ public class RicSupervision {
         if (ric.ric.getState() == RicState.UNAVAILABLE) {
             return startSynchronization(ric) //
                 .onErrorResume(t -> Mono.empty());
-        } else if (ric.ric.getState() == RicState.SYNCHRONIZING) {
+        } else if (ric.ric.getState() == RicState.SYNCHRONIZING || ric.ric.getState() == RicState.CONSISTENCY_CHECK) {
             return Mono.empty();
         } else {
             return Mono.just(ric);
@@ -124,7 +165,7 @@ public class RicSupervision {
     }
 
     private Mono<RicData> checkRicPolicies(RicData ric) {
-        return ric.a1Client.getPolicyIdentities() //
+        return ric.getClient().getPolicyIdentities() //
             .flatMap(ricP -> validateInstances(ricP, ric));
     }
 
@@ -144,7 +185,8 @@ public class RicSupervision {
     }
 
     private Mono<RicData> checkRicPolicyTypes(RicData ric) {
-        return ric.a1Client.getPolicyTypeIdentities() //
+
+        return ric.getClient().getPolicyTypeIdentities() //
             .flatMap(ricTypes -> validateTypes(ricTypes, ric));
     }
 
@@ -163,7 +205,7 @@ public class RicSupervision {
     private Mono<RicData> startSynchronization(RicData ric) {
         RicSynchronizationTask synchronizationTask = createSynchronizationTask();
         synchronizationTask.run(ric.ric);
-        return Mono.error(new Exception("Syncronization started"));
+        return Mono.error(new SynchStartedException("Syncronization started"));
     }
 
     RicSynchronizationTask createSynchronizationTask() {
index 4e3fd0f..6905c70 100644 (file)
@@ -203,6 +203,12 @@ public class ApplicationTest {
         rsp = restClient().get(url).block();
         assertThat(rsp).contains("ric2");
         assertThat(rsp).doesNotContain("ric1");
+        assertThat(rsp).contains("AVAILABLE");
+
+        // All RICs
+        rsp = restClient().get("/rics").block();
+        assertThat(rsp).contains("ric2");
+        assertThat(rsp).contains("ric1");
 
         // Non existing policy type
         url = "/rics?policyType=XXXX";
@@ -211,23 +217,38 @@ public class ApplicationTest {
 
     @Test
     public void testSynchronization() throws Exception {
-        addRic("ric").setState(Ric.RicState.UNAVAILABLE);
-        String ricName = "ric";
-        Policy policy2 = addPolicy("policyId2", "typeName", "service", ricName);
-
-        getA1Client(ricName).putPolicy(policy2); // put it in the RIC
+        // Two polictypes will be put in the NearRT RICs
+        PolicyTypes nearRtRicPolicyTypes = new PolicyTypes();
+        nearRtRicPolicyTypes.put(createPolicyType("typeName"));
+        nearRtRicPolicyTypes.put(createPolicyType("typeName2"));
+        this.a1ClientFactory.setPolicyTypes(nearRtRicPolicyTypes);
+
+        // One type and one instance added to the agent storage
+        final String ric1Name = "ric1";
+        Ric ric1 = addRic(ric1Name);
+        Policy policy2 = addPolicy("policyId2", "typeName", "service", ric1Name);
+        Ric ric2 = addRic("ric2");
+
+        getA1Client(ric1Name).putPolicy(policy2); // put it in the RIC
         policies.remove(policy2); // Remove it from the repo -> should be deleted in the RIC
 
         String policyId = "policyId";
-        Policy policy = addPolicy(policyId, "typeName", "service", ricName); // This should be created in the RIC
+        Policy policy = addPolicy(policyId, "typeName", "service", ric1Name); // This should be created in the RIC
         supervision.checkAllRics(); // The created policy should be put in the RIC
-        await().untilAsserted(() -> RicState.SYNCHRONIZING.equals(rics.getRic(ricName).getState()));
-        await().untilAsserted(() -> RicState.AVAILABLE.equals(rics.getRic(ricName).getState()));
 
-        Policies ricPolicies = getA1Client(ricName).getPolicies();
+        // Wait until synch is completed
+        await().untilAsserted(() -> RicState.SYNCHRONIZING.equals(rics.getRic(ric1Name).getState()));
+        await().untilAsserted(() -> RicState.AVAILABLE.equals(rics.getRic(ric1Name).getState()));
+        await().untilAsserted(() -> RicState.AVAILABLE.equals(rics.getRic("ric2").getState()));
+
+        Policies ricPolicies = getA1Client(ric1Name).getPolicies();
         assertThat(ricPolicies.size()).isEqualTo(1);
         Policy ricPolicy = ricPolicies.get(policyId);
         assertThat(ricPolicy.json()).isEqualTo(policy.json());
+
+        // Both types should be in the agent storage after the synch
+        assertThat(ric1.getSupportedPolicyTypes().size()).isEqualTo(2);
+        assertThat(ric2.getSupportedPolicyTypes().size()).isEqualTo(2);
     }
 
     @Test
@@ -695,12 +716,15 @@ public class ApplicationTest {
         return a1ClientFactory.getOrCreateA1Client(ricName);
     }
 
-    private PolicyType addPolicyType(String policyTypeName, String ricName) {
-        PolicyType type = ImmutablePolicyType.builder() //
+    private PolicyType createPolicyType(String policyTypeName) {
+        return ImmutablePolicyType.builder() //
             .name(policyTypeName) //
             .schema("{\"title\":\"" + policyTypeName + "\"}") //
             .build();
+    }
 
+    private PolicyType addPolicyType(String policyTypeName, String ricName) {
+        PolicyType type = createPolicyType(policyTypeName);
         policyTypes.put(type);
         addRic(ricName).addSupportedPolicyType(type);
         return type;
index 0a5b27b..73ca351 100644 (file)
@@ -104,7 +104,6 @@ public class RicSupervisionTest {
 
     @BeforeEach
     public void init() {
-        doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class));
         types.clear();
         policies.clear();
         rics.clear();
@@ -123,6 +122,7 @@ public class RicSupervisionTest {
 
     @Test
     public void whenRicIdleAndNoChangedPoliciesOrPolicyTypes_thenNoSynchronization() {
+        doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class));
         RIC_1.setState(RicState.AVAILABLE);
         RIC_1.addSupportedPolicyType(POLICY_TYPE_1);
         rics.put(RIC_1);
@@ -144,6 +144,7 @@ public class RicSupervisionTest {
 
     @Test
     public void whenRicUndefined_thenSynchronization() {
+        doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class));
         RIC_1.setState(RicState.UNAVAILABLE);
         rics.put(RIC_1);
 
@@ -161,6 +162,7 @@ public class RicSupervisionTest {
 
     @Test
     public void whenRicSynchronizing_thenNoSynchronization() {
+        doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class));
         RIC_1.setState(RicState.SYNCHRONIZING);
         rics.put(RIC_1);
 
@@ -174,6 +176,7 @@ public class RicSupervisionTest {
 
     @Test
     public void whenRicIdleAndErrorGettingPolicyIdentities_thenNoSynchronization() {
+        doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class));
         RIC_1.setState(RicState.AVAILABLE);
         RIC_1.addSupportedPolicyType(POLICY_TYPE_1);
         rics.put(RIC_1);
@@ -185,10 +188,12 @@ public class RicSupervisionTest {
 
         verify(supervisorUnderTest).checkAllRics();
         verifyNoMoreInteractions(supervisorUnderTest);
+        assertThat(RIC_1.getState()).isEqualTo(RicState.UNAVAILABLE);
     }
 
     @Test
     public void whenRicIdleAndNotSameAmountOfPolicies_thenSynchronization() {
+        doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class));
         RIC_1.setState(RicState.AVAILABLE);
         rics.put(RIC_1);
 
@@ -211,6 +216,7 @@ public class RicSupervisionTest {
 
     @Test
     public void whenRicIdleAndSameAmountOfPoliciesButNotSamePolicies_thenSynchronization() {
+        doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class));
         RIC_1.setState(RicState.AVAILABLE);
         rics.put(RIC_1);
 
@@ -233,6 +239,7 @@ public class RicSupervisionTest {
 
     @Test
     public void whenRicIdleAndErrorGettingPolicyTypes_thenNoSynchronization() {
+        doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class));
         RIC_1.setState(RicState.AVAILABLE);
         RIC_1.addSupportedPolicyType(POLICY_TYPE_1);
         rics.put(RIC_1);
@@ -249,6 +256,7 @@ public class RicSupervisionTest {
 
     @Test
     public void whenRicIdleAndNotSameAmountOfPolicyTypes_thenSynchronization() {
+        doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class));
         RIC_1.setState(RicState.AVAILABLE);
         RIC_1.addSupportedPolicyType(POLICY_TYPE_1);
         rics.put(RIC_1);
@@ -272,6 +280,7 @@ public class RicSupervisionTest {
 
     @Test
     public void whenRicIdleAndSameAmountOfPolicyTypesButNotSameTypes_thenSynchronization() {
+        doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class));
         PolicyType policyType2 = ImmutablePolicyType.builder() //
             .name("policyType2") //
             .schema("") //
index 9967958..c77259c 100644 (file)
@@ -40,7 +40,7 @@ import reactor.core.publisher.Mono;
 public class MockA1ClientFactory extends A1ClientFactory {
     private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
     private final Map<String, MockA1Client> clients = new HashMap<>();
-    private final PolicyTypes policyTypes;
+    private PolicyTypes policyTypes;
     private Duration asynchDelay = Duration.ofSeconds(0);
 
     public MockA1ClientFactory(PolicyTypes policyTypes) {
@@ -62,6 +62,10 @@ public class MockA1ClientFactory extends A1ClientFactory {
         return clients.get(ricName);
     }
 
+    public void setPolicyTypes(PolicyTypes policyTypes) {
+        this.policyTypes = policyTypes;
+    }
+
     /**
      * Simulate network latency. The REST responses will be generated by separate
      * threads
@@ -77,4 +81,8 @@ public class MockA1ClientFactory extends A1ClientFactory {
         clients.clear();
     }
 
+    public PolicyTypes getPolicyTypes() {
+        return this.policyTypes;
+    }
+
 }