* ========================LICENSE_START=================================
* O-RAN-SC
* %%
- * Copyright (C) 2019 Nordix Foundation
+ * Copyright (C) 2020 Nordix Foundation
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import org.oransc.policyagent.clients.A1Client;
import org.oransc.policyagent.clients.A1ClientFactory;
+import org.oransc.policyagent.repository.Lock.LockType;
import org.oransc.policyagent.repository.Policies;
import org.oransc.policyagent.repository.PolicyTypes;
import org.oransc.policyagent.repository.Ric;
import reactor.core.publisher.Mono;
/**
- * Regularly checks the exisiting rics towards the local repository to keep it consistent.
+ * Regularly checks the existing rics towards the local repository to keep it
+ * consistent.
*/
@Component
@EnableScheduling
}
/**
- * Regularly contacts all Rics to check if they are alive.
+ * Regularly contacts all Rics to check if they are alive and synchronized.
*/
@Scheduled(fixedRate = 1000 * 60)
public void checkAllRics() {
logger.debug("Checking Rics starting");
- createTask().subscribe(this::onRicChecked, this::onError, this::onComplete);
+ createTask().subscribe(this::onRicChecked, null, this::onComplete);
}
private Flux<RicData> createTask() {
synchronized (this.rics) {
return Flux.fromIterable(rics.getRics()) //
- .flatMap(ric -> createRicData(ric)) //
- .flatMap(ricData -> checkRicState(ricData)) //
- .flatMap(ricData -> checkRicPolicies(ricData)) //
- .flatMap(ricData -> checkRicPolicyTypes(ricData));
+ .flatMap(this::createRicData) //
+ .flatMap(this::checkOneRic) //
+ .onErrorResume(throwable -> Mono.empty());
}
}
+ private Mono<RicData> checkOneRic(RicData ricData) {
+ return checkRicState(ricData) //
+ .flatMap(x -> ricData.ric.getLock().lock(LockType.EXCLUSIVE)) //
+ .flatMap(x -> checkRicPolicies(ricData)) //
+ .flatMap(x -> ricData.ric.getLock().unlock()) //
+ .doOnError(throwable -> ricData.ric.getLock().unlockBlocking()) //
+ .flatMap(x -> checkRicPolicyTypes(ricData)); //
+ }
+
private static class RicData {
RicData(Ric ric, A1Client a1Client) {
this.ric = ric;
}
private Mono<RicData> checkRicState(RicData ric) {
- if (ric.ric.state() == RicState.UNDEFINED) {
- return startRecovery(ric);
- } else if (ric.ric.state() == RicState.RECOVERING) {
+ if (ric.ric.getState() == RicState.UNDEFINED) {
+ return startSynchronization(ric) //
+ .onErrorResume(t -> Mono.empty());
+ } else if (ric.ric.getState() == RicState.SYNCHRONIZING) {
return Mono.empty();
} else {
return Mono.just(ric);
private Mono<RicData> checkRicPolicies(RicData ric) {
return ric.a1Client.getPolicyIdentities() //
- .onErrorResume(t -> Mono.empty()) //
.flatMap(ricP -> validateInstances(ricP, ric));
}
private Mono<RicData> validateInstances(Collection<String> ricPolicies, RicData ric) {
synchronized (this.policies) {
if (ricPolicies.size() != policies.getForRic(ric.ric.name()).size()) {
- return startRecovery(ric);
+ return startSynchronization(ric);
}
- }
- for (String policyId : ricPolicies) {
- if (!policies.containsPolicy(policyId)) {
- return startRecovery(ric);
+
+ for (String policyId : ricPolicies) {
+ if (!policies.containsPolicy(policyId)) {
+ return startSynchronization(ric);
+ }
}
+ return Mono.just(ric);
}
- return Mono.just(ric);
}
private Mono<RicData> checkRicPolicyTypes(RicData ric) {
return ric.a1Client.getPolicyTypeIdentities() //
- .onErrorResume(t -> {
- return Mono.empty();
- }) //
.flatMap(ricTypes -> validateTypes(ricTypes, ric));
}
private Mono<RicData> validateTypes(Collection<String> ricTypes, RicData ric) {
if (ricTypes.size() != ric.ric.getSupportedPolicyTypes().size()) {
- return startRecovery(ric);
+ return startSynchronization(ric);
}
for (String typeName : ricTypes) {
if (!ric.ric.isSupportingType(typeName)) {
- return startRecovery(ric);
+ return startSynchronization(ric);
}
}
return Mono.just(ric);
}
- private Mono<RicData> startRecovery(RicData ric) {
- RicRecoveryTask recovery = new RicRecoveryTask(a1ClientFactory, policyTypes, policies, services);
- recovery.run(ric.ric);
- return Mono.empty();
+ private Mono<RicData> startSynchronization(RicData ric) {
+ RicSynchronizationTask synchronizationTask = createSynchronizationTask();
+ synchronizationTask.run(ric.ric);
+ return Mono.error(new Exception("Syncronization started"));
}
+ @SuppressWarnings("squid:S2629")
private void onRicChecked(RicData ric) {
- logger.info("Ric: " + ric.ric.name() + " checked");
- }
-
- private void onError(Throwable t) {
- logger.error("Rics supervision failed", t);
+ logger.debug("Ric: {} checked", ric.ric.name());
}
private void onComplete() {
logger.debug("Checking Rics completed");
}
+ RicSynchronizationTask createSynchronizationTask() {
+ return new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services);
+ }
}