Merge "Added STD sim 2.0.0 tests"
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / tasks / RicSynchronizationTask.java
index 6110c58..6ae55c4 100644 (file)
@@ -22,13 +22,10 @@ package org.oransc.policyagent.tasks;
 
 import static org.oransc.policyagent.repository.Ric.RicState;
 
-import java.util.Vector;
-
 import org.oransc.policyagent.clients.A1Client;
 import org.oransc.policyagent.clients.A1ClientFactory;
 import org.oransc.policyagent.clients.AsyncRestClient;
 import org.oransc.policyagent.repository.ImmutablePolicyType;
-import org.oransc.policyagent.repository.Lock;
 import org.oransc.policyagent.repository.Lock.LockType;
 import org.oransc.policyagent.repository.Policies;
 import org.oransc.policyagent.repository.Policy;
@@ -40,8 +37,10 @@ import org.oransc.policyagent.repository.Services;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import reactor.core.publisher.BaseSubscriber;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.publisher.SignalType;
 
 /**
  * Synchronizes the content of a RIC with the content in the repository. This
@@ -59,6 +58,7 @@ import reactor.core.publisher.Mono;
 public class RicSynchronizationTask {
 
     private static final Logger logger = LoggerFactory.getLogger(RicSynchronizationTask.class);
+    static final int CONCURRENCY_RIC = 1; // How may paralell requests that is sent to one NearRT RIC
 
     private final A1ClientFactory a1ClientFactory;
     private final PolicyTypes policyTypes;
@@ -73,60 +73,81 @@ public class RicSynchronizationTask {
         this.services = services;
     }
 
-    @SuppressWarnings("squid:S2445") // Blocks should be synchronized on "private final" fields
     public void run(Ric ric) {
         logger.debug("Handling ric: {}", ric.getConfig().name());
 
+        if (ric.getState() == RicState.SYNCHRONIZING) {
+            logger.debug("Ric: {} is already being synchronized", ric.getConfig().name());
+            return;
+        }
+
+        ric.getLock().lock(LockType.EXCLUSIVE) //
+            .flatMap(notUsed -> setRicState(ric)) //
+            .flatMap(lock -> this.a1ClientFactory.createA1Client(ric)) //
+            .flatMapMany(client -> runSynchronization(ric, client)) //
+            .onErrorResume(throwable -> deleteAllPolicyInstances(ric, throwable))
+            .subscribe(new BaseSubscriber<Object>() {
+                @Override
+                protected void hookOnError(Throwable throwable) {
+                    logger.warn("Synchronization failure for ric: {}, reason: {}", ric.name(), throwable.getMessage());
+                    ric.setState(RicState.UNAVAILABLE);
+                }
+
+                @Override
+                protected void hookOnComplete() {
+                    onSynchronizationComplete(ric);
+                }
+
+                @Override
+                protected void hookFinally(SignalType type) {
+                    ric.getLock().unlockBlocking();
+                }
+            });
+    }
+
+    @SuppressWarnings("squid:S2445") // Blocks should be synchronized on "private final" fields
+    private Mono<Ric> setRicState(Ric ric) {
         synchronized (ric) {
             if (ric.getState() == RicState.SYNCHRONIZING) {
                 logger.debug("Ric: {} is already being synchronized", ric.getConfig().name());
-                return;
+                return Mono.empty();
             }
             ric.setState(RicState.SYNCHRONIZING);
+            return Mono.just(ric);
         }
-
-        ric.getLock().lock(LockType.EXCLUSIVE) // Make sure no NBI updates are running
-            .flatMap(Lock::unlock) //
-            .flatMap(lock -> this.a1ClientFactory.createA1Client(ric)) //
-            .flatMapMany(client -> startSynchronization(ric, client)) //
-            .subscribe(x -> logger.debug("Synchronize: {}", x), //
-                throwable -> onSynchronizationError(ric, throwable), //
-                () -> onSynchronizationComplete(ric));
     }
 
-    private Flux<Object> startSynchronization(Ric ric, A1Client a1Client) {
+    private Flux<Object> runSynchronization(Ric ric, A1Client a1Client) {
         Flux<PolicyType> synchronizedTypes = synchronizePolicyTypes(ric, a1Client);
         Flux<?> policiesDeletedInRic = a1Client.deleteAllPolicies();
-        Flux<?> policiesRecreatedInRic = recreateAllPoliciesInRic(ric, a1Client);
+        Flux<Policy> policiesRecreatedInRic = recreateAllPoliciesInRic(ric, a1Client);
 
         return Flux.concat(synchronizedTypes, policiesDeletedInRic, policiesRecreatedInRic);
     }
 
     private void onSynchronizationComplete(Ric ric) {
-        logger.info("Synchronization completed for: {}", ric.name());
-        ric.setState(RicState.IDLE);
+        logger.debug("Synchronization completed for: {}", ric.name());
+        ric.setState(RicState.AVAILABLE);
         notifyAllServices("Synchronization completed for:" + ric.name());
     }
 
     private void notifyAllServices(String body) {
-        synchronized (services) {
-            for (Service service : services.getAll()) {
-                String url = service.getCallbackUrl();
-                if (service.getCallbackUrl().length() > 0) {
-                    createNotificationClient(url) //
-                        .put("", body) //
-                        .subscribe(
-                            notUsed -> logger.debug("Service {} notified", service.getName()), throwable -> logger
-                                .warn("Service notification failed for service: {}", service.getName(), throwable),
-                            () -> logger.debug("All services notified"));
-                }
+        for (Service service : services.getAll()) {
+            String url = service.getCallbackUrl();
+            if (url.length() > 0) {
+                createNotificationClient(url) //
+                    .put("", body) //
+                    .subscribe( //
+                        notUsed -> logger.debug("Service {} notified", service.getName()),
+                        throwable -> logger.warn("Service notification failed for service: {}. Cause: {}",
+                            service.getName(), throwable.getMessage()),
+                        () -> logger.debug("All services notified"));
             }
         }
     }
 
-    private void onSynchronizationError(Ric ric, Throwable t) {
-        logger.warn("Synchronization failed for ric: {}, reason: {}", ric.name(), t.getMessage());
-        // If synchronization fails, try to remove all instances
+    private Flux<Object> deleteAllPolicyInstances(Ric ric, Throwable t) {
+        logger.debug("Recreation of policies failed for ric: {}, reason: {}", ric.name(), t.getMessage());
         deleteAllPoliciesInRepository(ric);
 
         Flux<PolicyType> synchronizedTypes = this.a1ClientFactory.createA1Client(ric) //
@@ -135,19 +156,11 @@ public class RicSynchronizationTask {
             .flatMapMany(A1Client::deleteAllPolicies) //
             .doOnComplete(() -> deleteAllPoliciesInRepository(ric));
 
-        Flux.concat(synchronizedTypes, deletePoliciesInRic) //
-            .subscribe(x -> logger.debug("Brute recovery of failed synchronization: {}", x), //
-                throwable -> onSynchronizationRecoveryError(ric, throwable), //
-                () -> onSynchronizationComplete(ric));
-    }
-
-    private void onSynchronizationRecoveryError(Ric ric, Throwable t) {
-        logger.warn("Synchronization failure recovery failed for ric: {}, reason: {}", ric.name(), t.getMessage());
-        ric.setState(RicState.UNDEFINED);
+        return Flux.concat(synchronizedTypes, deletePoliciesInRic);
     }
 
     AsyncRestClient createNotificationClient(final String url) {
-        return new AsyncRestClient(url);
+        return new AsyncRestClient(url, this.a1ClientFactory.getAppConfig().getWebClientConfig());
     }
 
     private Flux<PolicyType> synchronizePolicyTypes(Ric ric, A1Client a1Client) {
@@ -155,7 +168,7 @@ public class RicSynchronizationTask {
             .doOnNext(x -> ric.clearSupportedPolicyTypes()) //
             .flatMapMany(Flux::fromIterable) //
             .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId)) //
-            .flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client)) //
+            .flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client), CONCURRENCY_RIC) //
             .doOnNext(ric::addSupportedPolicyType); //
     }
 
@@ -174,20 +187,28 @@ public class RicSynchronizationTask {
     }
 
     private void deleteAllPoliciesInRepository(Ric ric) {
-        synchronized (policies) {
-            for (Policy policy : policies.getForRic(ric.name())) {
-                this.policies.remove(policy);
-            }
+        for (Policy policy : policies.getForRic(ric.name())) {
+            this.policies.remove(policy);
         }
     }
 
-    private Flux<String> recreateAllPoliciesInRic(Ric ric, A1Client a1Client) {
-        synchronized (policies) {
-            return Flux.fromIterable(new Vector<>(policies.getForRic(ric.name()))) //
-                .doOnNext(
-                    policy -> logger.debug("Recreating policy: {}, for ric: {}", policy.id(), ric.getConfig().name()))
-                .flatMap(a1Client::putPolicy);
+    private Flux<Policy> putPolicy(Policy policy, Ric ric, A1Client a1Client) {
+        logger.debug("Recreating policy: {}, for ric: {}", policy.id(), ric.getConfig().name());
+        return a1Client.putPolicy(policy) //
+            .flatMapMany(notUsed -> Flux.just(policy));
+    }
+
+    private boolean checkTransient(Policy policy) {
+        if (policy.isTransient()) {
+            this.policies.remove(policy);
         }
+        return policy.isTransient();
+    }
+
+    private Flux<Policy> recreateAllPoliciesInRic(Ric ric, A1Client a1Client) {
+        return Flux.fromIterable(policies.getForRic(ric.name())) //
+            .filter(policy -> !checkTransient(policy)) //
+            .flatMap(policy -> putPolicy(policy, ric, a1Client), CONCURRENCY_RIC);
     }
 
 }