X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=policy-agent%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fpolicyagent%2Ftasks%2FServiceSupervision.java;h=626a9b69314dc1684af53e837ef6e9b7fabb03db;hb=842b9d220588fba7fc17df0cf9c094f91005118b;hp=d75ff4fe1e97506a5f068ee8f8c078f5cb6c368f;hpb=be84d0b846398b9684f9ddba2ef4f60a23cd5eb0;p=nonrtric.git diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/ServiceSupervision.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/ServiceSupervision.java index d75ff4fe..626a9b69 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/ServiceSupervision.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/ServiceSupervision.java @@ -35,6 +35,11 @@ import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +/** + * Periodically checks that services with a keepAliveInterval set are alive. If a service is deemed not alive, + * all the service's policies are deleted, both in the repository and in the affected Rics, and the service is + * removed from the repository. This means that the service needs to register again after this. + */ @Component @EnableScheduling public class ServiceSupervision { @@ -53,15 +58,12 @@ public class ServiceSupervision { @Scheduled(fixedRate = 1000 * 60) public void checkAllServices() { logger.debug("Checking services starting"); - createTask().subscribe(this::onPolicyDeleted, this::onError, this::onComplete); + createTask().subscribe(this::onPolicyDeleted, null, this::onComplete); } + @SuppressWarnings("squid:S2629") private void onPolicyDeleted(Policy policy) { - logger.info("Policy deleted due to inactivity: " + policy.id() + ", service: " + policy.ownerServiceName()); - } - - private void onError(Throwable t) { - logger.error("Service supervision failed", t); + logger.debug("Policy deleted due to inactivity: {}, service: {}", policy.id(), policy.ownerServiceName()); } private void onComplete() { @@ -71,15 +73,16 @@ public class ServiceSupervision { private Flux createTask() { synchronized (services) { return Flux.fromIterable(services.getAll()) // - .filter(service -> service.isExpired()) // - .doOnNext(service -> logger.info("Service is expired:" + service.getName())) // - .flatMap(service -> getAllPolicies(service)) // - .doOnNext(policy -> this.policies.remove(policy)) // - .flatMap(policy -> deletePolicyInRic(policy)); + .filter(Service::isExpired) // + .doOnNext(service -> logger.info("Service is expired: {}", service.getName())) // + .doOnNext(service -> services.remove(service.getName())) // + .flatMap(this::getAllPoliciesForService) // + .doOnNext(policies::remove) // + .flatMap(this::deletePolicyInRic); } } - private Flux getAllPolicies(Service service) { + private Flux getAllPoliciesForService(Service service) { synchronized (policies) { return Flux.fromIterable(policies.getForService(service.getName())); } @@ -87,11 +90,12 @@ public class ServiceSupervision { private Mono deletePolicyInRic(Policy policy) { return a1ClientFactory.createA1Client(policy.ric()) // - .flatMap(client -> client.deletePolicy(policy.id()) // + .flatMap(client -> client.deletePolicy(policy) // .onErrorResume(exception -> handleDeleteFromRicFailure(policy, exception)) // - .map((nothing) -> policy)); + .map(nothing -> policy)); } + @SuppressWarnings("squid:S2629") private Mono handleDeleteFromRicFailure(Policy policy, Throwable e) { logger.warn("Could not delete policy: {} from ric: {}", policy.id(), policy.ric().name(), e); return Mono.empty();