From: PatrikBuhr Date: Fri, 17 Jan 2020 13:49:47 +0000 (+0100) Subject: Changed recovery so that policies will be reconfigured in the RIC after RIC restart X-Git-Tag: 1.0.1~39^2 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=9a02b07f542f5fbb67b9214253a7706d304e84f8;hp=fb4bc7967a4733d10775351440a3af14327d5f20;p=nonrtric.git Changed recovery so that policies will be reconfigured in the RIC after RIC restart Instead of just removing all policies by a recovery, all policies are recreated in the RIC. Change-Id: Id9bd6f954ee7e156bbf6084e585da7827b89f830 Issue-ID: NONRTRIC-84 Signed-off-by: PatrikBuhr --- diff --git a/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java b/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java index 246fdd45..affce2c1 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java @@ -145,8 +145,8 @@ public class PolicyController { @RequestParam(name = "instance", required = true) String id) { Policy policy = policies.get(id); if (policy != null && policy.ric().state().equals(Ric.RicState.IDLE)) { + policies.remove(policy); return a1Client.deletePolicy(policy.ric().getConfig().baseUrl(), id) // - .doOnEach(notUsed -> policies.removeId(id)) // .flatMap(notUsed -> { return Mono.just(new ResponseEntity<>(HttpStatus.NO_CONTENT)); }); diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java index fb41e260..8b3fadb0 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java @@ -20,7 +20,6 @@ package org.oransc.policyagent.tasks; -import java.util.Collection; import java.util.Vector; import org.oransc.policyagent.clients.A1Client; @@ -32,7 +31,6 @@ import org.oransc.policyagent.repository.Policy; import org.oransc.policyagent.repository.PolicyType; import org.oransc.policyagent.repository.PolicyTypes; import org.oransc.policyagent.repository.Ric; -import org.oransc.policyagent.repository.Rics; import org.oransc.policyagent.repository.Service; import org.oransc.policyagent.repository.Services; import org.slf4j.Logger; @@ -42,7 +40,11 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** - * Loads information about RealTime-RICs at startup. + * Recovery handling of RIC, which means: + * - load all policy types + * - send all policy instances to the RIC + * --- if that fails remove all policy instances + * - Notify subscribing services */ public class RicRecoveryTask { @@ -60,14 +62,6 @@ public class RicRecoveryTask { this.services = services; } - public void run(Rics rics) { - synchronized (rics) { - for (Ric ric : rics.getRics()) { - run(ric); - } - } - } - public void run(Ric ric) { logger.debug("Handling ric: {}", ric.getConfig().name()); @@ -77,16 +71,17 @@ public class RicRecoveryTask { } ric.setState(Ric.RicState.RECOVERING); } - Flux recoveredTypes = recoverPolicyTypes(ric); - Flux deletedPolicies = deletePolicies(ric); + Flux recoverTypes = recoverPolicyTypes(ric); + Flux deletePoliciesInRic = deleteAllPoliciesInRic(ric); + Flux recreatePoliciesInRic = recreateAllPoliciesInRic(ric); - Flux.merge(recoveredTypes, deletedPolicies) // + Flux.concat(recoverTypes, deletePoliciesInRic, recreatePoliciesInRic) // .subscribe(x -> logger.debug("Recover: " + x), // - throwable -> onError(ric, throwable), // - () -> onComplete(ric)); + throwable -> onRecoveryError(ric, throwable), // + () -> onRecoveryComplete(ric)); } - private void onComplete(Ric ric) { + private void onRecoveryComplete(Ric ric) { logger.debug("Recovery completed for:" + ric.name()); ric.setState(Ric.RicState.IDLE); notifyAllServices("Recovery completed for:" + ric.name()); @@ -107,8 +102,22 @@ public class RicRecoveryTask { } } - private void onError(Ric ric, Throwable t) { + private void onRecoveryError(Ric ric, Throwable t) { logger.warn("Recovery failed for: {}, reason: {}", ric.name(), t.getMessage()); + + // If recovery fails, try to remove all instances + deleteAllPolicies(ric); + Flux recoverTypes = recoverPolicyTypes(ric); + Flux deletePoliciesInRic = deleteAllPoliciesInRic(ric); + + Flux.merge(recoverTypes, deletePoliciesInRic) // + .subscribe(x -> logger.debug("Brute recover: " + x), // + throwable -> onRemoveAllError(ric, throwable), // + () -> onRecoveryComplete(ric)); + } + + private void onRemoveAllError(Ric ric, Throwable t) { + logger.warn("Remove all failed for: {}, reason: {}", ric.name(), t.getMessage()); ric.setState(Ric.RicState.UNDEFINED); } @@ -143,17 +152,28 @@ public class RicRecoveryTask { return Mono.just(pt); } - private Flux deletePolicies(Ric ric) { + private void deleteAllPolicies(Ric ric) { synchronized (policies) { - Collection ricPolicies = new Vector<>(policies.getForRic(ric.name())); - for (Policy policy : ricPolicies) { + for (Policy policy : policies.getForRic(ric.name())) { this.policies.remove(policy); } } + } + private Flux deleteAllPoliciesInRic(Ric ric) { return a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) // .flatMapMany(policyIds -> Flux.fromIterable(policyIds)) // .doOnNext(policyId -> logger.debug("Deleting policy: {}, for ric: {}", policyId, ric.getConfig().name())) .flatMap(policyId -> a1Client.deletePolicy(ric.getConfig().baseUrl(), policyId)); // } + + private Flux recreateAllPoliciesInRic(Ric ric) { + synchronized (policies) { + return Flux.fromIterable(new Vector<>(policies.getForRic(ric.name()))) // + .doOnNext( + policy -> logger.debug("Recreating policy: {}, for ric: {}", policy.id(), ric.getConfig().name())) + .flatMap(policy -> a1Client.putPolicy(policy)); + } + } + } diff --git a/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java b/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java index d78155d4..d0770172 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java @@ -51,6 +51,7 @@ import org.oransc.policyagent.repository.PolicyType; import org.oransc.policyagent.repository.PolicyTypes; import org.oransc.policyagent.repository.Ric; import org.oransc.policyagent.repository.Rics; +import org.oransc.policyagent.tasks.RepositorySupervision; import org.oransc.policyagent.utils.MockA1Client; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @@ -79,6 +80,12 @@ public class ApplicationTest { @Autowired private PolicyTypes policyTypes; + @Autowired + MockA1Client a1Client; + + @Autowired + RepositorySupervision supervision; + private static Gson gson = new GsonBuilder() // .serializeNulls() // .create(); // @@ -96,8 +103,6 @@ public class ApplicationTest { */ @TestConfiguration static class TestBeanFactory { - private final Rics rics = new Rics(); - private final Policies policies = new Policies(); private final PolicyTypes policyTypes = new PolicyTypes(); @Bean @@ -112,7 +117,7 @@ public class ApplicationTest { @Bean public Policies getPolicies() { - return this.policies; + return new Policies(); } @Bean @@ -122,7 +127,7 @@ public class ApplicationTest { @Bean public Rics getRics() { - return this.rics; + return new Rics(); } } @@ -152,6 +157,22 @@ public class ApplicationTest { assertThat(rsp).isEqualTo("[]"); } + @Test + public void testRecovery() throws Exception { + reset(); + Policy policy = addPolicy("policyId", "typeName", "service", "ric"); // This should be created in the RIC + + Policy policy2 = addPolicy("policyId2", "typeName", "service", "ric"); + a1Client.putPolicy("ric", policy2); // put it in the RIC + policies.remove(policy2); // Remove it from the repo -> should be deleted in the RIC + + supervision.checkAllRics(); // The created policy should be put in the RIC + Policies ricPolicies = a1Client.getPolicies("ric"); + assertThat(ricPolicies.size()).isEqualTo(1); + Policy ricPolicy = ricPolicies.get("policyId"); + assertThat(ricPolicy.json()).isEqualTo(policy.json()); + } + @Test public void testGetRic() throws Exception { reset(); @@ -201,7 +222,7 @@ public class ApplicationTest { Vector mes = new Vector<>(); RicConfig conf = ImmutableRicConfig.builder() // .name(ricName) // - .baseUrl("baseUrl") // + .baseUrl(ricName) // .managedElementIds(mes) // .build(); Ric ric = new Ric(conf); diff --git a/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java b/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java index ecb4661e..8b488d8b 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java @@ -50,13 +50,11 @@ import org.springframework.test.context.junit.jupiter.SpringExtension; public class MockPolicyAgent { static class MockApplicationConfig extends ApplicationConfig { - @Override protected String getLocalConfigurationFilePath() { URL url = MockApplicationConfig.class.getClassLoader().getResource("test_application_configuration.json"); return url.getFile(); } - } /** diff --git a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java index b1f397a4..1330a14f 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java @@ -21,6 +21,7 @@ package org.oransc.policyagent.tasks; import static org.awaitility.Awaitility.await; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -105,6 +106,7 @@ public class RepositorySupervisionTest { when(a1ClientMock.deletePolicy(anyString(), anyString())).thenReturn(Mono.empty()); when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(policyIds); when(a1ClientMock.getPolicyType(anyString(), anyString())).thenReturn(Mono.just("schema")); + when(a1ClientMock.putPolicy(any())).thenReturn(Mono.just("OK")); supervisorUnderTest.checkAllRics(); diff --git a/policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java b/policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java index 729fc7b3..e4fd6e68 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java @@ -142,14 +142,9 @@ public class StartupServiceTest { @Test public void startup_unableToConnectToGetTypes() { - Mono> policyIdentities = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME)); Mono error = Mono.error(new Exception("Unable to contact ric.")); - doReturn(error, policyIdentities).when(a1ClientMock).getPolicyTypeIdentities(anyString()); - - Mono> policies = Mono.just(Arrays.asList(POLICY_ID_1, POLICY_ID_2)); - doReturn(policies).when(a1ClientMock).getPolicyIdentities(anyString()); - when(a1ClientMock.getPolicyType(anyString(), anyString())).thenReturn(Mono.just("Schema")); - when(a1ClientMock.deletePolicy(anyString(), anyString())).thenReturn(Mono.just("OK")); + doReturn(error, error).when(a1ClientMock).getPolicyTypeIdentities(anyString()); + doReturn(error).when(a1ClientMock).getPolicyIdentities(anyString()); Rics rics = new Rics(); PolicyTypes policyTypes = new PolicyTypes(); @@ -159,48 +154,28 @@ public class StartupServiceTest { serviceUnderTest.startup(); serviceUnderTest.onRicConfigUpdate(getRicConfig(FIRST_RIC_NAME, FIRST_RIC_URL, MANAGED_NODE_A), ApplicationConfig.RicConfigUpdate.ADDED); - serviceUnderTest.onRicConfigUpdate( - getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C), - ApplicationConfig.RicConfigUpdate.ADDED); - - verify(a1ClientMock).deletePolicy(SECOND_RIC_URL, POLICY_ID_1); - verify(a1ClientMock).deletePolicy(SECOND_RIC_URL, POLICY_ID_2); assertEquals(RicState.UNDEFINED, rics.get(FIRST_RIC_NAME).state(), "Not correct state for " + FIRST_RIC_NAME); - - assertEquals(IDLE, rics.get(SECOND_RIC_NAME).state(), "Not correct state for " + SECOND_RIC_NAME); } @Test public void startup_unableToConnectToGetPolicies() { - Mono> policyTypes1 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME)); - Mono> policyTypes2 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME, POLICY_TYPE_2_NAME)); - when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(policyTypes1).thenReturn(policyTypes2); + Mono> policyTypes = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME)); + when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(policyTypes); when(a1ClientMock.getPolicyType(anyString(), anyString())).thenReturn(Mono.just("Schema")); - Mono> policies = Mono.just(Arrays.asList(POLICY_ID_1, POLICY_ID_2)); - doReturn(Mono.error(new Exception("Unable to contact ric.")), policies).when(a1ClientMock) - .getPolicyIdentities(anyString()); - when(a1ClientMock.deletePolicy(anyString(), anyString())).thenReturn(Mono.just("OK")); + Mono error = Mono.error(new Exception("Unable to contact ric.")); + doReturn(error).when(a1ClientMock).getPolicyIdentities(anyString()); Rics rics = new Rics(); - PolicyTypes policyTypes = new PolicyTypes(); StartupService serviceUnderTest = - new StartupService(appConfigMock, rics, policyTypes, a1ClientMock, new Policies(), new Services()); + new StartupService(appConfigMock, rics, new PolicyTypes(), a1ClientMock, new Policies(), new Services()); serviceUnderTest.startup(); serviceUnderTest.onRicConfigUpdate(getRicConfig(FIRST_RIC_NAME, FIRST_RIC_URL, MANAGED_NODE_A), ApplicationConfig.RicConfigUpdate.ADDED); - serviceUnderTest.onRicConfigUpdate( - getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C), - ApplicationConfig.RicConfigUpdate.ADDED); - - verify(a1ClientMock).deletePolicy(SECOND_RIC_URL, POLICY_ID_1); - verify(a1ClientMock).deletePolicy(SECOND_RIC_URL, POLICY_ID_2); assertEquals(RicState.UNDEFINED, rics.get(FIRST_RIC_NAME).state(), "Not correct state for " + FIRST_RIC_NAME); - - assertEquals(IDLE, rics.get(SECOND_RIC_NAME).state(), "Not correct state for " + SECOND_RIC_NAME); } @SafeVarargs diff --git a/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1Client.java b/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1Client.java index aca29f58..bac27393 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1Client.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1Client.java @@ -86,11 +86,15 @@ public class MockA1Client implements A1Client { return Mono.just("OK"); } - private Policies getPolicies(String url) { + public Policies getPolicies(String url) { if (!policies.containsKey(url)) { policies.put(url, new Policies()); } return policies.get(url); } + public void putPolicy(String url, Policy policy) { + getPolicies(url).put(policy); + } + }