summary |
shortlog |
log |
commit | commitdiff |
review |
tree
raw |
patch |
inline | side by side (from parent 1:
341ee94)
Decrease concurrency when policies are recreated.
Bugfix, SdncOscA1Client deleteAllPolicies did nohing.
Change-Id: I9db04fd951defb1b033f96c115da18e7bd05ff04
Issue-ID: NONRTRIC-201
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
*/
@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
public class OscA1Client implements A1Client {
*/
@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
public class OscA1Client implements A1Client {
+ static final int CONCURRENCY_RIC = 1; // How may paralell requests that is sent to one NearRT RIC
public static class UriBuilder implements A1UriBuilder {
private final RicConfig ricConfig;
public static class UriBuilder implements A1UriBuilder {
private final RicConfig ricConfig;
@Override
public Flux<String> deleteAllPolicies() {
return getPolicyTypeIds() //
@Override
public Flux<String> deleteAllPolicies() {
return getPolicyTypeIds() //
- .flatMap(this::deletePoliciesForType);
+ .flatMap(this::deletePoliciesForType, CONCURRENCY_RIC);
private Flux<String> deletePoliciesForType(String typeId) {
return getPolicyIdentitiesByType(typeId) //
private Flux<String> deletePoliciesForType(String typeId) {
return getPolicyIdentitiesByType(typeId) //
- .flatMap(policyId -> deletePolicyById(typeId, policyId));
+ .flatMap(policyId -> deletePolicyById(typeId, policyId), CONCURRENCY_RIC);
@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
public class SdncOscA1Client implements A1Client {
@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
public class SdncOscA1Client implements A1Client {
+ static final int CONCURRENCY_RIC = 1; // How may paralell requests that is sent to one NearRT RIC
+
@Value.Immutable
@org.immutables.gson.Gson.TypeAdapters
public interface AdapterRequest {
@Value.Immutable
@org.immutables.gson.Gson.TypeAdapters
public interface AdapterRequest {
public Flux<String> deleteAllPolicies() {
if (this.protocolType == A1ProtocolType.SDNC_OSC_STD_V1_1) {
return getPolicyIds() //
public Flux<String> deleteAllPolicies() {
if (this.protocolType == A1ProtocolType.SDNC_OSC_STD_V1_1) {
return getPolicyIds() //
- .flatMap(policyId -> deletePolicyById("", policyId)); //
+ .flatMap(policyId -> deletePolicyById("", policyId), CONCURRENCY_RIC); //
} else if (this.protocolType == A1ProtocolType.SDNC_OSC_OSC_V1) {
OscA1Client.UriBuilder uriBuilder = new OscA1Client.UriBuilder(ricConfig);
return getPolicyTypeIdentities() //
} else if (this.protocolType == A1ProtocolType.SDNC_OSC_OSC_V1) {
OscA1Client.UriBuilder uriBuilder = new OscA1Client.UriBuilder(ricConfig);
return getPolicyTypeIdentities() //
- .flatMapMany(Flux::fromIterable)
- .flatMap(type -> post(GET_POLICY_RPC, uriBuilder.createGetPolicyIdsUri(type), Optional.empty())) //
- .flatMap(SdncJsonHelper::parseJsonArrayOfString);
+ .flatMapMany(Flux::fromIterable) //
+ .flatMap(type -> oscDeleteInstancesForType(uriBuilder, type), CONCURRENCY_RIC);
} else {
return Flux.error(createIllegalProtocolException());
}
}
} else {
return Flux.error(createIllegalProtocolException());
}
}
+ private Flux<String> oscGetInstancesForType(OscA1Client.UriBuilder uriBuilder, String type) {
+ return post(GET_POLICY_RPC, uriBuilder.createGetPolicyIdsUri(type), Optional.empty()) //
+ .flatMapMany(SdncJsonHelper::parseJsonArrayOfString);
+ }
+
+ private Flux<String> oscDeleteInstancesForType(OscA1Client.UriBuilder uriBuilder, String type) {
+ return oscGetInstancesForType(uriBuilder, type) //
+ .flatMap(instance -> deletePolicyById(type, instance), CONCURRENCY_RIC);
+ }
+
@Override
public Mono<A1ProtocolType> getProtocolVersion() {
return tryStdProtocolVersion() //
@Override
public Mono<A1ProtocolType> getProtocolVersion() {
return tryStdProtocolVersion() //
+ private static class RicData {
+ RicData(Ric ric, A1Client a1Client) {
+ this.ric = ric;
+ this.a1Client = a1Client;
+ }
+
+ A1Client getClient() {
+ return a1Client;
+ }
+
+ final Ric ric;
+ private final A1Client a1Client;
+ }
+
@Autowired
public RicSupervision(Rics rics, Policies policies, A1ClientFactory a1ClientFactory, PolicyTypes policyTypes,
Services services) {
@Autowired
public RicSupervision(Rics rics, Policies policies, A1ClientFactory a1ClientFactory, PolicyTypes policyTypes,
Services services) {
return Flux.fromIterable(rics.getRics()) //
.flatMap(this::createRicData) //
.flatMap(this::checkOneRic);
return Flux.fromIterable(rics.getRics()) //
.flatMap(this::createRicData) //
.flatMap(this::checkOneRic);
}
private Mono<RicData> checkOneRic(RicData ricData) {
}
private Mono<RicData> checkOneRic(RicData ricData) {
- private static class RicData {
- RicData(Ric ric, A1Client a1Client) {
- this.ric = ric;
- this.a1Client = a1Client;
- }
-
- A1Client getClient() {
- return a1Client;
- }
-
- final Ric ric;
- private final A1Client a1Client;
- }
-
private Mono<RicData> createRicData(Ric ric) {
return Mono.just(ric) //
.flatMap(aRic -> this.a1ClientFactory.createA1Client(ric)) //
private Mono<RicData> createRicData(Ric ric) {
return Mono.just(ric) //
.flatMap(aRic -> this.a1ClientFactory.createA1Client(ric)) //
}
private Mono<RicData> checkRicPolicyTypes(RicData ric) {
}
private Mono<RicData> checkRicPolicyTypes(RicData ric) {
return ric.getClient().getPolicyTypeIdentities() //
.flatMap(ricTypes -> validateTypes(ricTypes, ric));
}
return ric.getClient().getPolicyTypeIdentities() //
.flatMap(ricTypes -> validateTypes(ricTypes, ric));
}
public class RicSynchronizationTask {
private static final Logger logger = LoggerFactory.getLogger(RicSynchronizationTask.class);
public class RicSynchronizationTask {
private static final Logger logger = LoggerFactory.getLogger(RicSynchronizationTask.class);
+ static final int CONCURRENCY_RIC = 1; // How may paralell requests that is sent to one NearRT RIC
private final A1ClientFactory a1ClientFactory;
private final PolicyTypes policyTypes;
private final A1ClientFactory a1ClientFactory;
private final PolicyTypes policyTypes;
.doOnNext(x -> ric.clearSupportedPolicyTypes()) //
.flatMapMany(Flux::fromIterable) //
.doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId)) //
.doOnNext(x -> ric.clearSupportedPolicyTypes()) //
.flatMapMany(Flux::fromIterable) //
.doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId)) //
- .flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client)) //
+ .flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client), CONCURRENCY_RIC) //
.doOnNext(ric::addSupportedPolicyType); //
}
.doOnNext(ric::addSupportedPolicyType); //
}
private Flux<Policy> recreateAllPoliciesInRic(Ric ric, A1Client a1Client) {
return Flux.fromIterable(policies.getForRic(ric.name())) //
private Flux<Policy> recreateAllPoliciesInRic(Ric ric, A1Client a1Client) {
return Flux.fromIterable(policies.getForRic(ric.name())) //
- .flatMap(policy -> putPolicy(policy, ric, a1Client));
+ .flatMap(policy -> putPolicy(policy, ric, a1Client), CONCURRENCY_RIC);