package org.oransc.policyagent.tasks;
-import org.oransc.policyagent.clients.A1Client;
+import java.time.Duration;
+
+import org.oransc.policyagent.clients.A1ClientFactory;
+import org.oransc.policyagent.repository.Lock;
+import org.oransc.policyagent.repository.Lock.LockType;
import org.oransc.policyagent.repository.Policies;
import org.oransc.policyagent.repository.Policy;
import org.oransc.policyagent.repository.Service;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
-import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+/**
+ * Periodically checks that services with a keepAliveInterval set are alive. If
+ * a service is deemed not alive, all the service's policies are deleted, both
+ * in the repository and in the affected Rics, and the service is removed from
+ * the repository. This means that the service needs to register again after
+ * this.
+ */
@Component
@EnableScheduling
+@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
public class ServiceSupervision {
private static final Logger logger = LoggerFactory.getLogger(ServiceSupervision.class);
private final Services services;
private final Policies policies;
- private A1Client a1Client;
+ private A1ClientFactory a1ClientFactory;
+ private final Duration checkInterval;
@Autowired
- public ServiceSupervision(Services services, Policies policies, A1Client a1Client) {
+ public ServiceSupervision(Services services, Policies policies, A1ClientFactory a1ClientFactory) {
+ this(services, policies, a1ClientFactory, Duration.ofMinutes(1));
+ }
+
+ public ServiceSupervision(Services services, Policies policies, A1ClientFactory a1ClientFactory,
+ Duration checkInterval) {
this.services = services;
this.policies = policies;
- this.a1Client = a1Client;
+ this.a1ClientFactory = a1ClientFactory;
+ this.checkInterval = checkInterval;
+ start();
}
- @Scheduled(fixedRate = 1000 * 60)
- public void checkAllServices() {
+ private void start() {
logger.debug("Checking services starting");
- createTask().subscribe(this::onPolicyDeleted, this::onError, this::onComplete);
- }
-
- private void onPolicyDeleted(Policy policy) {
- logger.info("Policy deleted due to inactivity: " + policy.id() + ", service: " + policy.ownerServiceName());
+ createTask().subscribe(null, null, () -> logger.error("Checking services unexpectedly terminated"));
}
- private void onError(Throwable t) {
- logger.error("Service supervision failed", t);
+ private Flux<?> createTask() {
+ return Flux.interval(this.checkInterval) //
+ .flatMap(notUsed -> checkAllServices());
}
- private void onComplete() {
- logger.debug("Checking services completed");
+ Flux<Policy> checkAllServices() {
+ return Flux.fromIterable(services.getAll()) //
+ .filter(Service::isExpired) //
+ .doOnNext(service -> logger.info("Service is expired: {}", service.getName())) //
+ .doOnNext(service -> services.remove(service.getName())) //
+ .flatMap(this::getAllPoliciesForService) //
+ .flatMap(this::deletePolicy);
}
- private Flux<Policy> createTask() {
- synchronized (services) {
- return Flux.fromIterable(services.getAll()) //
- .filter(service -> service.isExpired()) //
- .doOnNext(service -> logger.info("Service is expired:" + service.getName())) //
- .flatMap(service -> getAllPolicies(service)) //
- .doOnNext(policy -> this.policies.remove(policy)) //
- .flatMap(policy -> deletePolicyInRic(policy));
- }
+ @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
+ private Flux<Policy> deletePolicy(Policy policy) {
+ Lock lock = policy.ric().getLock();
+ return lock.lock(LockType.SHARED) //
+ .doOnNext(notUsed -> policies.remove(policy)) //
+ .flatMap(notUsed -> deletePolicyInRic(policy))
+ .doOnNext(notUsed -> logger.debug("Policy deleted due to service inactivity: {}, service: {}", policy.id(),
+ policy.ownerServiceName())) //
+ .doOnNext(notUsed -> lock.unlockBlocking()) //
+ .doOnError(throwable -> lock.unlockBlocking()) //
+ .doOnError(throwable -> logger.debug("Failed to delete inactive policy: {}, reason: {}", policy.id(),
+ throwable.getMessage())) //
+ .flatMapMany(notUsed -> Flux.just(policy)) //
+ .onErrorResume(throwable -> Flux.empty());
}
- private Flux<Policy> getAllPolicies(Service service) {
- synchronized (policies) {
- return Flux.fromIterable(policies.getForService(service.getName()));
- }
+ private Flux<Policy> getAllPoliciesForService(Service service) {
+ return Flux.fromIterable(policies.getForService(service.getName()));
}
private Mono<Policy> deletePolicyInRic(Policy policy) {
- return a1Client.deletePolicy(policy.ric().getConfig().baseUrl(), policy.id()) //
- .onErrorResume(exception -> handleDeleteFromRicFailure(policy, exception)) //
- .map((nothing) -> policy);
+ return a1ClientFactory.createA1Client(policy.ric()) //
+ .flatMap(client -> client.deletePolicy(policy) //
+ .onErrorResume(exception -> handleDeleteFromRicFailure(policy, exception)) //
+ .map(nothing -> policy));
}
private Mono<String> handleDeleteFromRicFailure(Policy policy, Throwable e) {
- logger.warn("Could not delete policy: {} from ric: {}", policy.id(), policy.ric().name(), e);
+ logger.warn("Could not delete policy: {} from ric: {}. Cause: {}", policy.id(), policy.ric().name(),
+ e.getMessage());
return Mono.empty();
}
}