Support for transient policies not recreated at synchronization 79/3679/3
authorPatrikBuhr <patrik.buhr@est.tech>
Wed, 13 May 2020 08:55:14 +0000 (10:55 +0200)
committerPatrikBuhr <patrik.buhr@est.tech>
Wed, 13 May 2020 09:44:59 +0000 (11:44 +0200)
Decreased the sleep time when waiting for dmaap configuration to 10 seconds instead on 60.

Change-Id: I774c62d665efe84d249c486094de41168233c410
Issue-ID: NONRTRIC-217
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
15 files changed:
policy-agent/docs/api.yaml
policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java
policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java
policy-agent/src/main/java/org/oransc/policyagent/repository/Policy.java
policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java
policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java
policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java
policy-agent/src/test/java/org/oransc/policyagent/ConcurrencyTestRunnable.java
policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java
policy-agent/src/test/java/org/oransc/policyagent/clients/A1ClientHelper.java
policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java
policy-agent/src/test/java/org/oransc/policyagent/tasks/RefreshConfigTaskTest.java
policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSupervisionTest.java
policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSynchronizationTaskTest.java
policy-agent/src/test/java/org/oransc/policyagent/tasks/ServiceSupervisionTest.java

index f42c3e0..f3dd06c 100644 (file)
@@ -112,6 +112,12 @@ paths:
           description: service
           required: true
           type: string
+        - name: transient
+          in: query
+          description: transient
+          required: false
+          type: boolean
+          default: false
         - name: type
           in: query
           description: type
