X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=policy-agent%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fpolicyagent%2Ftasks%2FRicSynchronizationTask.java;h=6ae55c46a771ff162f603800e65b2ff733edb53a;hb=refs%2Fchanges%2F95%2F4495%2F1;hp=e985fd3904150a249adb120f756a3b76dde38dd7;hpb=b744f6266f838dbf60f37b1c8d5367632f1f0faa;p=nonrtric.git diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java index e985fd39..6ae55c46 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java @@ -58,6 +58,7 @@ import reactor.core.publisher.SignalType; public class RicSynchronizationTask { private static final Logger logger = LoggerFactory.getLogger(RicSynchronizationTask.class); + static final int CONCURRENCY_RIC = 1; // How may paralell requests that is sent to one NearRT RIC private final A1ClientFactory a1ClientFactory; private final PolicyTypes policyTypes; @@ -89,7 +90,7 @@ public class RicSynchronizationTask { @Override protected void hookOnError(Throwable throwable) { logger.warn("Synchronization failure for ric: {}, reason: {}", ric.name(), throwable.getMessage()); - ric.setState(RicState.UNDEFINED); + ric.setState(RicState.UNAVAILABLE); } @Override @@ -126,14 +127,14 @@ public class RicSynchronizationTask { private void onSynchronizationComplete(Ric ric) { logger.debug("Synchronization completed for: {}", ric.name()); - ric.setState(RicState.IDLE); + ric.setState(RicState.AVAILABLE); notifyAllServices("Synchronization completed for:" + ric.name()); } private void notifyAllServices(String body) { for (Service service : services.getAll()) { String url = service.getCallbackUrl(); - if (service.getCallbackUrl().length() > 0) { + if (url.length() > 0) { createNotificationClient(url) // .put("", body) // .subscribe( // @@ -146,7 +147,7 @@ public class RicSynchronizationTask { } private Flux deleteAllPolicyInstances(Ric ric, Throwable t) { - logger.warn("Recreation of policies failed for ric: {}, reason: {}", ric.name(), t.getMessage()); + logger.debug("Recreation of policies failed for ric: {}, reason: {}", ric.name(), t.getMessage()); deleteAllPoliciesInRepository(ric); Flux synchronizedTypes = this.a1ClientFactory.createA1Client(ric) // @@ -159,7 +160,7 @@ public class RicSynchronizationTask { } AsyncRestClient createNotificationClient(final String url) { - return new AsyncRestClient(url); + return new AsyncRestClient(url, this.a1ClientFactory.getAppConfig().getWebClientConfig()); } private Flux synchronizePolicyTypes(Ric ric, A1Client a1Client) { @@ -167,7 +168,7 @@ public class RicSynchronizationTask { .doOnNext(x -> ric.clearSupportedPolicyTypes()) // .flatMapMany(Flux::fromIterable) // .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId)) // - .flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client)) // + .flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client), CONCURRENCY_RIC) // .doOnNext(ric::addSupportedPolicyType); // } @@ -197,9 +198,17 @@ public class RicSynchronizationTask { .flatMapMany(notUsed -> Flux.just(policy)); } + private boolean checkTransient(Policy policy) { + if (policy.isTransient()) { + this.policies.remove(policy); + } + return policy.isTransient(); + } + private Flux recreateAllPoliciesInRic(Ric ric, A1Client a1Client) { return Flux.fromIterable(policies.getForRic(ric.name())) // - .flatMap(policy -> putPolicy(policy, ric, a1Client)); + .filter(policy -> !checkTransient(policy)) // + .flatMap(policy -> putPolicy(policy, ric, a1Client), CONCURRENCY_RIC); } }