+ if (ric.getState() == RicState.SYNCHRONIZING) {
+ logger.debug("Ric: {} is already being synchronized", ric.getConfig().name());
+ return;
+ }
+
+ ric.getLock().lock(LockType.EXCLUSIVE) //
+ .flatMap(notUsed -> setRicState(ric)) //
+ .flatMap(lock -> this.a1ClientFactory.createA1Client(ric)) //
+ .flatMapMany(client -> runSynchronization(ric, client)) //
+ .onErrorResume(throwable -> deleteAllPolicyInstances(ric, throwable))
+ .subscribe(new BaseSubscriber<Object>() {
+ @Override
+ protected void hookOnError(Throwable throwable) {
+ logger.warn("Synchronization failure for ric: {}, reason: {}", ric.name(), throwable.getMessage());
+ ric.setState(RicState.UNAVAILABLE);
+ }
+
+ @Override
+ protected void hookOnComplete() {
+ onSynchronizationComplete(ric);
+ }
+
+ @Override
+ protected void hookFinally(SignalType type) {
+ ric.getLock().unlockBlocking();
+ }
+ });
+ }
+
+ @SuppressWarnings("squid:S2445") // Blocks should be synchronized on "private final" fields
+ private Mono<Ric> setRicState(Ric ric) {