From fd0947fdae1ce5e195b4ef3926581acc1b75613d Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Thu, 7 May 2020 12:46:52 +0200 Subject: [PATCH] Decrease concurrency Decrease concurrency when policies are recreated. Bugfix, SdncOscA1Client deleteAllPolicies did nohing. Change-Id: I9db04fd951defb1b033f96c115da18e7bd05ff04 Issue-ID: NONRTRIC-201 Signed-off-by: PatrikBuhr --- .../oransc/policyagent/clients/OscA1Client.java | 5 ++-- .../policyagent/clients/SdncOscA1Client.java | 19 +++++++++++--- .../oransc/policyagent/tasks/RicSupervision.java | 30 ++++++++++------------ .../policyagent/tasks/RicSynchronizationTask.java | 5 ++-- 4 files changed, 35 insertions(+), 24 deletions(-) diff --git a/policy-agent/src/main/java/org/oransc/policyagent/clients/OscA1Client.java b/policy-agent/src/main/java/org/oransc/policyagent/clients/OscA1Client.java index 90fbd101..99e5bae4 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/clients/OscA1Client.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/clients/OscA1Client.java @@ -37,6 +37,7 @@ import reactor.core.publisher.Mono; */ @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally public class OscA1Client implements A1Client { + static final int CONCURRENCY_RIC = 1; // How may paralell requests that is sent to one NearRT RIC public static class UriBuilder implements A1UriBuilder { private final RicConfig ricConfig; @@ -179,7 +180,7 @@ public class OscA1Client implements A1Client { @Override public Flux deleteAllPolicies() { return getPolicyTypeIds() // - .flatMap(this::deletePoliciesForType); + .flatMap(this::deletePoliciesForType, CONCURRENCY_RIC); } @Override @@ -206,6 +207,6 @@ public class OscA1Client implements A1Client { private Flux deletePoliciesForType(String typeId) { return getPolicyIdentitiesByType(typeId) // - .flatMap(policyId -> deletePolicyById(typeId, policyId)); + .flatMap(policyId -> deletePolicyById(typeId, policyId), CONCURRENCY_RIC); } } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/clients/SdncOscA1Client.java b/policy-agent/src/main/java/org/oransc/policyagent/clients/SdncOscA1Client.java index fcb3236a..2daa4d68 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/clients/SdncOscA1Client.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/clients/SdncOscA1Client.java @@ -48,6 +48,8 @@ import reactor.core.publisher.Mono; @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally public class SdncOscA1Client implements A1Client { + static final int CONCURRENCY_RIC = 1; // How may paralell requests that is sent to one NearRT RIC + @Value.Immutable @org.immutables.gson.Gson.TypeAdapters public interface AdapterRequest { @@ -157,18 +159,27 @@ public class SdncOscA1Client implements A1Client { public Flux deleteAllPolicies() { if (this.protocolType == A1ProtocolType.SDNC_OSC_STD_V1_1) { return getPolicyIds() // - .flatMap(policyId -> deletePolicyById("", policyId)); // + .flatMap(policyId -> deletePolicyById("", policyId), CONCURRENCY_RIC); // } else if (this.protocolType == A1ProtocolType.SDNC_OSC_OSC_V1) { OscA1Client.UriBuilder uriBuilder = new OscA1Client.UriBuilder(ricConfig); return getPolicyTypeIdentities() // - .flatMapMany(Flux::fromIterable) - .flatMap(type -> post(GET_POLICY_RPC, uriBuilder.createGetPolicyIdsUri(type), Optional.empty())) // - .flatMap(SdncJsonHelper::parseJsonArrayOfString); + .flatMapMany(Flux::fromIterable) // + .flatMap(type -> oscDeleteInstancesForType(uriBuilder, type), CONCURRENCY_RIC); } else { return Flux.error(createIllegalProtocolException()); } } + private Flux oscGetInstancesForType(OscA1Client.UriBuilder uriBuilder, String type) { + return post(GET_POLICY_RPC, uriBuilder.createGetPolicyIdsUri(type), Optional.empty()) // + .flatMapMany(SdncJsonHelper::parseJsonArrayOfString); + } + + private Flux oscDeleteInstancesForType(OscA1Client.UriBuilder uriBuilder, String type) { + return oscGetInstancesForType(uriBuilder, type) // + .flatMap(instance -> deletePolicyById(type, instance), CONCURRENCY_RIC); + } + @Override public Mono getProtocolVersion() { return tryStdProtocolVersion() // diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSupervision.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSupervision.java index 77be8b31..228038af 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSupervision.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSupervision.java @@ -67,6 +67,20 @@ public class RicSupervision { } } + private static class RicData { + RicData(Ric ric, A1Client a1Client) { + this.ric = ric; + this.a1Client = a1Client; + } + + A1Client getClient() { + return a1Client; + } + + final Ric ric; + private final A1Client a1Client; + } + @Autowired public RicSupervision(Rics rics, Policies policies, A1ClientFactory a1ClientFactory, PolicyTypes policyTypes, Services services) { @@ -90,7 +104,6 @@ public class RicSupervision { return Flux.fromIterable(rics.getRics()) // .flatMap(this::createRicData) // .flatMap(this::checkOneRic); - } private Mono checkOneRic(RicData ricData) { @@ -133,20 +146,6 @@ public class RicSupervision { } } - private static class RicData { - RicData(Ric ric, A1Client a1Client) { - this.ric = ric; - this.a1Client = a1Client; - } - - A1Client getClient() { - return a1Client; - } - - final Ric ric; - private final A1Client a1Client; - } - private Mono createRicData(Ric ric) { return Mono.just(ric) // .flatMap(aRic -> this.a1ClientFactory.createA1Client(ric)) // @@ -185,7 +184,6 @@ public class RicSupervision { } private Mono checkRicPolicyTypes(RicData ric) { - return ric.getClient().getPolicyTypeIdentities() // .flatMap(ricTypes -> validateTypes(ricTypes, ric)); } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java index ae91d4b2..42d9ab6e 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java @@ -58,6 +58,7 @@ import reactor.core.publisher.SignalType; public class RicSynchronizationTask { private static final Logger logger = LoggerFactory.getLogger(RicSynchronizationTask.class); + static final int CONCURRENCY_RIC = 1; // How may paralell requests that is sent to one NearRT RIC private final A1ClientFactory a1ClientFactory; private final PolicyTypes policyTypes; @@ -167,7 +168,7 @@ public class RicSynchronizationTask { .doOnNext(x -> ric.clearSupportedPolicyTypes()) // .flatMapMany(Flux::fromIterable) // .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId)) // - .flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client)) // + .flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client), CONCURRENCY_RIC) // .doOnNext(ric::addSupportedPolicyType); // } @@ -199,7 +200,7 @@ public class RicSynchronizationTask { private Flux recreateAllPoliciesInRic(Ric ric, A1Client a1Client) { return Flux.fromIterable(policies.getForRic(ric.name())) // - .flatMap(policy -> putPolicy(policy, ric, a1Client)); + .flatMap(policy -> putPolicy(policy, ric, a1Client), CONCURRENCY_RIC); } } -- 2.16.6