Merge "Added STD sim 2.0.0 tests"
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / tasks / ServiceSupervision.java
index 335aa94..9c55532 100644 (file)
 
 package org.oransc.policyagent.tasks;
 
-import org.oransc.policyagent.clients.A1Client;
+import java.time.Duration;
+
+import org.oransc.policyagent.clients.A1ClientFactory;
+import org.oransc.policyagent.repository.Lock;
+import org.oransc.policyagent.repository.Lock.LockType;
 import org.oransc.policyagent.repository.Policies;
 import org.oransc.policyagent.repository.Policy;
 import org.oransc.policyagent.repository.Service;
@@ -29,66 +33,92 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.annotation.EnableScheduling;
-import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+/**
+ * Periodically checks that services with a keepAliveInterval set are alive. If
+ * a service is deemed not alive, all the service's policies are deleted, both
+ * in the repository and in the affected Rics, and the service is removed from
+ * the repository. This means that the service needs to register again after
+ * this.
+ */
 @Component
 @EnableScheduling
+@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
 public class ServiceSupervision {
     private static final Logger logger = LoggerFactory.getLogger(ServiceSupervision.class);
+    static final int CONCURRENCY_RIC = 1; // How may paralell requests that is sent
     private final Services services;
     private final Policies policies;
-    private A1Client a1Client;
+    private A1ClientFactory a1ClientFactory;
+    private final Duration checkInterval;
 
     @Autowired
-    public ServiceSupervision(Services services, Policies policies, A1Client a1Client) {
+    public ServiceSupervision(Services services, Policies policies, A1ClientFactory a1ClientFactory) {
+        this(services, policies, a1ClientFactory, Duration.ofMinutes(1));
+    }
+
+    public ServiceSupervision(Services services, Policies policies, A1ClientFactory a1ClientFactory,
+        Duration checkInterval) {
         this.services = services;
         this.policies = policies;
-        this.a1Client = a1Client;
+        this.a1ClientFactory = a1ClientFactory;
+        this.checkInterval = checkInterval;
+        start();
     }
 
-    @Scheduled(fixedRate = 1000 * 60)
-    public void checkAllServices() {
+    private void start() {
         logger.debug("Checking services starting");
-        createTask().subscribe(this::onPolicyDeleted, this::onError, this::onComplete);
+        createTask().subscribe(null, null, () -> logger.error("Checking services unexpectedly terminated"));
     }
 
-    private void onPolicyDeleted(Policy policy) {
-        logger.info("Policy deleted due to inactivity: " + policy.id() + ", service: " + policy.ownerServiceName());
+    private Flux<?> createTask() {
+        return Flux.interval(this.checkInterval) //
+            .flatMap(notUsed -> checkAllServices());
     }
 
-    private void onError(Throwable t) {
-        logger.error("Service supervision failed", t);
-    }
-
-    private void onComplete() {
-        logger.debug("Checking services completed");
+    Flux<Policy> checkAllServices() {
+        return Flux.fromIterable(services.getAll()) //
+            .filter(Service::isExpired) //
+            .doOnNext(service -> logger.info("Service is expired: {}", service.getName())) //
+            .doOnNext(service -> services.remove(service.getName())) //
+            .flatMap(this::getAllPoliciesForService) //
+            .flatMap(this::deletePolicy, CONCURRENCY_RIC);
     }
 
-    private Flux<Policy> createTask() {
-        return Flux.fromIterable(services.getAll()) //
-            .filter(service -> service.isExpired()) //
-            .doOnNext(service -> logger.info("Service is expired:" + service.getName())) //
-            .flatMap(service -> getAllPolicies(service)) //
-            .doOnNext(policy -> this.policies.remove(policy)) //
-            .flatMap(policy -> deletePolicyInRic(policy));
+    @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
+    private Flux<Policy> deletePolicy(Policy policy) {
+        Lock lock = policy.ric().getLock();
+        return lock.lock(LockType.SHARED) //
+            .doOnNext(notUsed -> policies.remove(policy)) //
+            .flatMap(notUsed -> deletePolicyInRic(policy))
+            .doOnNext(notUsed -> logger.debug("Policy deleted due to service inactivity: {}, service: {}", policy.id(),
+                policy.ownerServiceName())) //
+            .doOnNext(notUsed -> lock.unlockBlocking()) //
+            .doOnError(throwable -> lock.unlockBlocking()) //
+            .doOnError(throwable -> logger.debug("Failed to delete inactive policy: {}, reason: {}", policy.id(),
+                throwable.getMessage())) //
+            .flatMapMany(notUsed -> Flux.just(policy)) //
+            .onErrorResume(throwable -> Flux.empty());
     }
 
-    private Flux<Policy> getAllPolicies(Service service) {
+    private Flux<Policy> getAllPoliciesForService(Service service) {
         return Flux.fromIterable(policies.getForService(service.getName()));
     }
 
     private Mono<Policy> deletePolicyInRic(Policy policy) {
-        return a1Client.deletePolicy(policy.ric().getConfig().baseUrl(), policy.id()) //
-            .onErrorResume(exception -> handleDeleteFromRicFailure(policy, exception)) //
-            .map((nothing) -> policy);
+        return a1ClientFactory.createA1Client(policy.ric()) //
+            .flatMap(client -> client.deletePolicy(policy) //
+                .onErrorResume(exception -> handleDeleteFromRicFailure(policy, exception)) //
+                .map(nothing -> policy));
     }
 
     private Mono<String> handleDeleteFromRicFailure(Policy policy, Throwable e) {
-        logger.warn("Could not delete policy: {} from ric: {}", policy.id(), policy.ric().name(), e);
+        logger.warn("Could not delete policy: {} from ric: {}. Cause: {}", policy.id(), policy.ric().name(),
+            e.getMessage());
         return Mono.empty();
     }
 }