From: PatrikBuhr Date: Wed, 13 May 2020 08:55:14 +0000 (+0200) Subject: Support for transient policies not recreated at synchronization X-Git-Tag: 2.0.0~45 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=4db5e7d262aaa8ccf18feaa4bd93a6a925801333;hp=083393d0affc7dca6a5cea89f4f9759801a91591;p=nonrtric.git Support for transient policies not recreated at synchronization Decreased the sleep time when waiting for dmaap configuration to 10 seconds instead on 60. Change-Id: I774c62d665efe84d249c486094de41168233c410 Issue-ID: NONRTRIC-217 Signed-off-by: PatrikBuhr --- diff --git a/policy-agent/docs/api.yaml b/policy-agent/docs/api.yaml index f42c3e00..f3dd06c6 100644 --- a/policy-agent/docs/api.yaml +++ b/policy-agent/docs/api.yaml @@ -112,6 +112,12 @@ paths: description: service required: true type: string + - name: transient + in: query + description: transient + required: false + type: boolean + default: false - name: type in: query description: type diff --git a/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java b/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java index 6d3f5945..49d77028 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java @@ -209,6 +209,7 @@ public class PolicyController { @RequestParam(name = "id", required = true) String instanceId, // @RequestParam(name = "ric", required = true) String ricName, // @RequestParam(name = "service", required = true) String service, // + @RequestParam(name = "transient", required = false, defaultValue = "false") boolean isTransient, // @RequestBody Object jsonBody) { String jsonString = gson.toJson(jsonBody); @@ -225,6 +226,7 @@ public class PolicyController { .ric(ric) // .ownerServiceName(service) // .lastModified(getTimeStampUtc()) // + .isTransient(isTransient) // .build(); final boolean isCreate = this.policies.get(policy.id()) == null; diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java index 9165af54..f13ffebd 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java @@ -33,7 +33,6 @@ import org.onap.dmaap.mr.client.response.MRConsumerResponse; import org.oransc.policyagent.clients.AsyncRestClient; import org.oransc.policyagent.configuration.ApplicationConfig; import org.oransc.policyagent.exceptions.ServiceException; -import org.oransc.policyagent.tasks.RefreshConfigTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -74,36 +73,30 @@ public class DmaapMessageConsumer { } /** - * Starts the consumer. If there is a DMaaP configuration, it will start polling for messages. Otherwise it will - * check regularly for the configuration. + * Starts the consumer. If there is a DMaaP configuration, it will start polling + * for messages. Otherwise it will check regularly for the configuration. * * @return the running thread, for test purposes. */ public Thread start() { - Thread thread = new Thread(this::checkConfigLoop); + Thread thread = new Thread(this::messageHandlingLoop); thread.start(); return thread; } - private void checkConfigLoop() { - while (!isStopped()) { - if (isDmaapConfigured()) { - messageHandlingLoop(); - } else { - sleep(RefreshConfigTask.CONFIG_REFRESH_INTERVAL); - } - } - } - private void messageHandlingLoop() { - while (!isStopped() && isDmaapConfigured()) { + while (!isStopped()) { try { - Iterable dmaapMsgs = fetchAllMessages(); - if (dmaapMsgs != null && Iterables.size(dmaapMsgs) > 0) { - logger.debug("Fetched all the messages from DMAAP and will start to process the messages"); - for (String msg : dmaapMsgs) { - processMsg(msg); + if (isDmaapConfigured()) { + Iterable dmaapMsgs = fetchAllMessages(); + if (dmaapMsgs != null && Iterables.size(dmaapMsgs) > 0) { + logger.debug("Fetched all the messages from DMAAP and will start to process the messages"); + for (String msg : dmaapMsgs) { + processMsg(msg); + } } + } else { + sleep(TIME_BETWEEN_DMAAP_RETRIES); // wait for configuration } } catch (Exception e) { logger.warn("Cannot fetch because of {}", e.getMessage()); diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Policy.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Policy.java index 51482262..e96d2506 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/repository/Policy.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/Policy.java @@ -37,4 +37,6 @@ public interface Policy { public PolicyType type(); public String lastModified(); + + public boolean isTransient(); } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java index 12bc55a7..89c8d638 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java @@ -83,7 +83,7 @@ public class RefreshConfigTask { /** * The time between refreshes of the configuration. */ - public static final Duration CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1); + static final Duration CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1); final ApplicationConfig appConfig; @Getter(AccessLevel.PROTECTED) diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java index 42d9ab6e..5d782075 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java @@ -198,8 +198,16 @@ public class RicSynchronizationTask { .flatMapMany(notUsed -> Flux.just(policy)); } + private boolean checkTransient(Policy policy) { + if (policy.isTransient()) { + this.policies.remove(policy); + } + return policy.isTransient(); + } + private Flux recreateAllPoliciesInRic(Ric ric, A1Client a1Client) { return Flux.fromIterable(policies.getForRic(ric.name())) // + .filter(policy -> !checkTransient(policy)) // .flatMap(policy -> putPolicy(policy, ric, a1Client), CONCURRENCY_RIC); } diff --git a/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java b/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java index a107bdfc..0fd4a335 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java @@ -274,13 +274,23 @@ public class ApplicationTest { testErrorCode(restClient().get(url), HttpStatus.NOT_FOUND); } - private String putPolicyUrl(String serviceName, String ricName, String policyTypeName, String policyInstanceId) { + private String putPolicyUrl(String serviceName, String ricName, String policyTypeName, String policyInstanceId, + boolean isTransient) { + String url; if (policyTypeName.isEmpty()) { - return "/policy?id=" + policyInstanceId + "&ric=" + ricName + "&service=" + serviceName; + url = "/policy?id=" + policyInstanceId + "&ric=" + ricName + "&service=" + serviceName; } else { - return "/policy?id=" + policyInstanceId + "&ric=" + ricName + "&service=" + serviceName + "&type=" + url = "/policy?id=" + policyInstanceId + "&ric=" + ricName + "&service=" + serviceName + "&type=" + policyTypeName; } + if (isTransient) { + url += "&transient=true"; + } + return url; + } + + private String putPolicyUrl(String serviceName, String ricName, String policyTypeName, String policyInstanceId) { + return putPolicyUrl(serviceName, ricName, policyTypeName, policyInstanceId, false); } @Test @@ -293,7 +303,8 @@ public class ApplicationTest { putService(serviceName); addPolicyType(policyTypeName, ricName); - String url = putPolicyUrl(serviceName, ricName, policyTypeName, policyInstanceId); + // PUT a transient policy + String url = putPolicyUrl(serviceName, ricName, policyTypeName, policyInstanceId, true); final String policyBody = jsonString(); this.rics.getRic(ricName).setState(Ric.RicState.AVAILABLE); @@ -304,6 +315,13 @@ public class ApplicationTest { assertThat(policy.id()).isEqualTo(policyInstanceId); assertThat(policy.ownerServiceName()).isEqualTo(serviceName); assertThat(policy.ric().name()).isEqualTo("ric1"); + assertThat(policy.isTransient()).isEqualTo(true); + + // Put a non transient policy + url = putPolicyUrl(serviceName, ricName, policyTypeName, policyInstanceId); + restClient().put(url, policyBody).block(); + policy = policies.getPolicy(policyInstanceId); + assertThat(policy.isTransient()).isEqualTo(false); url = "/policies"; String rsp = restClient().get(url).block(); @@ -632,12 +650,15 @@ public class ApplicationTest { private Policy addPolicy(String id, String typeName, String service, String ric) throws ServiceException { addRic(ric); - Policy p = ImmutablePolicy.builder().id(id) // + Policy p = ImmutablePolicy.builder() // + .id(id) // .json(jsonString()) // .ownerServiceName(service) // .ric(rics.getRic(ric)) // .type(addPolicyType(typeName, ric)) // - .lastModified("lastModified").build(); + .lastModified("lastModified") // + .isTransient(false) // + .build(); policies.put(p); return p; } diff --git a/policy-agent/src/test/java/org/oransc/policyagent/ConcurrencyTestRunnable.java b/policy-agent/src/test/java/org/oransc/policyagent/ConcurrencyTestRunnable.java index 4a707dd2..2d57e52a 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/ConcurrencyTestRunnable.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/ConcurrencyTestRunnable.java @@ -107,6 +107,7 @@ class ConcurrencyTestRunnable implements Runnable { .ric(ric) // .ownerServiceName("") // .lastModified("") // + .isTransient(false) // .build(); } diff --git a/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java b/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java index d8a97184..16364181 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java @@ -183,6 +183,7 @@ public class MockPolicyAgent { .ric(ric) // .type(unnamedPolicyType) // .lastModified("now") // + .isTransient(false) // .build(); this.policies.put(policy); } diff --git a/policy-agent/src/test/java/org/oransc/policyagent/clients/A1ClientHelper.java b/policy-agent/src/test/java/org/oransc/policyagent/clients/A1ClientHelper.java index 79c8b11d..722fea7d 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/clients/A1ClientHelper.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/clients/A1ClientHelper.java @@ -63,6 +63,7 @@ public class A1ClientHelper { .ric(createRic(nearRtRicUrl)) // .type(createPolicyType(type)) // .lastModified("now") // + .isTransient(false) // .build(); } diff --git a/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java b/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java index b9696ade..7b338ccc 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java @@ -50,7 +50,6 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.onap.dmaap.mr.client.MRConsumer; import org.onap.dmaap.mr.client.response.MRConsumerResponse; import org.oransc.policyagent.configuration.ApplicationConfig; -import org.oransc.policyagent.tasks.RefreshConfigTask; import org.oransc.policyagent.utils.LoggingUtils; import org.springframework.http.HttpStatus; @@ -82,7 +81,7 @@ public class DmaapMessageConsumerTest { messageConsumerUnderTest.start().join(); InOrder orderVerifier = inOrder(messageConsumerUnderTest); - orderVerifier.verify(messageConsumerUnderTest).sleep(RefreshConfigTask.CONFIG_REFRESH_INTERVAL); + orderVerifier.verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES); orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages(); } @@ -99,7 +98,7 @@ public class DmaapMessageConsumerTest { InOrder orderVerifier = inOrder(messageConsumerUnderTest); orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages(); - orderVerifier.verify(messageConsumerUnderTest).sleep(RefreshConfigTask.CONFIG_REFRESH_INTERVAL); + orderVerifier.verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES); } @Test @@ -112,7 +111,7 @@ public class DmaapMessageConsumerTest { response.setResponseCode(Integer.toString(HttpStatus.OK.value())); response.setActualMessages(Collections.emptyList()); - doReturn(false, false, true).when(messageConsumerUnderTest).isStopped(); + doReturn(false, true).when(messageConsumerUnderTest).isStopped(); doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest) .getMessageRouterConsumer(any(Properties.class)); when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response); @@ -130,7 +129,7 @@ public class DmaapMessageConsumerTest { messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class)); - doReturn(false, false, true).when(messageConsumerUnderTest).isStopped(); + doReturn(false, true).when(messageConsumerUnderTest).isStopped(); doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest) .getMessageRouterConsumer(any(Properties.class)); @@ -158,7 +157,7 @@ public class DmaapMessageConsumerTest { setUpMrConfig(); messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); - doReturn(false, false, true).when(messageConsumerUnderTest).isStopped(); + doReturn(false, true).when(messageConsumerUnderTest).isStopped(); doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest) .getMessageRouterConsumer(any(Properties.class)); diff --git a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RefreshConfigTaskTest.java b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RefreshConfigTaskTest.java index cb911330..6edd4e3a 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RefreshConfigTaskTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RefreshConfigTaskTest.java @@ -310,6 +310,7 @@ public class RefreshConfigTaskTest { .ric(ric) // .json("{}") // .ownerServiceName("ownerServiceName") // + .isTransient(false) // .build(); return policy; } diff --git a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSupervisionTest.java b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSupervisionTest.java index 73ca3511..a42142ba 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSupervisionTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSupervisionTest.java @@ -78,6 +78,7 @@ public class RicSupervisionTest { .ric(RIC_1) // .type(POLICY_TYPE_1) // .lastModified("now") // + .isTransient(false) // .build(); private static final Policy POLICY_2 = ImmutablePolicy.builder() // @@ -87,6 +88,7 @@ public class RicSupervisionTest { .ric(RIC_1) // .type(POLICY_TYPE_1) // .lastModified("now") // + .isTransient(false) // .build(); @Mock diff --git a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSynchronizationTaskTest.java b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSynchronizationTaskTest.java index f6d2adeb..c8ebe27e 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSynchronizationTaskTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSynchronizationTaskTest.java @@ -80,14 +80,19 @@ public class RicSynchronizationTaskTest { .controllerName("controllerName") // .build()); - private static final Policy POLICY_1 = ImmutablePolicy.builder() // - .id("policyId1") // - .json("") // - .ownerServiceName("service") // - .ric(RIC_1) // - .type(POLICY_TYPE_1) // - .lastModified("now") // - .build(); + private static Policy createPolicy(boolean isTransient) { + return ImmutablePolicy.builder() // + .id("policyId1") // + .json("") // + .ownerServiceName("service") // + .ric(RIC_1) // + .type(POLICY_TYPE_1) // + .lastModified("now") // + .isTransient(isTransient) // + .build(); + } + + private static final Policy POLICY_1 = createPolicy(false); private static final String SERVICE_1_NAME = "service1"; private static final String SERVICE_1_CALLBACK_URL = "callbackUrl"; @@ -196,6 +201,9 @@ public class RicSynchronizationTaskTest { public void ricIdleAndHavePolicies_thenSynchronizationWithRecreationOfPolicies() { RIC_1.setState(RicState.AVAILABLE); + Policy transientPolicy = createPolicy(true); + + policies.put(transientPolicy); policies.put(POLICY_1); setUpCreationOfA1Client(); @@ -214,7 +222,7 @@ public class RicSynchronizationTaskTest { verifyNoMoreInteractions(a1ClientMock); assertThat(policyTypes.size()).isEqualTo(0); - assertThat(policies.size()).isEqualTo(1); + assertThat(policies.size()).isEqualTo(1); // The transient policy shall be deleted assertThat(RIC_1.getState()).isEqualTo(RicState.AVAILABLE); } diff --git a/policy-agent/src/test/java/org/oransc/policyagent/tasks/ServiceSupervisionTest.java b/policy-agent/src/test/java/org/oransc/policyagent/tasks/ServiceSupervisionTest.java index f26083a5..90a35810 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/tasks/ServiceSupervisionTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/tasks/ServiceSupervisionTest.java @@ -88,6 +88,7 @@ public class ServiceSupervisionTest { .ric(ric) // .type(policyType) // .lastModified("lastModified") // + .isTransient(false) // .build(); @Test