index 6d3f594..49d7702 100644 (file)
@@ -209,6 +209,7 @@ public class PolicyController {
         @RequestParam(name = "id", required = true) String instanceId, //
         @RequestParam(name = "ric", required = true) String ricName, //
         @RequestParam(name = "service", required = true) String service, //
+        @RequestParam(name = "transient", required = false, defaultValue = "false") boolean isTransient, //
         @RequestBody Object jsonBody) {
 
         String jsonString = gson.toJson(jsonBody);
@@ -225,6 +226,7 @@ public class PolicyController {
             .ric(ric) //
             .ownerServiceName(service) //
             .lastModified(getTimeStampUtc()) //
+            .isTransient(isTransient) //
             .build();
 
         final boolean isCreate = this.policies.get(policy.id()) == null;
index 9165af5..f13ffeb 100644 (file)
@@ -33,7 +33,6 @@ import org.onap.dmaap.mr.client.response.MRConsumerResponse;
 import org.oransc.policyagent.clients.AsyncRestClient;
 import org.oransc.policyagent.configuration.ApplicationConfig;
 import org.oransc.policyagent.exceptions.ServiceException;
-import org.oransc.policyagent.tasks.RefreshConfigTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -74,36 +73,30 @@ public class DmaapMessageConsumer {
     }
 
     /**
-     * Starts the consumer. If there is a DMaaP configuration, it will start polling for messages. Otherwise it will
-     * check regularly for the configuration.
+     * Starts the consumer. If there is a DMaaP configuration, it will start polling
+     * for messages. Otherwise it will check regularly for the configuration.
      *
      * @return the running thread, for test purposes.
      */
     public Thread start() {
-        Thread thread = new Thread(this::checkConfigLoop);
+        Thread thread = new Thread(this::messageHandlingLoop);
         thread.start();
         return thread;
     }
 
-    private void checkConfigLoop() {
-        while (!isStopped()) {
-            if (isDmaapConfigured()) {
-                messageHandlingLoop();
-            } else {
-                sleep(RefreshConfigTask.CONFIG_REFRESH_INTERVAL);
-            }
-        }
-    }
-
     private void messageHandlingLoop() {
-        while (!isStopped() && isDmaapConfigured()) {
+        while (!isStopped()) {
             try {
-                Iterable<String> dmaapMsgs = fetchAllMessages();
-                if (dmaapMsgs != null && Iterables.size(dmaapMsgs) > 0) {
-                    logger.debug("Fetched all the messages from DMAAP and will start to process the messages");
-                    for (String msg : dmaapMsgs) {
-                        processMsg(msg);
+                if (isDmaapConfigured()) {
+                    Iterable<String> dmaapMsgs = fetchAllMessages();
+                    if (dmaapMsgs != null && Iterables.size(dmaapMsgs) > 0) {
+                        logger.debug("Fetched all the messages from DMAAP and will start to process the messages");
+                        for (String msg : dmaapMsgs) {
+                            processMsg(msg);
+                        }
                     }
+                } else {
+                    sleep(TIME_BETWEEN_DMAAP_RETRIES); // wait for configuration
                 }
             } catch (Exception e) {
                 logger.warn("Cannot fetch because of {}", e.getMessage());
index 5148226..e96d250 100644 (file)
@@ -37,4 +37,6 @@ public interface Policy {
     public PolicyType type();
 
     public String lastModified();
+
+    public boolean isTransient();
 }
index 12bc55a..89c8d63 100644 (file)
@@ -83,7 +83,7 @@ public class RefreshConfigTask {
     /**
      * The time between refreshes of the configuration.
      */
-    public static final Duration CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1);
+    static final Duration CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1);
 
     final ApplicationConfig appConfig;
     @Getter(AccessLevel.PROTECTED)
index 42d9ab6..5d78207 100644 (file)
@@ -198,8 +198,16 @@ public class RicSynchronizationTask {
             .flatMapMany(notUsed -> Flux.just(policy));
     }
 
+    private boolean checkTransient(Policy policy) {
+        if (policy.isTransient()) {
+            this.policies.remove(policy);
+        }
+        return policy.isTransient();
+    }
+
     private Flux<Policy> recreateAllPoliciesInRic(Ric ric, A1Client a1Client) {
         return Flux.fromIterable(policies.getForRic(ric.name())) //
+            .filter(policy -> !checkTransient(policy)) //
             .flatMap(policy -> putPolicy(policy, ric, a1Client), CONCURRENCY_RIC);
     }
 
index a107bdf..0fd4a33 100644 (file)
@@ -274,13 +274,23 @@ public class ApplicationTest {
         testErrorCode(restClient().get(url), HttpStatus.NOT_FOUND);
     }
 
-    private String putPolicyUrl(String serviceName, String ricName, String policyTypeName, String policyInstanceId) {
+    private String putPolicyUrl(String serviceName, String ricName, String policyTypeName, String policyInstanceId,
+        boolean isTransient) {
+        String url;
         if (policyTypeName.isEmpty()) {
-            return "/policy?id=" + policyInstanceId + "&ric=" + ricName + "&service=" + serviceName;
+            url = "/policy?id=" + policyInstanceId + "&ric=" + ricName + "&service=" + serviceName;
         } else {
-            return "/policy?id=" + policyInstanceId + "&ric=" + ricName + "&service=" + serviceName + "&type="
+            url = "/policy?id=" + policyInstanceId + "&ric=" + ricName + "&service=" + serviceName + "&type="
                 + policyTypeName;
         }
+        if (isTransient) {
+            url += "&transient=true";
+        }
+        return url;
+    }
+
+    private String putPolicyUrl(String serviceName, String ricName, String policyTypeName, String policyInstanceId) {
+        return putPolicyUrl(serviceName, ricName, policyTypeName, policyInstanceId, false);
     }
 
     @Test
@@ -293,7 +303,8 @@ public class ApplicationTest {
         putService(serviceName);
         addPolicyType(policyTypeName, ricName);
 
-        String url = putPolicyUrl(serviceName, ricName, policyTypeName, policyInstanceId);
+        // PUT a transient policy
+        String url = putPolicyUrl(serviceName, ricName, policyTypeName, policyInstanceId, true);
         final String policyBody = jsonString();
         this.rics.getRic(ricName).setState(Ric.RicState.AVAILABLE);
 
@@ -304,6 +315,13 @@ public class ApplicationTest {
         assertThat(policy.id()).isEqualTo(policyInstanceId);
         assertThat(policy.ownerServiceName()).isEqualTo(serviceName);
         assertThat(policy.ric().name()).isEqualTo("ric1");
+        assertThat(policy.isTransient()).isEqualTo(true);
+
+        // Put a non transient policy
+        url = putPolicyUrl(serviceName, ricName, policyTypeName, policyInstanceId);
+        restClient().put(url, policyBody).block();
+        policy = policies.getPolicy(policyInstanceId);
+        assertThat(policy.isTransient()).isEqualTo(false);
 
         url = "/policies";
         String rsp = restClient().get(url).block();
@@ -632,12 +650,15 @@ public class ApplicationTest {
 
     private Policy addPolicy(String id, String typeName, String service, String ric) throws ServiceException {
         addRic(ric);
-        Policy p = ImmutablePolicy.builder().id(id) //
+        Policy p = ImmutablePolicy.builder() //
+            .id(id) //
             .json(jsonString()) //
             .ownerServiceName(service) //
             .ric(rics.getRic(ric)) //
             .type(addPolicyType(typeName, ric)) //
-            .lastModified("lastModified").build();
+            .lastModified("lastModified") //
+            .isTransient(false) //
+            .build();
         policies.put(p);
         return p;
     }
index 4a707dd..2d57e52 100644 (file)
@@ -107,6 +107,7 @@ class ConcurrencyTestRunnable implements Runnable {
             .ric(ric) //
             .ownerServiceName("") //
             .lastModified("") //
+            .isTransient(false) //
             .build();
     }
 
index d8a9718..1636418 100644 (file)
@@ -183,6 +183,7 @@ public class MockPolicyAgent {
             .ric(ric) //
             .type(unnamedPolicyType) //
             .lastModified("now") //
+            .isTransient(false) //
             .build();
         this.policies.put(policy);
     }
index 79c8b11..722fea7 100644 (file)
@@ -63,6 +63,7 @@ public class A1ClientHelper {
             .ric(createRic(nearRtRicUrl)) //
             .type(createPolicyType(type)) //
             .lastModified("now") //
+            .isTransient(false) //
             .build();
     }
 
index b9696ad..7b338cc 100644 (file)
@@ -50,7 +50,6 @@ import org.mockito.junit.jupiter.MockitoExtension;
 import org.onap.dmaap.mr.client.MRConsumer;
 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
 import org.oransc.policyagent.configuration.ApplicationConfig;
-import org.oransc.policyagent.tasks.RefreshConfigTask;
 import org.oransc.policyagent.utils.LoggingUtils;
 import org.springframework.http.HttpStatus;
 
@@ -82,7 +81,7 @@ public class DmaapMessageConsumerTest {
         messageConsumerUnderTest.start().join();
 
         InOrder orderVerifier = inOrder(messageConsumerUnderTest);
-        orderVerifier.verify(messageConsumerUnderTest).sleep(RefreshConfigTask.CONFIG_REFRESH_INTERVAL);
+        orderVerifier.verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES);
         orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages();
     }
 
@@ -99,7 +98,7 @@ public class DmaapMessageConsumerTest {
 
         InOrder orderVerifier = inOrder(messageConsumerUnderTest);
         orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages();
-        orderVerifier.verify(messageConsumerUnderTest).sleep(RefreshConfigTask.CONFIG_REFRESH_INTERVAL);
+        orderVerifier.verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES);
     }
 
     @Test
@@ -112,7 +111,7 @@ public class DmaapMessageConsumerTest {
         response.setResponseCode(Integer.toString(HttpStatus.OK.value()));
         response.setActualMessages(Collections.emptyList());
 
-        doReturn(false, false, true).when(messageConsumerUnderTest).isStopped();
+        doReturn(false, true).when(messageConsumerUnderTest).isStopped();
         doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
             .getMessageRouterConsumer(any(Properties.class));
         when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response);
@@ -130,7 +129,7 @@ public class DmaapMessageConsumerTest {
         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
 
         doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
-        doReturn(false, false, true).when(messageConsumerUnderTest).isStopped();
+        doReturn(false, true).when(messageConsumerUnderTest).isStopped();
         doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
             .getMessageRouterConsumer(any(Properties.class));
 
@@ -158,7 +157,7 @@ public class DmaapMessageConsumerTest {
         setUpMrConfig();
         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
 
-        doReturn(false, false, true).when(messageConsumerUnderTest).isStopped();
+        doReturn(false, true).when(messageConsumerUnderTest).isStopped();
         doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
             .getMessageRouterConsumer(any(Properties.class));
 
index cb91133..6edd4e3 100644 (file)
@@ -310,6 +310,7 @@ public class RefreshConfigTaskTest {
             .ric(ric) //
             .json("{}") //
             .ownerServiceName("ownerServiceName") //
+            .isTransient(false) //
             .build();
         return policy;
     }
index 73ca351..a42142b 100644 (file)
@@ -78,6 +78,7 @@ public class RicSupervisionTest {
         .ric(RIC_1) //
         .type(POLICY_TYPE_1) //
         .lastModified("now") //
+        .isTransient(false) //
         .build();
 
     private static final Policy POLICY_2 = ImmutablePolicy.builder() //
@@ -87,6 +88,7 @@ public class RicSupervisionTest {
         .ric(RIC_1) //
         .type(POLICY_TYPE_1) //
         .lastModified("now") //
+        .isTransient(false) //
         .build();
 
     @Mock
index f6d2ade..c8ebe27 100644 (file)
@@ -80,14 +80,19 @@ public class RicSynchronizationTaskTest {
         .controllerName("controllerName") //
         .build());
 
-    private static final Policy POLICY_1 = ImmutablePolicy.builder() //
-        .id("policyId1") //
-        .json("") //
-        .ownerServiceName("service") //
-        .ric(RIC_1) //
-        .type(POLICY_TYPE_1) //
-        .lastModified("now") //
-        .build();
+    private static Policy createPolicy(boolean isTransient) {
+        return ImmutablePolicy.builder() //
+            .id("policyId1") //
+            .json("") //
+            .ownerServiceName("service") //
+            .ric(RIC_1) //
+            .type(POLICY_TYPE_1) //
+            .lastModified("now") //
+            .isTransient(isTransient) //
+            .build();
+    }
+
+    private static final Policy POLICY_1 = createPolicy(false);
 
     private static final String SERVICE_1_NAME = "service1";
     private static final String SERVICE_1_CALLBACK_URL = "callbackUrl";
@@ -196,6 +201,9 @@ public class RicSynchronizationTaskTest {
     public void ricIdleAndHavePolicies_thenSynchronizationWithRecreationOfPolicies() {
         RIC_1.setState(RicState.AVAILABLE);
 
+        Policy transientPolicy = createPolicy(true);
+
+        policies.put(transientPolicy);
         policies.put(POLICY_1);
 
         setUpCreationOfA1Client();
@@ -214,7 +222,7 @@ public class RicSynchronizationTaskTest {
         verifyNoMoreInteractions(a1ClientMock);
 
         assertThat(policyTypes.size()).isEqualTo(0);
-        assertThat(policies.size()).isEqualTo(1);
+        assertThat(policies.size()).isEqualTo(1); // The transient policy shall be deleted
         assertThat(RIC_1.getState()).isEqualTo(RicState.AVAILABLE);
     }
 
index f26083a..90a3581 100644 (file)
@@ -88,6 +88,7 @@ public class ServiceSupervisionTest {
         .ric(ric) //
         .type(policyType) //
         .lastModified("lastModified") //
+        .isTransient(false) //
         .build();
 
     @Test