Decrease concurrency when policies are recreated.
Bugfix, SdncOscA1Client deleteAllPolicies did nohing.
Change-Id: I9db04fd951defb1b033f96c115da18e7bd05ff04
Issue-ID: NONRTRIC-201
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
*/
@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
public class OscA1Client implements A1Client {
+ static final int CONCURRENCY_RIC = 1; // How may paralell requests that is sent to one NearRT RIC
public static class UriBuilder implements A1UriBuilder {
private final RicConfig ricConfig;
@Override
public Flux<String> deleteAllPolicies() {
return getPolicyTypeIds() //
- .flatMap(this::deletePoliciesForType);
+ .flatMap(this::deletePoliciesForType, CONCURRENCY_RIC);
}
@Override
private Flux<String> deletePoliciesForType(String typeId) {
return getPolicyIdentitiesByType(typeId) //
- .flatMap(policyId -> deletePolicyById(typeId, policyId));
+ .flatMap(policyId -> deletePolicyById(typeId, policyId), CONCURRENCY_RIC);
}
}
@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
public class SdncOscA1Client implements A1Client {
+ static final int CONCURRENCY_RIC = 1; // How may paralell requests that is sent to one NearRT RIC
+
@Value.Immutable
@org.immutables.gson.Gson.TypeAdapters
public interface AdapterRequest {
public Flux<String> deleteAllPolicies() {
if (this.protocolType == A1ProtocolType.SDNC_OSC_STD_V1_1) {
return getPolicyIds() //
- .flatMap(policyId -> deletePolicyById("", policyId)); //
+ .flatMap(policyId -> deletePolicyById("", policyId), CONCURRENCY_RIC); //
} else if (this.protocolType == A1ProtocolType.SDNC_OSC_OSC_V1) {
OscA1Client.UriBuilder uriBuilder = new OscA1Client.UriBuilder(ricConfig);
return getPolicyTypeIdentities() //
- .flatMapMany(Flux::fromIterable)
- .flatMap(type -> post(GET_POLICY_RPC, uriBuilder.createGetPolicyIdsUri(type), Optional.empty())) //
- .flatMap(SdncJsonHelper::parseJsonArrayOfString);
+ .flatMapMany(Flux::fromIterable) //
+ .flatMap(type -> oscDeleteInstancesForType(uriBuilder, type), CONCURRENCY_RIC);
} else {
return Flux.error(createIllegalProtocolException());
}
}
+ private Flux<String> oscGetInstancesForType(OscA1Client.UriBuilder uriBuilder, String type) {
+ return post(GET_POLICY_RPC, uriBuilder.createGetPolicyIdsUri(type), Optional.empty()) //
+ .flatMapMany(SdncJsonHelper::parseJsonArrayOfString);
+ }
+
+ private Flux<String> oscDeleteInstancesForType(OscA1Client.UriBuilder uriBuilder, String type) {
+ return oscGetInstancesForType(uriBuilder, type) //
+ .flatMap(instance -> deletePolicyById(type, instance), CONCURRENCY_RIC);
+ }
+
@Override
public Mono<A1ProtocolType> getProtocolVersion() {
return tryStdProtocolVersion() //
}
}
+ private static class RicData {
+ RicData(Ric ric, A1Client a1Client) {
+ this.ric = ric;
+ this.a1Client = a1Client;
+ }
+
+ A1Client getClient() {
+ return a1Client;
+ }
+
+ final Ric ric;
+ private final A1Client a1Client;
+ }
+
@Autowired
public RicSupervision(Rics rics, Policies policies, A1ClientFactory a1ClientFactory, PolicyTypes policyTypes,
Services services) {
return Flux.fromIterable(rics.getRics()) //
.flatMap(this::createRicData) //
.flatMap(this::checkOneRic);
-
}
private Mono<RicData> checkOneRic(RicData ricData) {
}
}
- private static class RicData {
- RicData(Ric ric, A1Client a1Client) {
- this.ric = ric;
- this.a1Client = a1Client;
- }
-
- A1Client getClient() {
- return a1Client;
- }
-
- final Ric ric;
- private final A1Client a1Client;
- }
-
private Mono<RicData> createRicData(Ric ric) {
return Mono.just(ric) //
.flatMap(aRic -> this.a1ClientFactory.createA1Client(ric)) //
}
private Mono<RicData> checkRicPolicyTypes(RicData ric) {
-
return ric.getClient().getPolicyTypeIdentities() //
.flatMap(ricTypes -> validateTypes(ricTypes, ric));
}
public class RicSynchronizationTask {
private static final Logger logger = LoggerFactory.getLogger(RicSynchronizationTask.class);
+ static final int CONCURRENCY_RIC = 1; // How may paralell requests that is sent to one NearRT RIC
private final A1ClientFactory a1ClientFactory;
private final PolicyTypes policyTypes;
.doOnNext(x -> ric.clearSupportedPolicyTypes()) //
.flatMapMany(Flux::fromIterable) //
.doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId)) //
- .flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client)) //
+ .flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client), CONCURRENCY_RIC) //
.doOnNext(ric::addSupportedPolicyType); //
}
private Flux<Policy> recreateAllPoliciesInRic(Ric ric, A1Client a1Client) {
return Flux.fromIterable(policies.getForRic(ric.name())) //
- .flatMap(policy -> putPolicy(policy, ric, a1Client));
+ .flatMap(policy -> putPolicy(policy, ric, a1Client), CONCURRENCY_RIC);
}
}