synchronized (this.rics) {
return Flux.fromIterable(rics.getRics()) //
.flatMap(this::createRicData) //
- .flatMap(this::checkRicState) //
- .doOnNext(ricData -> ricData.ric.getLock().lockBlocking(LockType.EXCLUSIVE)) //
- .flatMap(this::checkRicPolicies) //
- .doOnNext(ricData -> ricData.ric.getLock().unlock()) //
- .flatMap(this::checkRicPolicyTypes); //
+ .flatMap(this::checkOneRic) //
+ .onErrorResume(throwable -> Mono.empty());
}
}
+ private Mono<RicData> checkOneRic(RicData ricData) {
+ return checkRicState(ricData) //
+ .flatMap(x -> ricData.ric.getLock().lock(LockType.EXCLUSIVE)) //
+ .flatMap(x -> checkRicPolicies(ricData)) //
+ .flatMap(x -> ricData.ric.getLock().unlock()) //
+ .doOnError(throwable -> ricData.ric.getLock().unlockBlocking()) //
+ .flatMap(x -> checkRicPolicyTypes(ricData)); //
+ }
+
private static class RicData {
RicData(Ric ric, A1Client a1Client) {
this.ric = ric;
private Mono<RicData> checkRicState(RicData ric) {
if (ric.ric.getState() == RicState.UNDEFINED) {
- return startSynchronization(ric);
+ return startSynchronization(ric) //
+ .onErrorResume(t -> Mono.empty());
} else if (ric.ric.getState() == RicState.SYNCHRONIZING) {
return Mono.empty();
} else {
private Mono<RicData> checkRicPolicies(RicData ric) {
return ric.a1Client.getPolicyIdentities() //
- .onErrorResume(t -> {
- ric.ric.getLock().unlock();
- return Mono.empty();
- }) //
.flatMap(ricP -> validateInstances(ricP, ric));
}
private Mono<RicData> validateInstances(Collection<String> ricPolicies, RicData ric) {
synchronized (this.policies) {
if (ricPolicies.size() != policies.getForRic(ric.ric.name()).size()) {
- ric.ric.getLock().unlock();
return startSynchronization(ric);
}
for (String policyId : ricPolicies) {
if (!policies.containsPolicy(policyId)) {
- ric.ric.getLock().unlock();
return startSynchronization(ric);
}
}
private Mono<RicData> checkRicPolicyTypes(RicData ric) {
return ric.a1Client.getPolicyTypeIdentities() //
- .onErrorResume(notUsed -> Mono.empty()) //
.flatMap(ricTypes -> validateTypes(ricTypes, ric));
}
private Mono<RicData> startSynchronization(RicData ric) {
RicSynchronizationTask recovery = createSynchronizationTask();
recovery.run(ric.ric);
- return Mono.empty();
+ return Mono.error(new Exception("Syncronization started"));
}
@SuppressWarnings("squid:S2629")