Merge "Remove unused exceptions from dashboard backend"
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / tasks / ServiceSupervision.java
index d75ff4f..626a9b6 100644 (file)
@@ -35,6 +35,11 @@ import org.springframework.stereotype.Component;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+/**
+ * Periodically checks that services with a keepAliveInterval set are alive. If a service is deemed not alive,
+ * all the service's policies are deleted, both in the repository and in the affected Rics, and the service is
+ * removed from the repository. This means that the service needs to register again after this.
+ */
 @Component
 @EnableScheduling
 public class ServiceSupervision {
@@ -53,15 +58,12 @@ public class ServiceSupervision {
     @Scheduled(fixedRate = 1000 * 60)
     public void checkAllServices() {
         logger.debug("Checking services starting");
-        createTask().subscribe(this::onPolicyDeleted, this::onError, this::onComplete);
+        createTask().subscribe(this::onPolicyDeleted, null, this::onComplete);
     }
 
+    @SuppressWarnings("squid:S2629")
     private void onPolicyDeleted(Policy policy) {
-        logger.info("Policy deleted due to inactivity: " + policy.id() + ", service: " + policy.ownerServiceName());
-    }
-
-    private void onError(Throwable t) {
-        logger.error("Service supervision failed", t);
+        logger.debug("Policy deleted due to inactivity: {}, service: {}", policy.id(), policy.ownerServiceName());
     }
 
     private void onComplete() {
@@ -71,15 +73,16 @@ public class ServiceSupervision {
     private Flux<Policy> createTask() {
         synchronized (services) {
             return Flux.fromIterable(services.getAll()) //
-                .filter(service -> service.isExpired()) //
-                .doOnNext(service -> logger.info("Service is expired:" + service.getName())) //
-                .flatMap(service -> getAllPolicies(service)) //
-                .doOnNext(policy -> this.policies.remove(policy)) //
-                .flatMap(policy -> deletePolicyInRic(policy));
+                .filter(Service::isExpired) //
+                .doOnNext(service -> logger.info("Service is expired: {}", service.getName())) //
+                .doOnNext(service -> services.remove(service.getName())) //
+                .flatMap(this::getAllPoliciesForService) //
+                .doOnNext(policies::remove) //
+                .flatMap(this::deletePolicyInRic);
         }
     }
 
-    private Flux<Policy> getAllPolicies(Service service) {
+    private Flux<Policy> getAllPoliciesForService(Service service) {
         synchronized (policies) {
             return Flux.fromIterable(policies.getForService(service.getName()));
         }
@@ -87,11 +90,12 @@ public class ServiceSupervision {
 
     private Mono<Policy> deletePolicyInRic(Policy policy) {
         return a1ClientFactory.createA1Client(policy.ric()) //
-            .flatMap(client -> client.deletePolicy(policy.id()) //
+            .flatMap(client -> client.deletePolicy(policy) //
                 .onErrorResume(exception -> handleDeleteFromRicFailure(policy, exception)) //
-                .map((nothing) -> policy));
+                .map(nothing -> policy));
     }
 
+    @SuppressWarnings("squid:S2629")
     private Mono<String> handleDeleteFromRicFailure(Policy policy, Throwable e) {
         logger.warn("Could not delete policy: {} from ric: {}", policy.id(), policy.ric().name(), e);
         return Mono.empty();