X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=policy-agent%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fpolicyagent%2Ftasks%2FRepositorySupervision.java;h=9ac9d705ad670f95415cdf871cbf482744315f6d;hb=b47a7130c10bef2bf812366ca971e4eaa938b152;hp=d68c123642d802e051e80c26d1ab17beb91b48c5;hpb=9ed2de339dc86499b5497cfb905fbcc8974d7e16;p=nonrtric.git diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java index d68c1236..9ac9d705 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java @@ -20,17 +20,20 @@ package org.oransc.policyagent.tasks; +import java.util.Collection; + import org.oransc.policyagent.clients.A1Client; import org.oransc.policyagent.repository.Policies; +import org.oransc.policyagent.repository.PolicyTypes; import org.oransc.policyagent.repository.Ric; import org.oransc.policyagent.repository.Rics; -import org.oransc.policyagent.repository.Ric.RicState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -44,13 +47,15 @@ public class RepositorySupervision { private final Rics rics; private final Policies policies; + private final PolicyTypes policyTypes; private final A1Client a1Client; @Autowired - public RepositorySupervision(Rics rics, Policies policies, A1Client a1Client) { + public RepositorySupervision(Rics rics, Policies policies, A1Client a1Client, PolicyTypes policyTypes) { this.rics = rics; this.policies = policies; this.a1Client = a1Client; + this.policyTypes = policyTypes; } /** @@ -65,45 +70,59 @@ public class RepositorySupervision { private Flux createTask() { return Flux.fromIterable(rics.getRics()) // - .groupBy(ric -> ric.state()) // - .flatMap(fluxGroup -> handleGroup(fluxGroup.key(), fluxGroup)); + .flatMap(ric -> checkInstances(ric)) // + .flatMap(ric -> checkTypes(ric)); } - private Flux handleGroup(Ric.RicState key, Flux fluxGroup) { - logger.debug("Handling group {}", key); - switch (key) { - case ACTIVE: - return fluxGroup.flatMap(this::checkActive); + private Mono checkInstances(Ric ric) { - case NOT_REACHABLE: - return fluxGroup.flatMap(this::checkNotReachable); + return a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) // + .onErrorResume(t -> Mono.empty()) // + .flatMap(ricP -> validateInstances(ricP, ric)); + } - default: - // If not initiated, leave it to the StartupService. - return Flux.empty(); - } + private Flux junk() { + return Flux.empty(); } - private Mono checkActive(Ric ric) { - logger.debug("Handling active ric {}", ric); - a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) // - .filter(policyId -> !policies.containsPolicy(policyId)) // - .doOnNext(policyId -> logger.debug("Deleting policy {}", policyId)) - .flatMap(policyId -> a1Client.deletePolicy(ric.getConfig().baseUrl(), policyId)) // - .subscribe(); + private Mono validateInstances(Collection ricPolicies, Ric ric) { + if (ricPolicies.size() != policies.getForRic(ric.name()).size()) { + return startRecovery(ric); + } + for (String policyId : ricPolicies) { + if (!policies.containsPolicy(policyId)) { + return startRecovery(ric); + } + } return Mono.just(ric); } - private Mono checkNotReachable(Ric ric) { - logger.debug("Handling not reachable ric {}", ric); - a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) // - .filter(policyId -> !policies.containsPolicy(policyId)) // - .doOnNext(policyId -> logger.debug("Deleting policy {}", policyId)) - .flatMap(policyId -> a1Client.deletePolicy(ric.getConfig().baseUrl(), policyId)) // - .subscribe(null, null, () -> ric.setState(RicState.ACTIVE)); + private Mono checkTypes(Ric ric) { + return a1Client.getPolicyTypeIdentities(ric.getConfig().baseUrl()) // + .onErrorResume(t -> { + return Mono.empty(); + }) // + .flatMap(ricTypes -> validateTypes(ricTypes, ric)); + } + + private Mono validateTypes(Collection ricTypes, Ric ric) { + if (ricTypes.size() != ric.getSupportedPolicyTypes().size()) { + return startRecovery(ric); + } + for (String typeName : ricTypes) { + if (!ric.isSupportingType(typeName)) { + return startRecovery(ric); + } + } return Mono.just(ric); } + private Mono startRecovery(Ric ric) { + RicRecoveryTask recovery = new RicRecoveryTask(a1Client, policyTypes, policies); + recovery.run(ric); + return Mono.empty(); + } + private void onRicChecked(Ric ric) { logger.info("Ric: " + ric.name() + " checked"); }