Decrease concurrency 33/3633/2
authorPatrikBuhr <patrik.buhr@est.tech>
Thu, 7 May 2020 10:46:52 +0000 (12:46 +0200)
committerPatrikBuhr <patrik.buhr@est.tech>
Fri, 8 May 2020 11:08:29 +0000 (13:08 +0200)
Decrease concurrency when policies are recreated.

Bugfix, SdncOscA1Client deleteAllPolicies did nohing.

Change-Id: I9db04fd951defb1b033f96c115da18e7bd05ff04
Issue-ID: NONRTRIC-201
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
policy-agent/src/main/java/org/oransc/policyagent/clients/OscA1Client.java
policy-agent/src/main/java/org/oransc/policyagent/clients/SdncOscA1Client.java
policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSupervision.java
policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java

index 90fbd10..99e5bae 100644 (file)
@@ -37,6 +37,7 @@ import reactor.core.publisher.Mono;
  */
 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
 public class OscA1Client implements A1Client {
+    static final int CONCURRENCY_RIC = 1; // How may paralell requests that is sent to one NearRT RIC
 
     public static class UriBuilder implements A1UriBuilder {
         private final RicConfig ricConfig;
@@ -179,7 +180,7 @@ public class OscA1Client implements A1Client {
     @Override
     public Flux<String> deleteAllPolicies() {
         return getPolicyTypeIds() //
-            .flatMap(this::deletePoliciesForType);
+            .flatMap(this::deletePoliciesForType, CONCURRENCY_RIC);
     }
 
     @Override
@@ -206,6 +207,6 @@ public class OscA1Client implements A1Client {
 
     private Flux<String> deletePoliciesForType(String typeId) {
         return getPolicyIdentitiesByType(typeId) //
-            .flatMap(policyId -> deletePolicyById(typeId, policyId));
+            .flatMap(policyId -> deletePolicyById(typeId, policyId), CONCURRENCY_RIC);
     }
 }
index fcb3236..2daa4d6 100644 (file)
@@ -48,6 +48,8 @@ import reactor.core.publisher.Mono;
 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
 public class SdncOscA1Client implements A1Client {
 
+    static final int CONCURRENCY_RIC = 1; // How may paralell requests that is sent to one NearRT RIC
+
     @Value.Immutable
     @org.immutables.gson.Gson.TypeAdapters
     public interface AdapterRequest {
@@ -157,18 +159,27 @@ public class SdncOscA1Client implements A1Client {
     public Flux<String> deleteAllPolicies() {
         if (this.protocolType == A1ProtocolType.SDNC_OSC_STD_V1_1) {
             return getPolicyIds() //
-                .flatMap(policyId -> deletePolicyById("", policyId)); //
+                .flatMap(policyId -> deletePolicyById("", policyId), CONCURRENCY_RIC); //
         } else if (this.protocolType == A1ProtocolType.SDNC_OSC_OSC_V1) {
             OscA1Client.UriBuilder uriBuilder = new OscA1Client.UriBuilder(ricConfig);
             return getPolicyTypeIdentities() //
-                .flatMapMany(Flux::fromIterable)
-                .flatMap(type -> post(GET_POLICY_RPC, uriBuilder.createGetPolicyIdsUri(type), Optional.empty())) //
-                .flatMap(SdncJsonHelper::parseJsonArrayOfString);
+                .flatMapMany(Flux::fromIterable) //
+                .flatMap(type -> oscDeleteInstancesForType(uriBuilder, type), CONCURRENCY_RIC);
         } else {
             return Flux.error(createIllegalProtocolException());
         }
     }
 
+    private Flux<String> oscGetInstancesForType(OscA1Client.UriBuilder uriBuilder, String type) {
+        return post(GET_POLICY_RPC, uriBuilder.createGetPolicyIdsUri(type), Optional.empty()) //
+            .flatMapMany(SdncJsonHelper::parseJsonArrayOfString);
+    }
+
+    private Flux<String> oscDeleteInstancesForType(OscA1Client.UriBuilder uriBuilder, String type) {
+        return oscGetInstancesForType(uriBuilder, type) //
+            .flatMap(instance -> deletePolicyById(type, instance), CONCURRENCY_RIC);
+    }
+
     @Override
     public Mono<A1ProtocolType> getProtocolVersion() {
         return tryStdProtocolVersion() //
index 77be8b3..228038a 100644 (file)
@@ -67,6 +67,20 @@ public class RicSupervision {
         }
     }
 
+    private static class RicData {
+        RicData(Ric ric, A1Client a1Client) {
+            this.ric = ric;
+            this.a1Client = a1Client;
+        }
+
+        A1Client getClient() {
+            return a1Client;
+        }
+
+        final Ric ric;
+        private final A1Client a1Client;
+    }
+
     @Autowired
     public RicSupervision(Rics rics, Policies policies, A1ClientFactory a1ClientFactory, PolicyTypes policyTypes,
         Services services) {
@@ -90,7 +104,6 @@ public class RicSupervision {
         return Flux.fromIterable(rics.getRics()) //
             .flatMap(this::createRicData) //
             .flatMap(this::checkOneRic);
-
     }
 
     private Mono<RicData> checkOneRic(RicData ricData) {
@@ -133,20 +146,6 @@ public class RicSupervision {
         }
     }
 
-    private static class RicData {
-        RicData(Ric ric, A1Client a1Client) {
-            this.ric = ric;
-            this.a1Client = a1Client;
-        }
-
-        A1Client getClient() {
-            return a1Client;
-        }
-
-        final Ric ric;
-        private final A1Client a1Client;
-    }
-
     private Mono<RicData> createRicData(Ric ric) {
         return Mono.just(ric) //
             .flatMap(aRic -> this.a1ClientFactory.createA1Client(ric)) //
@@ -185,7 +184,6 @@ public class RicSupervision {
     }
 
     private Mono<RicData> checkRicPolicyTypes(RicData ric) {
-
         return ric.getClient().getPolicyTypeIdentities() //
             .flatMap(ricTypes -> validateTypes(ricTypes, ric));
     }
index ae91d4b..42d9ab6 100644 (file)
@@ -58,6 +58,7 @@ import reactor.core.publisher.SignalType;
 public class RicSynchronizationTask {
 
     private static final Logger logger = LoggerFactory.getLogger(RicSynchronizationTask.class);
+    static final int CONCURRENCY_RIC = 1; // How may paralell requests that is sent to one NearRT RIC
 
     private final A1ClientFactory a1ClientFactory;
     private final PolicyTypes policyTypes;
@@ -167,7 +168,7 @@ public class RicSynchronizationTask {
             .doOnNext(x -> ric.clearSupportedPolicyTypes()) //
             .flatMapMany(Flux::fromIterable) //
             .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId)) //
-            .flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client)) //
+            .flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client), CONCURRENCY_RIC) //
             .doOnNext(ric::addSupportedPolicyType); //
     }
 
@@ -199,7 +200,7 @@ public class RicSynchronizationTask {
 
     private Flux<Policy> recreateAllPoliciesInRic(Ric ric, A1Client a1Client) {
         return Flux.fromIterable(policies.getForRic(ric.name())) //
-            .flatMap(policy -> putPolicy(policy, ric, a1Client));
+            .flatMap(policy -> putPolicy(policy, ric, a1Client), CONCURRENCY_RIC);
     }
 
 }