X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=policy-agent%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fpolicyagent%2Ftasks%2FRepositorySupervision.java;h=ce318dd7697afaad77c2f089c3d5873f5691d475;hb=28b508e5df22fd468d18769449710bd0764a778d;hp=7984a62a688dae0feb14ea728e209e3912208304;hpb=50e6a619a2f568f949c02dcd8d6656218d422a93;p=nonrtric.git diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java index 7984a62a..ce318dd7 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java @@ -2,7 +2,7 @@ * ========================LICENSE_START================================= * O-RAN-SC * %% - * Copyright (C) 2019 Nordix Foundation + * Copyright (C) 2020 Nordix Foundation * %% * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,11 +20,16 @@ package org.oransc.policyagent.tasks; +import java.util.Collection; + import org.oransc.policyagent.clients.A1Client; +import org.oransc.policyagent.clients.A1ClientFactory; import org.oransc.policyagent.repository.Policies; +import org.oransc.policyagent.repository.PolicyTypes; import org.oransc.policyagent.repository.Ric; import org.oransc.policyagent.repository.Ric.RicState; import org.oransc.policyagent.repository.Rics; +import org.oransc.policyagent.repository.Services; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -36,7 +41,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** - * Regularly checks the exisiting rics towards the local repository to keep it consistent. + * Regularly checks the existing rics towards the local repository to keep it consistent. */ @Component @EnableScheduling @@ -45,76 +50,119 @@ public class RepositorySupervision { private final Rics rics; private final Policies policies; - private final A1Client a1Client; + private final PolicyTypes policyTypes; + private final A1ClientFactory a1ClientFactory; + private final Services services; @Autowired - public RepositorySupervision(Rics rics, Policies policies, A1Client a1Client) { + public RepositorySupervision(Rics rics, Policies policies, A1ClientFactory a1ClientFactory, PolicyTypes policyTypes, + Services services) { this.rics = rics; this.policies = policies; - this.a1Client = a1Client; + this.a1ClientFactory = a1ClientFactory; + this.policyTypes = policyTypes; + this.services = services; } /** - * Regularly contacts all Rics to check if they are alive. + * Regularly contacts all Rics to check if they are alive and synchronized. */ @Scheduled(fixedRate = 1000 * 60) public void checkAllRics() { logger.debug("Checking Rics starting"); - createTask().subscribe(this::onRicChecked, this::onError, this::onComplete); - + createTask().subscribe(this::onRicChecked, null, this::onComplete); } - private Flux createTask() { - return Flux.fromIterable(rics.getRics()) // - .groupBy(ric -> ric.state()) // - .flatMap(fluxGroup -> handleGroup(fluxGroup.key(), fluxGroup)); + private Flux createTask() { + synchronized (this.rics) { + return Flux.fromIterable(rics.getRics()) // + .flatMap(this::createRicData) // + .flatMap(this::checkRicState) // + .flatMap(this::checkRicPolicies) // + .flatMap(this::checkRicPolicyTypes); + } } - private Flux handleGroup(Ric.RicState key, Flux fluxGroup) { - logger.debug("Handling group {}", key); - switch (key) { - case ACTIVE: - return fluxGroup.flatMap(this::checkActive); + private static class RicData { + RicData(Ric ric, A1Client a1Client) { + this.ric = ric; + this.a1Client = a1Client; + } + + final Ric ric; + final A1Client a1Client; + } - case NOT_REACHABLE: - return fluxGroup.flatMap(this::checkNotReachable); + private Mono createRicData(Ric ric) { + return Mono.just(ric) // + .flatMap(aRic -> this.a1ClientFactory.createA1Client(ric)) // + .flatMap(a1Client -> Mono.just(new RicData(ric, a1Client))); + } - default: - // If not initiated, leave it to the StartupService. - return Flux.empty(); + private Mono checkRicState(RicData ric) { + if (ric.ric.getState() == RicState.UNDEFINED) { + return startSynchronization(ric); + } else if (ric.ric.getState() == RicState.SYNCHRONIZING) { + return Mono.empty(); + } else { + return Mono.just(ric); } } - private Mono checkActive(Ric ric) { - logger.debug("Handling active ric {}", ric); - a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) // - .filter(policyId -> !policies.containsPolicy(policyId)) // - .doOnNext(policyId -> logger.debug("Deleting policy {}", policyId)) - .flatMap(policyId -> a1Client.deletePolicy(ric.getConfig().baseUrl(), policyId)) // - .subscribe(); + private Mono checkRicPolicies(RicData ric) { + return ric.a1Client.getPolicyIdentities() // + .onErrorResume(t -> Mono.empty()) // + .flatMap(ricP -> validateInstances(ricP, ric)); + } + + private Mono validateInstances(Collection ricPolicies, RicData ric) { + synchronized (this.policies) { + if (ricPolicies.size() != policies.getForRic(ric.ric.name()).size()) { + return startSynchronization(ric); + } + } + for (String policyId : ricPolicies) { + if (!policies.containsPolicy(policyId)) { + return startSynchronization(ric); + } + } return Mono.just(ric); } - private Mono checkNotReachable(Ric ric) { - logger.debug("Handling not reachable ric {}", ric); - a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) // - .filter(policyId -> !policies.containsPolicy(policyId)) // - .doOnNext(policyId -> logger.debug("Deleting policy {}", policyId)) - .flatMap(policyId -> a1Client.deletePolicy(ric.getConfig().baseUrl(), policyId)) // - .subscribe(null, null, () -> ric.setState(RicState.ACTIVE)); + private Mono checkRicPolicyTypes(RicData ric) { + return ric.a1Client.getPolicyTypeIdentities() // + .onErrorResume(notUsed -> Mono.empty()) // + .flatMap(ricTypes -> validateTypes(ricTypes, ric)); + } + + private Mono validateTypes(Collection ricTypes, RicData ric) { + if (ricTypes.size() != ric.ric.getSupportedPolicyTypes().size()) { + return startSynchronization(ric); + } + for (String typeName : ricTypes) { + if (!ric.ric.isSupportingType(typeName)) { + return startSynchronization(ric); + } + } return Mono.just(ric); } - private void onRicChecked(Ric ric) { - logger.info("Ric: " + ric.name() + " checked"); + private Mono startSynchronization(RicData ric) { + RicSynchronizationTask recovery = createSynchronizationTask(); + recovery.run(ric.ric); + return Mono.empty(); } - private void onError(Throwable t) { - logger.error("Rics supervision failed", t); + @SuppressWarnings("squid:S2629") + private void onRicChecked(RicData ric) { + logger.debug("Ric: {} checked", ric.ric.name()); } private void onComplete() { logger.debug("Checking Rics completed"); } -} + RicSynchronizationTask createSynchronizationTask() { + return new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services); + } +} \ No newline at end of file