.filter(service -> service.isExpired()) //
.doOnNext(service -> logger.info("Service is expired:" + service.getName()))
.flatMap(service -> getAllPolicies(service)) //
- .flatMap(policy -> deletePolicy(policy));
+ .doOnNext(policy -> this.policies.remove(policy));
}
Flux<Policy> getAllPolicies(Service service) {
return Flux.fromIterable(policies.getForService(service.getName()));
}
- Flux<Policy> deletePolicy(Policy policy) {
- this.policies.remove(policy);
- return Flux.just(policy);
- }
-
}