From 7de2f355ed1956001d15cb7e57fdd37fdf88cdc5 Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Fri, 17 Apr 2020 09:57:35 +0200 Subject: [PATCH] Bugfix, only one RIC was synched In RicSupervision, when recovery was started, cheching of the other ones was stopped. Other improvements such as: Not starting consistency check when it is already started Improved tracing Change-Id: I82d1d48a091de8f24ebfa60b7cc1140e81d959f6 Issue-ID: NONRTRIC-164 Signed-off-by: PatrikBuhr --- .../org/oransc/policyagent/repository/Lock.java | 3 +- .../org/oransc/policyagent/repository/Ric.java | 7 ++- .../org/oransc/policyagent/repository/Rics.java | 3 +- .../oransc/policyagent/tasks/RicSupervision.java | 64 ++++++++++++++++++---- .../org/oransc/policyagent/ApplicationTest.java | 46 ++++++++++++---- .../policyagent/tasks/RicSupervisionTest.java | 11 +++- .../policyagent/utils/MockA1ClientFactory.java | 10 +++- 7 files changed, 117 insertions(+), 27 deletions(-) diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Lock.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Lock.java index def2b30a..ed944924 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/repository/Lock.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/Lock.java @@ -130,7 +130,8 @@ public class Lock { @Override public String toString() { - return "Lock cnt: " + this.lockCounter + " exclusive: " + this.isExclusive; + return "Lock cnt: " + this.lockCounter + " exclusive: " + this.isExclusive + " queued: " + + this.lockRequestQueue.size(); } /** returns the current number of granted locks */ diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Ric.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Ric.java index 07d2cda9..fb2e4b92 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/repository/Ric.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/Ric.java @@ -152,6 +152,11 @@ public class Ric { /** * The agent is synchronizing the view of the Ric. */ - SYNCHRONIZING + SYNCHRONIZING, + + /** + * A consistency check between the agent and the Ric is done + */ + CONSISTENCY_CHECK } } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Rics.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Rics.java index 66cecd3d..7faa3766 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/repository/Rics.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/Rics.java @@ -20,6 +20,7 @@ package org.oransc.policyagent.repository; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -37,7 +38,7 @@ public class Rics { registeredRics.put(ric.name(), ric); } - public synchronized Iterable getRics() { + public synchronized Collection getRics() { return new Vector<>(registeredRics.values()); } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSupervision.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSupervision.java index 2666d606..77be8b31 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSupervision.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSupervision.java @@ -24,6 +24,7 @@ import java.util.Collection; import org.oransc.policyagent.clients.A1Client; import org.oransc.policyagent.clients.A1ClientFactory; +import org.oransc.policyagent.exceptions.ServiceException; import org.oransc.policyagent.repository.Lock.LockType; import org.oransc.policyagent.repository.Policies; import org.oransc.policyagent.repository.PolicyTypes; @@ -58,6 +59,14 @@ public class RicSupervision { private final A1ClientFactory a1ClientFactory; private final Services services; + private static class SynchStartedException extends ServiceException { + private static final long serialVersionUID = 1L; + + public SynchStartedException(String message) { + super(message); + } + } + @Autowired public RicSupervision(Rics rics, Policies policies, A1ClientFactory a1ClientFactory, PolicyTypes policyTypes, Services services) { @@ -80,20 +89,48 @@ public class RicSupervision { private Flux createTask() { return Flux.fromIterable(rics.getRics()) // .flatMap(this::createRicData) // - .flatMap(this::checkOneRic) // - .onErrorResume(throwable -> Mono.empty()); + .flatMap(this::checkOneRic); } private Mono checkOneRic(RicData ricData) { return checkRicState(ricData) // .flatMap(x -> ricData.ric.getLock().lock(LockType.EXCLUSIVE)) // + .flatMap(notUsed -> setRicState(ricData)) // .flatMap(x -> checkRicPolicies(ricData)) // - .flatMap(x -> ricData.ric.getLock().unlock()) // - .doOnError(throwable -> ricData.ric.getLock().unlockBlocking()) // .flatMap(x -> checkRicPolicyTypes(ricData)) // - .doOnNext(x -> logger.debug("Ric: {} checked OK", ricData.ric.name())) // - .doOnError(t -> logger.debug("Ric: {} check Failed, exception: {}", ricData.ric.name(), t.getMessage())); + .doOnNext(x -> onRicCheckedOk(ricData)) // + .doOnError(t -> onRicCheckedError(t, ricData)) // + .onErrorResume(throwable -> Mono.empty()); + } + + private void onRicCheckedError(Throwable t, RicData ricData) { + logger.debug("Ric: {} check stopped, exception: {}", ricData.ric.name(), t.getMessage()); + if (t instanceof SynchStartedException) { + // this is just a temporary state, + ricData.ric.setState(RicState.AVAILABLE); + } else { + ricData.ric.setState(RicState.UNAVAILABLE); + } + ricData.ric.getLock().unlockBlocking(); + } + + private void onRicCheckedOk(RicData ricData) { + logger.debug("Ric: {} checked OK", ricData.ric.name()); + ricData.ric.setState(RicState.AVAILABLE); + ricData.ric.getLock().unlockBlocking(); + } + + @SuppressWarnings("squid:S2445") // Blocks should be synchronized on "private final" fields + private Mono setRicState(RicData ric) { + synchronized (ric) { + if (ric.ric.getState() == RicState.CONSISTENCY_CHECK) { + logger.debug("Ric: {} is already being checked", ric.ric.getConfig().name()); + return Mono.empty(); + } + ric.ric.setState(RicState.CONSISTENCY_CHECK); + return Mono.just(ric); + } } private static class RicData { @@ -102,8 +139,12 @@ public class RicSupervision { this.a1Client = a1Client; } + A1Client getClient() { + return a1Client; + } + final Ric ric; - final A1Client a1Client; + private final A1Client a1Client; } private Mono createRicData(Ric ric) { @@ -116,7 +157,7 @@ public class RicSupervision { if (ric.ric.getState() == RicState.UNAVAILABLE) { return startSynchronization(ric) // .onErrorResume(t -> Mono.empty()); - } else if (ric.ric.getState() == RicState.SYNCHRONIZING) { + } else if (ric.ric.getState() == RicState.SYNCHRONIZING || ric.ric.getState() == RicState.CONSISTENCY_CHECK) { return Mono.empty(); } else { return Mono.just(ric); @@ -124,7 +165,7 @@ public class RicSupervision { } private Mono checkRicPolicies(RicData ric) { - return ric.a1Client.getPolicyIdentities() // + return ric.getClient().getPolicyIdentities() // .flatMap(ricP -> validateInstances(ricP, ric)); } @@ -144,7 +185,8 @@ public class RicSupervision { } private Mono checkRicPolicyTypes(RicData ric) { - return ric.a1Client.getPolicyTypeIdentities() // + + return ric.getClient().getPolicyTypeIdentities() // .flatMap(ricTypes -> validateTypes(ricTypes, ric)); } @@ -163,7 +205,7 @@ public class RicSupervision { private Mono startSynchronization(RicData ric) { RicSynchronizationTask synchronizationTask = createSynchronizationTask(); synchronizationTask.run(ric.ric); - return Mono.error(new Exception("Syncronization started")); + return Mono.error(new SynchStartedException("Syncronization started")); } RicSynchronizationTask createSynchronizationTask() { diff --git a/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java b/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java index 4e3fd0fa..6905c70c 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java @@ -203,6 +203,12 @@ public class ApplicationTest { rsp = restClient().get(url).block(); assertThat(rsp).contains("ric2"); assertThat(rsp).doesNotContain("ric1"); + assertThat(rsp).contains("AVAILABLE"); + + // All RICs + rsp = restClient().get("/rics").block(); + assertThat(rsp).contains("ric2"); + assertThat(rsp).contains("ric1"); // Non existing policy type url = "/rics?policyType=XXXX"; @@ -211,23 +217,38 @@ public class ApplicationTest { @Test public void testSynchronization() throws Exception { - addRic("ric").setState(Ric.RicState.UNAVAILABLE); - String ricName = "ric"; - Policy policy2 = addPolicy("policyId2", "typeName", "service", ricName); - - getA1Client(ricName).putPolicy(policy2); // put it in the RIC + // Two polictypes will be put in the NearRT RICs + PolicyTypes nearRtRicPolicyTypes = new PolicyTypes(); + nearRtRicPolicyTypes.put(createPolicyType("typeName")); + nearRtRicPolicyTypes.put(createPolicyType("typeName2")); + this.a1ClientFactory.setPolicyTypes(nearRtRicPolicyTypes); + + // One type and one instance added to the agent storage + final String ric1Name = "ric1"; + Ric ric1 = addRic(ric1Name); + Policy policy2 = addPolicy("policyId2", "typeName", "service", ric1Name); + Ric ric2 = addRic("ric2"); + + getA1Client(ric1Name).putPolicy(policy2); // put it in the RIC policies.remove(policy2); // Remove it from the repo -> should be deleted in the RIC String policyId = "policyId"; - Policy policy = addPolicy(policyId, "typeName", "service", ricName); // This should be created in the RIC + Policy policy = addPolicy(policyId, "typeName", "service", ric1Name); // This should be created in the RIC supervision.checkAllRics(); // The created policy should be put in the RIC - await().untilAsserted(() -> RicState.SYNCHRONIZING.equals(rics.getRic(ricName).getState())); - await().untilAsserted(() -> RicState.AVAILABLE.equals(rics.getRic(ricName).getState())); - Policies ricPolicies = getA1Client(ricName).getPolicies(); + // Wait until synch is completed + await().untilAsserted(() -> RicState.SYNCHRONIZING.equals(rics.getRic(ric1Name).getState())); + await().untilAsserted(() -> RicState.AVAILABLE.equals(rics.getRic(ric1Name).getState())); + await().untilAsserted(() -> RicState.AVAILABLE.equals(rics.getRic("ric2").getState())); + + Policies ricPolicies = getA1Client(ric1Name).getPolicies(); assertThat(ricPolicies.size()).isEqualTo(1); Policy ricPolicy = ricPolicies.get(policyId); assertThat(ricPolicy.json()).isEqualTo(policy.json()); + + // Both types should be in the agent storage after the synch + assertThat(ric1.getSupportedPolicyTypes().size()).isEqualTo(2); + assertThat(ric2.getSupportedPolicyTypes().size()).isEqualTo(2); } @Test @@ -695,12 +716,15 @@ public class ApplicationTest { return a1ClientFactory.getOrCreateA1Client(ricName); } - private PolicyType addPolicyType(String policyTypeName, String ricName) { - PolicyType type = ImmutablePolicyType.builder() // + private PolicyType createPolicyType(String policyTypeName) { + return ImmutablePolicyType.builder() // .name(policyTypeName) // .schema("{\"title\":\"" + policyTypeName + "\"}") // .build(); + } + private PolicyType addPolicyType(String policyTypeName, String ricName) { + PolicyType type = createPolicyType(policyTypeName); policyTypes.put(type); addRic(ricName).addSupportedPolicyType(type); return type; diff --git a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSupervisionTest.java b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSupervisionTest.java index 0a5b27b6..73ca3511 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSupervisionTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSupervisionTest.java @@ -104,7 +104,6 @@ public class RicSupervisionTest { @BeforeEach public void init() { - doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class)); types.clear(); policies.clear(); rics.clear(); @@ -123,6 +122,7 @@ public class RicSupervisionTest { @Test public void whenRicIdleAndNoChangedPoliciesOrPolicyTypes_thenNoSynchronization() { + doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class)); RIC_1.setState(RicState.AVAILABLE); RIC_1.addSupportedPolicyType(POLICY_TYPE_1); rics.put(RIC_1); @@ -144,6 +144,7 @@ public class RicSupervisionTest { @Test public void whenRicUndefined_thenSynchronization() { + doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class)); RIC_1.setState(RicState.UNAVAILABLE); rics.put(RIC_1); @@ -161,6 +162,7 @@ public class RicSupervisionTest { @Test public void whenRicSynchronizing_thenNoSynchronization() { + doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class)); RIC_1.setState(RicState.SYNCHRONIZING); rics.put(RIC_1); @@ -174,6 +176,7 @@ public class RicSupervisionTest { @Test public void whenRicIdleAndErrorGettingPolicyIdentities_thenNoSynchronization() { + doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class)); RIC_1.setState(RicState.AVAILABLE); RIC_1.addSupportedPolicyType(POLICY_TYPE_1); rics.put(RIC_1); @@ -185,10 +188,12 @@ public class RicSupervisionTest { verify(supervisorUnderTest).checkAllRics(); verifyNoMoreInteractions(supervisorUnderTest); + assertThat(RIC_1.getState()).isEqualTo(RicState.UNAVAILABLE); } @Test public void whenRicIdleAndNotSameAmountOfPolicies_thenSynchronization() { + doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class)); RIC_1.setState(RicState.AVAILABLE); rics.put(RIC_1); @@ -211,6 +216,7 @@ public class RicSupervisionTest { @Test public void whenRicIdleAndSameAmountOfPoliciesButNotSamePolicies_thenSynchronization() { + doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class)); RIC_1.setState(RicState.AVAILABLE); rics.put(RIC_1); @@ -233,6 +239,7 @@ public class RicSupervisionTest { @Test public void whenRicIdleAndErrorGettingPolicyTypes_thenNoSynchronization() { + doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class)); RIC_1.setState(RicState.AVAILABLE); RIC_1.addSupportedPolicyType(POLICY_TYPE_1); rics.put(RIC_1); @@ -249,6 +256,7 @@ public class RicSupervisionTest { @Test public void whenRicIdleAndNotSameAmountOfPolicyTypes_thenSynchronization() { + doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class)); RIC_1.setState(RicState.AVAILABLE); RIC_1.addSupportedPolicyType(POLICY_TYPE_1); rics.put(RIC_1); @@ -272,6 +280,7 @@ public class RicSupervisionTest { @Test public void whenRicIdleAndSameAmountOfPolicyTypesButNotSameTypes_thenSynchronization() { + doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class)); PolicyType policyType2 = ImmutablePolicyType.builder() // .name("policyType2") // .schema("") // diff --git a/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1ClientFactory.java b/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1ClientFactory.java index 99679583..c77259c7 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1ClientFactory.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1ClientFactory.java @@ -40,7 +40,7 @@ import reactor.core.publisher.Mono; public class MockA1ClientFactory extends A1ClientFactory { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private final Map clients = new HashMap<>(); - private final PolicyTypes policyTypes; + private PolicyTypes policyTypes; private Duration asynchDelay = Duration.ofSeconds(0); public MockA1ClientFactory(PolicyTypes policyTypes) { @@ -62,6 +62,10 @@ public class MockA1ClientFactory extends A1ClientFactory { return clients.get(ricName); } + public void setPolicyTypes(PolicyTypes policyTypes) { + this.policyTypes = policyTypes; + } + /** * Simulate network latency. The REST responses will be generated by separate * threads @@ -77,4 +81,8 @@ public class MockA1ClientFactory extends A1ClientFactory { clients.clear(); } + public PolicyTypes getPolicyTypes() { + return this.policyTypes; + } + } -- 2.16.6