- private Flux<Policy> createTask() {
- synchronized (services) {
- return Flux.fromIterable(services.getAll()) //
- .filter(service -> service.isExpired()) //
- .doOnNext(service -> logger.info("Service is expired:" + service.getName())) //
- .flatMap(service -> getAllPolicies(service)) //
- .doOnNext(policy -> this.policies.remove(policy)) //
- .flatMap(policy -> deletePolicyInRic(policy));
- }
+ @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
+ private Flux<Policy> deletePolicy(Policy policy) {
+ Lock lock = policy.ric().getLock();
+ return lock.lock(LockType.SHARED) //
+ .doOnNext(notUsed -> policies.remove(policy)) //
+ .flatMap(notUsed -> deletePolicyInRic(policy))
+ .doOnNext(notUsed -> logger.debug("Policy deleted due to service inactivity: {}, service: {}", policy.id(),
+ policy.ownerServiceName())) //
+ .doOnNext(notUsed -> lock.unlockBlocking()) //
+ .doOnError(throwable -> lock.unlockBlocking()) //
+ .doOnError(throwable -> logger.debug("Failed to delete inactive policy: {}, reason: {}", policy.id(),
+ throwable.getMessage())) //
+ .flatMapMany(notUsed -> Flux.just(policy)) //
+ .onErrorResume(throwable -> Flux.empty());