}
private Flux<Policy> createTask() {
- return Flux.fromIterable(services.getAll()) //
- .filter(service -> service.isExpired()) //
- .doOnNext(service -> logger.info("Service is expired:" + service.getName())) //
- .flatMap(service -> getAllPolicies(service)) //
- .doOnNext(policy -> this.policies.remove(policy)) //
- .flatMap(policy -> deletePolicyInRic(policy));
+ synchronized (services) {
+ return Flux.fromIterable(services.getAll()) //
+ .filter(service -> service.isExpired()) //
+ .doOnNext(service -> logger.info("Service is expired:" + service.getName())) //
+ .flatMap(service -> getAllPolicies(service)) //
+ .doOnNext(policy -> this.policies.remove(policy)) //
+ .flatMap(policy -> deletePolicyInRic(policy));
+ }
}
private Flux<Policy> getAllPolicies(Service service) {
- return Flux.fromIterable(policies.getForService(service.getName()));
+ synchronized (policies) {
+ return Flux.fromIterable(policies.getForService(service.getName()));
+ }
}
private Mono<Policy> deletePolicyInRic(Policy policy) {