import org.oransc.policyagent.clients.A1Client;
import org.oransc.policyagent.clients.A1ClientFactory;
+import org.oransc.policyagent.repository.Lock.LockType;
import org.oransc.policyagent.repository.Policies;
import org.oransc.policyagent.repository.PolicyTypes;
import org.oransc.policyagent.repository.Ric;
import reactor.core.publisher.Mono;
/**
- * Regularly checks the existing rics towards the local repository to keep it consistent.
+ * Regularly checks the existing rics towards the local repository to keep it
+ * consistent.
*/
@Component
@EnableScheduling
return Flux.fromIterable(rics.getRics()) //
.flatMap(this::createRicData) //
.flatMap(this::checkRicState) //
+ .doOnNext(ricData -> ricData.ric.getLock().lockBlocking(LockType.EXCLUSIVE)) //
.flatMap(this::checkRicPolicies) //
- .flatMap(this::checkRicPolicyTypes);
+ .doOnNext(ricData -> ricData.ric.getLock().unlock()) //
+ .flatMap(this::checkRicPolicyTypes); //
}
}
private Mono<RicData> checkRicPolicies(RicData ric) {
return ric.a1Client.getPolicyIdentities() //
- .onErrorResume(t -> Mono.empty()) //
+ .onErrorResume(t -> {
+ ric.ric.getLock().unlock();
+ return Mono.empty();
+ }) //
.flatMap(ricP -> validateInstances(ricP, ric));
}
private Mono<RicData> validateInstances(Collection<String> ricPolicies, RicData ric) {
synchronized (this.policies) {
if (ricPolicies.size() != policies.getForRic(ric.ric.name()).size()) {
+ ric.ric.getLock().unlock();
return startSynchronization(ric);
}
- }
- for (String policyId : ricPolicies) {
- if (!policies.containsPolicy(policyId)) {
- return startSynchronization(ric);
+
+ for (String policyId : ricPolicies) {
+ if (!policies.containsPolicy(policyId)) {
+ ric.ric.getLock().unlock();
+ return startSynchronization(ric);
+ }
}
+ return Mono.just(ric);
}
- return Mono.just(ric);
}
private Mono<RicData> checkRicPolicyTypes(RicData ric) {
RicSynchronizationTask createSynchronizationTask() {
return new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services);
}
-}
\ No newline at end of file
+}