import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+/**
+ * Periodically checks that services with a keepAliveInterval set are alive. If a service is deemed not alive,
+ * all the service's policies are deleted, both in the repository and in the affected Rics, and the service is
+ * removed from the repository. This means that the service needs to register again after this.
+ */
@Component
@EnableScheduling
public class ServiceSupervision {
@Scheduled(fixedRate = 1000 * 60)
public void checkAllServices() {
logger.debug("Checking services starting");
- createTask().subscribe(this::onPolicyDeleted, this::onError, this::onComplete);
+ createTask().subscribe(this::onPolicyDeleted, null, this::onComplete);
}
+ @SuppressWarnings("squid:S2629")
private void onPolicyDeleted(Policy policy) {
- logger.info("Policy deleted due to inactivity: " + policy.id() + ", service: " + policy.ownerServiceName());
- }
-
- private void onError(Throwable t) {
- logger.error("Service supervision failed", t);
+ logger.debug("Policy deleted due to inactivity: {}, service: {}", policy.id(), policy.ownerServiceName());
}
private void onComplete() {
private Flux<Policy> createTask() {
synchronized (services) {
return Flux.fromIterable(services.getAll()) //
- .filter(service -> service.isExpired()) //
- .doOnNext(service -> logger.info("Service is expired:" + service.getName())) //
- .flatMap(service -> getAllPolicies(service)) //
- .doOnNext(policy -> this.policies.remove(policy)) //
- .flatMap(policy -> deletePolicyInRic(policy));
+ .filter(Service::isExpired) //
+ .doOnNext(service -> logger.info("Service is expired: {}", service.getName())) //
+ .doOnNext(service -> services.remove(service.getName())) //
+ .flatMap(this::getAllPoliciesForService) //
+ .doOnNext(policies::remove) //
+ .flatMap(this::deletePolicyInRic);
}
}
- private Flux<Policy> getAllPolicies(Service service) {
+ private Flux<Policy> getAllPoliciesForService(Service service) {
synchronized (policies) {
return Flux.fromIterable(policies.getForService(service.getName()));
}
private Mono<Policy> deletePolicyInRic(Policy policy) {
return a1ClientFactory.createA1Client(policy.ric()) //
- .flatMap(client -> client.deletePolicy(policy.id()) //
+ .flatMap(client -> client.deletePolicy(policy) //
.onErrorResume(exception -> handleDeleteFromRicFailure(policy, exception)) //
- .map((nothing) -> policy));
+ .map(nothing -> policy));
}
+ @SuppressWarnings("squid:S2629")
private Mono<String> handleDeleteFromRicFailure(Policy policy, Throwable e) {
logger.warn("Could not delete policy: {} from ric: {}", policy.id(), policy.ric().name(), e);
return Mono.empty();