X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=policy-agent%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fpolicyagent%2Ftasks%2FRepositorySupervision.java;h=ce318dd7697afaad77c2f089c3d5873f5691d475;hb=28b508e5df22fd468d18769449710bd0764a778d;hp=022ca0f60196dea57449ba0960a53ad231bd6db8;hpb=95db19e2820102db0255ca57407faa333cbb4085;p=nonrtric.git diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java index 022ca0f6..ce318dd7 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java @@ -2,7 +2,7 @@ * ========================LICENSE_START================================= * O-RAN-SC * %% - * Copyright (C) 2019 Nordix Foundation + * Copyright (C) 2020 Nordix Foundation * %% * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -41,7 +41,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** - * Regularly checks the exisiting rics towards the local repository to keep it consistent. + * Regularly checks the existing rics towards the local repository to keep it consistent. */ @Component @EnableScheduling @@ -65,21 +65,21 @@ public class RepositorySupervision { } /** - * Regularly contacts all Rics to check if they are alive. + * Regularly contacts all Rics to check if they are alive and synchronized. */ @Scheduled(fixedRate = 1000 * 60) public void checkAllRics() { logger.debug("Checking Rics starting"); - createTask().subscribe(this::onRicChecked, this::onError, this::onComplete); + createTask().subscribe(this::onRicChecked, null, this::onComplete); } private Flux createTask() { synchronized (this.rics) { return Flux.fromIterable(rics.getRics()) // - .flatMap(ric -> createRicData(ric)) // - .flatMap(ricData -> checkRicState(ricData)) // - .flatMap(ricData -> checkRicPolicies(ricData)) // - .flatMap(ricData -> checkRicPolicyTypes(ricData)); + .flatMap(this::createRicData) // + .flatMap(this::checkRicState) // + .flatMap(this::checkRicPolicies) // + .flatMap(this::checkRicPolicyTypes); } } @@ -100,9 +100,9 @@ public class RepositorySupervision { } private Mono checkRicState(RicData ric) { - if (ric.ric.state() == RicState.UNDEFINED) { - return startRecovery(ric); - } else if (ric.ric.state() == RicState.RECOVERING) { + if (ric.ric.getState() == RicState.UNDEFINED) { + return startSynchronization(ric); + } else if (ric.ric.getState() == RicState.SYNCHRONIZING) { return Mono.empty(); } else { return Mono.just(ric); @@ -118,12 +118,12 @@ public class RepositorySupervision { private Mono validateInstances(Collection ricPolicies, RicData ric) { synchronized (this.policies) { if (ricPolicies.size() != policies.getForRic(ric.ric.name()).size()) { - return startRecovery(ric); + return startSynchronization(ric); } } for (String policyId : ricPolicies) { if (!policies.containsPolicy(policyId)) { - return startRecovery(ric); + return startSynchronization(ric); } } return Mono.just(ric); @@ -131,40 +131,38 @@ public class RepositorySupervision { private Mono checkRicPolicyTypes(RicData ric) { return ric.a1Client.getPolicyTypeIdentities() // - .onErrorResume(t -> { - return Mono.empty(); - }) // + .onErrorResume(notUsed -> Mono.empty()) // .flatMap(ricTypes -> validateTypes(ricTypes, ric)); } private Mono validateTypes(Collection ricTypes, RicData ric) { if (ricTypes.size() != ric.ric.getSupportedPolicyTypes().size()) { - return startRecovery(ric); + return startSynchronization(ric); } for (String typeName : ricTypes) { if (!ric.ric.isSupportingType(typeName)) { - return startRecovery(ric); + return startSynchronization(ric); } } return Mono.just(ric); } - private Mono startRecovery(RicData ric) { - RicRecoveryTask recovery = new RicRecoveryTask(a1ClientFactory, policyTypes, policies, services); + private Mono startSynchronization(RicData ric) { + RicSynchronizationTask recovery = createSynchronizationTask(); recovery.run(ric.ric); return Mono.empty(); } + @SuppressWarnings("squid:S2629") private void onRicChecked(RicData ric) { - logger.info("Ric: " + ric.ric.name() + " checked"); - } - - private void onError(Throwable t) { - logger.error("Rics supervision failed", t); + logger.debug("Ric: {} checked", ric.ric.name()); } private void onComplete() { logger.debug("Checking Rics completed"); } -} + RicSynchronizationTask createSynchronizationTask() { + return new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services); + } +} \ No newline at end of file