Merge "Dockerize the test enviroment"
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / tasks / ServiceSupervision.java
index acb546b..d75ff4f 100644 (file)
@@ -20,6 +20,7 @@
 
 package org.oransc.policyagent.tasks;
 
+import org.oransc.policyagent.clients.A1ClientFactory;
 import org.oransc.policyagent.repository.Policies;
 import org.oransc.policyagent.repository.Policy;
 import org.oransc.policyagent.repository.Service;
@@ -30,7 +31,9 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.annotation.EnableScheduling;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
+
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 @Component
 @EnableScheduling
@@ -38,11 +41,13 @@ public class ServiceSupervision {
     private static final Logger logger = LoggerFactory.getLogger(ServiceSupervision.class);
     private final Services services;
     private final Policies policies;
+    private A1ClientFactory a1ClientFactory;
 
     @Autowired
-    public ServiceSupervision(Services services, Policies policies) {
+    public ServiceSupervision(Services services, Policies policies, A1ClientFactory a1ClientFactory) {
         this.services = services;
         this.policies = policies;
+        this.a1ClientFactory = a1ClientFactory;
     }
 
     @Scheduled(fixedRate = 1000 * 60)
@@ -63,16 +68,32 @@ public class ServiceSupervision {
         logger.debug("Checking services completed");
     }
 
-    Flux<Policy> createTask() {
-        return Flux.fromIterable(services.getAll()) //
-            .filter(service -> service.isExpired()) //
-            .doOnNext(service -> logger.info("Service is expired:" + service.getName()))
-            .flatMap(service -> getAllPolicies(service)) //
-            .doOnNext(policy -> this.policies.remove(policy));
+    private Flux<Policy> createTask() {
+        synchronized (services) {
+            return Flux.fromIterable(services.getAll()) //
+                .filter(service -> service.isExpired()) //
+                .doOnNext(service -> logger.info("Service is expired:" + service.getName())) //
+                .flatMap(service -> getAllPolicies(service)) //
+                .doOnNext(policy -> this.policies.remove(policy)) //
+                .flatMap(policy -> deletePolicyInRic(policy));
+        }
+    }
+
+    private Flux<Policy> getAllPolicies(Service service) {
+        synchronized (policies) {
+            return Flux.fromIterable(policies.getForService(service.getName()));
+        }
     }
 
-    Flux<Policy> getAllPolicies(Service service) {
-        return Flux.fromIterable(policies.getForService(service.getName()));
+    private Mono<Policy> deletePolicyInRic(Policy policy) {
+        return a1ClientFactory.createA1Client(policy.ric()) //
+            .flatMap(client -> client.deletePolicy(policy.id()) //
+                .onErrorResume(exception -> handleDeleteFromRicFailure(policy, exception)) //
+                .map((nothing) -> policy));
     }
 
+    private Mono<String> handleDeleteFromRicFailure(Policy policy, Throwable e) {
+        logger.warn("Could not delete policy: {} from ric: {}", policy.id(), policy.ric().name(), e);
+        return Mono.empty();
+    }
 }