X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=policy-agent%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fpolicyagent%2Ftasks%2FRicSynchronizationTask.java;h=6ae55c46a771ff162f603800e65b2ff733edb53a;hb=refs%2Fchanges%2F95%2F4495%2F1;hp=ae91d4b258bb35643a6fa1f8569d29591c78fd17;hpb=ee5dde3cccb64e50c02f22a5731ba0134e0d761c;p=nonrtric.git diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java index ae91d4b2..6ae55c46 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java @@ -58,6 +58,7 @@ import reactor.core.publisher.SignalType; public class RicSynchronizationTask { private static final Logger logger = LoggerFactory.getLogger(RicSynchronizationTask.class); + static final int CONCURRENCY_RIC = 1; // How may paralell requests that is sent to one NearRT RIC private final A1ClientFactory a1ClientFactory; private final PolicyTypes policyTypes; @@ -133,7 +134,7 @@ public class RicSynchronizationTask { private void notifyAllServices(String body) { for (Service service : services.getAll()) { String url = service.getCallbackUrl(); - if (service.getCallbackUrl().length() > 0) { + if (url.length() > 0) { createNotificationClient(url) // .put("", body) // .subscribe( // @@ -159,7 +160,7 @@ public class RicSynchronizationTask { } AsyncRestClient createNotificationClient(final String url) { - return new AsyncRestClient(url); + return new AsyncRestClient(url, this.a1ClientFactory.getAppConfig().getWebClientConfig()); } private Flux synchronizePolicyTypes(Ric ric, A1Client a1Client) { @@ -167,7 +168,7 @@ public class RicSynchronizationTask { .doOnNext(x -> ric.clearSupportedPolicyTypes()) // .flatMapMany(Flux::fromIterable) // .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId)) // - .flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client)) // + .flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client), CONCURRENCY_RIC) // .doOnNext(ric::addSupportedPolicyType); // } @@ -197,9 +198,17 @@ public class RicSynchronizationTask { .flatMapMany(notUsed -> Flux.just(policy)); } + private boolean checkTransient(Policy policy) { + if (policy.isTransient()) { + this.policies.remove(policy); + } + return policy.isTransient(); + } + private Flux recreateAllPoliciesInRic(Ric ric, A1Client a1Client) { return Flux.fromIterable(policies.getForRic(ric.name())) // - .flatMap(policy -> putPolicy(policy, ric, a1Client)); + .filter(policy -> !checkTransient(policy)) // + .flatMap(policy -> putPolicy(policy, ric, a1Client), CONCURRENCY_RIC); } }