X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=policy-agent%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fpolicyagent%2Ftasks%2FRepositorySupervision.java;h=22905f6ce044aaabc46fe91fabaacc347230cd1d;hb=5e8fe3d5256872bda413aabb66cf217e6ef30cef;hp=667a70101286e77f48f7d1782e652c14804bd261;hpb=b66dcce5210e25b2571036becb6f0e7b0c23e1b2;p=nonrtric.git diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java index 667a7010..22905f6c 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java @@ -2,7 +2,7 @@ * ========================LICENSE_START================================= * O-RAN-SC * %% - * Copyright (C) 2019 Nordix Foundation + * Copyright (C) 2020 Nordix Foundation * %% * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,7 @@ import java.util.Collection; import org.oransc.policyagent.clients.A1Client; import org.oransc.policyagent.clients.A1ClientFactory; +import org.oransc.policyagent.repository.Lock.LockType; import org.oransc.policyagent.repository.Policies; import org.oransc.policyagent.repository.PolicyTypes; import org.oransc.policyagent.repository.Ric; @@ -41,7 +42,8 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** - * Regularly checks the existing rics towards the local repository to keep it consistent. + * Regularly checks the existing rics towards the local repository to keep it + * consistent. */ @Component @EnableScheduling @@ -77,12 +79,20 @@ public class RepositorySupervision { synchronized (this.rics) { return Flux.fromIterable(rics.getRics()) // .flatMap(this::createRicData) // - .flatMap(this::checkRicState) // - .flatMap(this::checkRicPolicies) // - .flatMap(this::checkRicPolicyTypes); + .flatMap(this::checkOneRic) // + .onErrorResume(throwable -> Mono.empty()); } } + private Mono checkOneRic(RicData ricData) { + return checkRicState(ricData) // + .flatMap(x -> ricData.ric.getLock().lock(LockType.EXCLUSIVE)) // + .flatMap(x -> checkRicPolicies(ricData)) // + .flatMap(x -> ricData.ric.getLock().unlock()) // + .doOnError(throwable -> ricData.ric.getLock().unlockBlocking()) // + .flatMap(x -> checkRicPolicyTypes(ricData)); // + } + private static class RicData { RicData(Ric ric, A1Client a1Client) { this.ric = ric; @@ -101,7 +111,8 @@ public class RepositorySupervision { private Mono checkRicState(RicData ric) { if (ric.ric.getState() == RicState.UNDEFINED) { - return startSynchronization(ric); + return startSynchronization(ric) // + .onErrorResume(t -> Mono.empty()); } else if (ric.ric.getState() == RicState.SYNCHRONIZING) { return Mono.empty(); } else { @@ -111,7 +122,6 @@ public class RepositorySupervision { private Mono checkRicPolicies(RicData ric) { return ric.a1Client.getPolicyIdentities() // - .onErrorResume(t -> Mono.empty()) // .flatMap(ricP -> validateInstances(ricP, ric)); } @@ -120,18 +130,18 @@ public class RepositorySupervision { if (ricPolicies.size() != policies.getForRic(ric.ric.name()).size()) { return startSynchronization(ric); } - } - for (String policyId : ricPolicies) { - if (!policies.containsPolicy(policyId)) { - return startSynchronization(ric); + + for (String policyId : ricPolicies) { + if (!policies.containsPolicy(policyId)) { + return startSynchronization(ric); + } } + return Mono.just(ric); } - return Mono.just(ric); } private Mono checkRicPolicyTypes(RicData ric) { return ric.a1Client.getPolicyTypeIdentities() // - .onErrorResume(notUsed -> Mono.empty()) // .flatMap(ricTypes -> validateTypes(ricTypes, ric)); } @@ -148,12 +158,12 @@ public class RepositorySupervision { } private Mono startSynchronization(RicData ric) { - RicSynchronizationTask recovery = createSynchronizationTask(); - recovery.run(ric.ric); - return Mono.empty(); + RicSynchronizationTask synchronizationTask = createSynchronizationTask(); + synchronizationTask.run(ric.ric); + return Mono.error(new Exception("Syncronization started")); } - @SuppressWarnings("squid:S2629") + @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally private void onRicChecked(RicData ric) { logger.debug("Ric: {} checked", ric.ric.name()); } @@ -165,4 +175,4 @@ public class RepositorySupervision { RicSynchronizationTask createSynchronizationTask() { return new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services); } -} \ No newline at end of file +}