X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=policy-agent%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fpolicyagent%2Ftasks%2FRepositorySupervision.java;h=d937785014c54952a4fa172dfa116060e304eef4;hb=2de65533ba065c8a8cad97949d22d04cc3cd6ad9;hp=f75db217c5cd67ffe5f7bb6c87a3c684e079a02c;hpb=8f1c85c3604a0d10675cacd16a7b67dca346d478;p=nonrtric.git diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java index f75db217..d9377850 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java @@ -79,14 +79,20 @@ public class RepositorySupervision { synchronized (this.rics) { return Flux.fromIterable(rics.getRics()) // .flatMap(this::createRicData) // - .flatMap(this::checkRicState) // - .doOnNext(ricData -> ricData.ric.getLock().lockBlocking(LockType.EXCLUSIVE)) // - .flatMap(this::checkRicPolicies) // - .doOnNext(ricData -> ricData.ric.getLock().unlock()) // - .flatMap(this::checkRicPolicyTypes); // + .flatMap(this::checkOneRic) // + .onErrorResume(throwable -> Mono.empty()); } } + private Mono checkOneRic(RicData ricData) { + return checkRicState(ricData) // + .flatMap(x -> ricData.ric.getLock().lock(LockType.EXCLUSIVE)) // + .flatMap(x -> checkRicPolicies(ricData)) // + .flatMap(x -> ricData.ric.getLock().unlock()) // + .doOnError(throwable -> ricData.ric.getLock().unlockBlocking()) // + .flatMap(x -> checkRicPolicyTypes(ricData)); // + } + private static class RicData { RicData(Ric ric, A1Client a1Client) { this.ric = ric; @@ -105,7 +111,8 @@ public class RepositorySupervision { private Mono checkRicState(RicData ric) { if (ric.ric.getState() == RicState.UNDEFINED) { - return startSynchronization(ric); + return startSynchronization(ric) // + .onErrorResume(t -> Mono.empty()); } else if (ric.ric.getState() == RicState.SYNCHRONIZING) { return Mono.empty(); } else { @@ -115,23 +122,17 @@ public class RepositorySupervision { private Mono checkRicPolicies(RicData ric) { return ric.a1Client.getPolicyIdentities() // - .onErrorResume(t -> { - ric.ric.getLock().unlock(); - return Mono.empty(); - }) // .flatMap(ricP -> validateInstances(ricP, ric)); } private Mono validateInstances(Collection ricPolicies, RicData ric) { synchronized (this.policies) { if (ricPolicies.size() != policies.getForRic(ric.ric.name()).size()) { - ric.ric.getLock().unlock(); return startSynchronization(ric); } for (String policyId : ricPolicies) { if (!policies.containsPolicy(policyId)) { - ric.ric.getLock().unlock(); return startSynchronization(ric); } } @@ -141,7 +142,6 @@ public class RepositorySupervision { private Mono checkRicPolicyTypes(RicData ric) { return ric.a1Client.getPolicyTypeIdentities() // - .onErrorResume(notUsed -> Mono.empty()) // .flatMap(ricTypes -> validateTypes(ricTypes, ric)); } @@ -158,9 +158,9 @@ public class RepositorySupervision { } private Mono startSynchronization(RicData ric) { - RicSynchronizationTask recovery = createSynchronizationTask(); - recovery.run(ric.ric); - return Mono.empty(); + RicSynchronizationTask synchronizationTask = createSynchronizationTask(); + synchronizationTask.run(ric.ric); + return Mono.error(new Exception("Syncronization started")); } @SuppressWarnings("squid:S2629")