X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=policy-agent%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fpolicyagent%2Ftasks%2FRepositorySupervision.java;h=022ca0f60196dea57449ba0960a53ad231bd6db8;hb=f3461cb776023b950d62edd25eca148b6d354c9c;hp=9ac9d705ad670f95415cdf871cbf482744315f6d;hpb=b47a7130c10bef2bf812366ca971e4eaa938b152;p=nonrtric.git diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java index 9ac9d705..022ca0f6 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java @@ -23,10 +23,13 @@ package org.oransc.policyagent.tasks; import java.util.Collection; import org.oransc.policyagent.clients.A1Client; +import org.oransc.policyagent.clients.A1ClientFactory; import org.oransc.policyagent.repository.Policies; import org.oransc.policyagent.repository.PolicyTypes; import org.oransc.policyagent.repository.Ric; +import org.oransc.policyagent.repository.Ric.RicState; import org.oransc.policyagent.repository.Rics; +import org.oransc.policyagent.repository.Services; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -48,14 +51,17 @@ public class RepositorySupervision { private final Rics rics; private final Policies policies; private final PolicyTypes policyTypes; - private final A1Client a1Client; + private final A1ClientFactory a1ClientFactory; + private final Services services; @Autowired - public RepositorySupervision(Rics rics, Policies policies, A1Client a1Client, PolicyTypes policyTypes) { + public RepositorySupervision(Rics rics, Policies policies, A1ClientFactory a1ClientFactory, PolicyTypes policyTypes, + Services services) { this.rics = rics; this.policies = policies; - this.a1Client = a1Client; + this.a1ClientFactory = a1ClientFactory; this.policyTypes = policyTypes; + this.services = services; } /** @@ -65,29 +71,55 @@ public class RepositorySupervision { public void checkAllRics() { logger.debug("Checking Rics starting"); createTask().subscribe(this::onRicChecked, this::onError, this::onComplete); - } - private Flux createTask() { - return Flux.fromIterable(rics.getRics()) // - .flatMap(ric -> checkInstances(ric)) // - .flatMap(ric -> checkTypes(ric)); + private Flux createTask() { + synchronized (this.rics) { + return Flux.fromIterable(rics.getRics()) // + .flatMap(ric -> createRicData(ric)) // + .flatMap(ricData -> checkRicState(ricData)) // + .flatMap(ricData -> checkRicPolicies(ricData)) // + .flatMap(ricData -> checkRicPolicyTypes(ricData)); + } } - private Mono checkInstances(Ric ric) { + private static class RicData { + RicData(Ric ric, A1Client a1Client) { + this.ric = ric; + this.a1Client = a1Client; + } - return a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) // - .onErrorResume(t -> Mono.empty()) // - .flatMap(ricP -> validateInstances(ricP, ric)); + final Ric ric; + final A1Client a1Client; } - private Flux junk() { - return Flux.empty(); + private Mono createRicData(Ric ric) { + return Mono.just(ric) // + .flatMap(aRic -> this.a1ClientFactory.createA1Client(ric)) // + .flatMap(a1Client -> Mono.just(new RicData(ric, a1Client))); } - private Mono validateInstances(Collection ricPolicies, Ric ric) { - if (ricPolicies.size() != policies.getForRic(ric.name()).size()) { + private Mono checkRicState(RicData ric) { + if (ric.ric.state() == RicState.UNDEFINED) { return startRecovery(ric); + } else if (ric.ric.state() == RicState.RECOVERING) { + return Mono.empty(); + } else { + return Mono.just(ric); + } + } + + private Mono checkRicPolicies(RicData ric) { + return ric.a1Client.getPolicyIdentities() // + .onErrorResume(t -> Mono.empty()) // + .flatMap(ricP -> validateInstances(ricP, ric)); + } + + private Mono validateInstances(Collection ricPolicies, RicData ric) { + synchronized (this.policies) { + if (ricPolicies.size() != policies.getForRic(ric.ric.name()).size()) { + return startRecovery(ric); + } } for (String policyId : ricPolicies) { if (!policies.containsPolicy(policyId)) { @@ -97,34 +129,34 @@ public class RepositorySupervision { return Mono.just(ric); } - private Mono checkTypes(Ric ric) { - return a1Client.getPolicyTypeIdentities(ric.getConfig().baseUrl()) // + private Mono checkRicPolicyTypes(RicData ric) { + return ric.a1Client.getPolicyTypeIdentities() // .onErrorResume(t -> { return Mono.empty(); }) // .flatMap(ricTypes -> validateTypes(ricTypes, ric)); } - private Mono validateTypes(Collection ricTypes, Ric ric) { - if (ricTypes.size() != ric.getSupportedPolicyTypes().size()) { + private Mono validateTypes(Collection ricTypes, RicData ric) { + if (ricTypes.size() != ric.ric.getSupportedPolicyTypes().size()) { return startRecovery(ric); } for (String typeName : ricTypes) { - if (!ric.isSupportingType(typeName)) { + if (!ric.ric.isSupportingType(typeName)) { return startRecovery(ric); } } return Mono.just(ric); } - private Mono startRecovery(Ric ric) { - RicRecoveryTask recovery = new RicRecoveryTask(a1Client, policyTypes, policies); - recovery.run(ric); + private Mono startRecovery(RicData ric) { + RicRecoveryTask recovery = new RicRecoveryTask(a1ClientFactory, policyTypes, policies, services); + recovery.run(ric.ric); return Mono.empty(); } - private void onRicChecked(Ric ric) { - logger.info("Ric: " + ric.name() + " checked"); + private void onRicChecked(RicData ric) { + logger.info("Ric: " + ric.ric.name() + " checked"); } private void onError(Throwable t) {