X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=policy-agent%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fpolicyagent%2Ftasks%2FServiceSupervision.java;h=9c5553296b5a35806df64515ced608e1b477d57d;hb=6a39814272307d0207222c9229b0d765ac062bf0;hp=d75ff4fe1e97506a5f068ee8f8c078f5cb6c368f;hpb=934a146caf5c9d0f735f913375d55b59041b9db5;p=nonrtric.git diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/ServiceSupervision.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/ServiceSupervision.java index d75ff4fe..9c555329 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/ServiceSupervision.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/ServiceSupervision.java @@ -20,7 +20,11 @@ package org.oransc.policyagent.tasks; +import java.time.Duration; + import org.oransc.policyagent.clients.A1ClientFactory; +import org.oransc.policyagent.repository.Lock; +import org.oransc.policyagent.repository.Lock.LockType; import org.oransc.policyagent.repository.Policies; import org.oransc.policyagent.repository.Policy; import org.oransc.policyagent.repository.Service; @@ -29,71 +33,92 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.EnableScheduling; -import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +/** + * Periodically checks that services with a keepAliveInterval set are alive. If + * a service is deemed not alive, all the service's policies are deleted, both + * in the repository and in the affected Rics, and the service is removed from + * the repository. This means that the service needs to register again after + * this. + */ @Component @EnableScheduling +@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally public class ServiceSupervision { private static final Logger logger = LoggerFactory.getLogger(ServiceSupervision.class); + static final int CONCURRENCY_RIC = 1; // How may paralell requests that is sent private final Services services; private final Policies policies; private A1ClientFactory a1ClientFactory; + private final Duration checkInterval; @Autowired public ServiceSupervision(Services services, Policies policies, A1ClientFactory a1ClientFactory) { + this(services, policies, a1ClientFactory, Duration.ofMinutes(1)); + } + + public ServiceSupervision(Services services, Policies policies, A1ClientFactory a1ClientFactory, + Duration checkInterval) { this.services = services; this.policies = policies; this.a1ClientFactory = a1ClientFactory; + this.checkInterval = checkInterval; + start(); } - @Scheduled(fixedRate = 1000 * 60) - public void checkAllServices() { + private void start() { logger.debug("Checking services starting"); - createTask().subscribe(this::onPolicyDeleted, this::onError, this::onComplete); - } - - private void onPolicyDeleted(Policy policy) { - logger.info("Policy deleted due to inactivity: " + policy.id() + ", service: " + policy.ownerServiceName()); + createTask().subscribe(null, null, () -> logger.error("Checking services unexpectedly terminated")); } - private void onError(Throwable t) { - logger.error("Service supervision failed", t); + private Flux createTask() { + return Flux.interval(this.checkInterval) // + .flatMap(notUsed -> checkAllServices()); } - private void onComplete() { - logger.debug("Checking services completed"); + Flux checkAllServices() { + return Flux.fromIterable(services.getAll()) // + .filter(Service::isExpired) // + .doOnNext(service -> logger.info("Service is expired: {}", service.getName())) // + .doOnNext(service -> services.remove(service.getName())) // + .flatMap(this::getAllPoliciesForService) // + .flatMap(this::deletePolicy, CONCURRENCY_RIC); } - private Flux createTask() { - synchronized (services) { - return Flux.fromIterable(services.getAll()) // - .filter(service -> service.isExpired()) // - .doOnNext(service -> logger.info("Service is expired:" + service.getName())) // - .flatMap(service -> getAllPolicies(service)) // - .doOnNext(policy -> this.policies.remove(policy)) // - .flatMap(policy -> deletePolicyInRic(policy)); - } + @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally + private Flux deletePolicy(Policy policy) { + Lock lock = policy.ric().getLock(); + return lock.lock(LockType.SHARED) // + .doOnNext(notUsed -> policies.remove(policy)) // + .flatMap(notUsed -> deletePolicyInRic(policy)) + .doOnNext(notUsed -> logger.debug("Policy deleted due to service inactivity: {}, service: {}", policy.id(), + policy.ownerServiceName())) // + .doOnNext(notUsed -> lock.unlockBlocking()) // + .doOnError(throwable -> lock.unlockBlocking()) // + .doOnError(throwable -> logger.debug("Failed to delete inactive policy: {}, reason: {}", policy.id(), + throwable.getMessage())) // + .flatMapMany(notUsed -> Flux.just(policy)) // + .onErrorResume(throwable -> Flux.empty()); } - private Flux getAllPolicies(Service service) { - synchronized (policies) { - return Flux.fromIterable(policies.getForService(service.getName())); - } + private Flux getAllPoliciesForService(Service service) { + return Flux.fromIterable(policies.getForService(service.getName())); } private Mono deletePolicyInRic(Policy policy) { return a1ClientFactory.createA1Client(policy.ric()) // - .flatMap(client -> client.deletePolicy(policy.id()) // + .flatMap(client -> client.deletePolicy(policy) // .onErrorResume(exception -> handleDeleteFromRicFailure(policy, exception)) // - .map((nothing) -> policy)); + .map(nothing -> policy)); } private Mono handleDeleteFromRicFailure(Policy policy, Throwable e) { - logger.warn("Could not delete policy: {} from ric: {}", policy.id(), policy.ric().name(), e); + logger.warn("Could not delete policy: {} from ric: {}. Cause: {}", policy.id(), policy.ric().name(), + e.getMessage()); return Mono.empty(); } }