Merge "Added STD sim 2.0.0 tests"
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / tasks / RicSynchronizationTask.java
index d759991..6ae55c4 100644 (file)
@@ -22,8 +22,6 @@ package org.oransc.policyagent.tasks;
 
 import static org.oransc.policyagent.repository.Ric.RicState;
 
-import java.util.Vector;
-
 import org.oransc.policyagent.clients.A1Client;
 import org.oransc.policyagent.clients.A1ClientFactory;
 import org.oransc.policyagent.clients.AsyncRestClient;
@@ -39,8 +37,10 @@ import org.oransc.policyagent.repository.Services;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import reactor.core.publisher.BaseSubscriber;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.publisher.SignalType;
 
 /**
  * Synchronizes the content of a RIC with the content in the repository. This
@@ -54,9 +54,11 @@ import reactor.core.publisher.Mono;
  * <p>
  * Notify subscribing services
  */
+@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
 public class RicSynchronizationTask {
 
     private static final Logger logger = LoggerFactory.getLogger(RicSynchronizationTask.class);
+    static final int CONCURRENCY_RIC = 1; // How may paralell requests that is sent to one NearRT RIC
 
     private final A1ClientFactory a1ClientFactory;
     private final PolicyTypes policyTypes;
@@ -71,83 +73,94 @@ public class RicSynchronizationTask {
         this.services = services;
     }
 
-    @SuppressWarnings("squid:S2629")
     public void run(Ric ric) {
         logger.debug("Handling ric: {}", ric.getConfig().name());
 
+        if (ric.getState() == RicState.SYNCHRONIZING) {
+            logger.debug("Ric: {} is already being synchronized", ric.getConfig().name());
+            return;
+        }
+
+        ric.getLock().lock(LockType.EXCLUSIVE) //
+            .flatMap(notUsed -> setRicState(ric)) //
+            .flatMap(lock -> this.a1ClientFactory.createA1Client(ric)) //
+            .flatMapMany(client -> runSynchronization(ric, client)) //
+            .onErrorResume(throwable -> deleteAllPolicyInstances(ric, throwable))
+            .subscribe(new BaseSubscriber<Object>() {
+                @Override
+                protected void hookOnError(Throwable throwable) {
+                    logger.warn("Synchronization failure for ric: {}, reason: {}", ric.name(), throwable.getMessage());
+                    ric.setState(RicState.UNAVAILABLE);
+                }
+
+                @Override
+                protected void hookOnComplete() {
+                    onSynchronizationComplete(ric);
+                }
+
+                @Override
+                protected void hookFinally(SignalType type) {
+                    ric.getLock().unlockBlocking();
+                }
+            });
+    }
+
+    @SuppressWarnings("squid:S2445") // Blocks should be synchronized on "private final" fields
+    private Mono<Ric> setRicState(Ric ric) {
         synchronized (ric) {
             if (ric.getState() == RicState.SYNCHRONIZING) {
                 logger.debug("Ric: {} is already being synchronized", ric.getConfig().name());
-                return;
+                return Mono.empty();
             }
             ric.setState(RicState.SYNCHRONIZING);
+            return Mono.just(ric);
         }
-        ric.getLock().lockBlocking(LockType.EXCLUSIVE); // Make sure no NBI updates are running
-        ric.getLock().unlock();
-        this.a1ClientFactory.createA1Client(ric)//
-            .flatMapMany(client -> startSynchronization(ric, client)) //
-            .subscribe(x -> logger.debug("Synchronize: {}", x), //
-                throwable -> onSynchronizationError(ric, throwable), //
-                () -> onSynchronizationComplete(ric));
     }
 
-    private Flux<Object> startSynchronization(Ric ric, A1Client a1Client) {
-        Flux<PolicyType> recoverTypes = synchronizePolicyTypes(ric, a1Client);
+    private Flux<Object> runSynchronization(Ric ric, A1Client a1Client) {
+        Flux<PolicyType> synchronizedTypes = synchronizePolicyTypes(ric, a1Client);
         Flux<?> policiesDeletedInRic = a1Client.deleteAllPolicies();
-        Flux<?> policiesRecreatedInRic = recreateAllPoliciesInRic(ric, a1Client);
+        Flux<Policy> policiesRecreatedInRic = recreateAllPoliciesInRic(ric, a1Client);
 
-        return Flux.concat(recoverTypes, policiesDeletedInRic, policiesRecreatedInRic);
+        return Flux.concat(synchronizedTypes, policiesDeletedInRic, policiesRecreatedInRic);
     }
 
-    @SuppressWarnings("squid:S2629")
     private void onSynchronizationComplete(Ric ric) {
         logger.debug("Synchronization completed for: {}", ric.name());
-        ric.setState(RicState.IDLE);
+        ric.setState(RicState.AVAILABLE);
         notifyAllServices("Synchronization completed for:" + ric.name());
     }
 
     private void notifyAllServices(String body) {
-        synchronized (services) {
-            for (Service service : services.getAll()) {
-                String url = service.getCallbackUrl();
-                if (service.getCallbackUrl().length() > 0) {
-                    createNotificationClient(url) //
-                        .put("", body) //
-                        .subscribe(
-                            notUsed -> logger.debug("Service {} notified", service.getName()), throwable -> logger
-                                .warn("Service notification failed for service: {}", service.getName(), throwable),
-                            () -> logger.debug("All services notified"));
-                }
+        for (Service service : services.getAll()) {
+            String url = service.getCallbackUrl();
+            if (url.length() > 0) {
+                createNotificationClient(url) //
+                    .put("", body) //
+                    .subscribe( //
+                        notUsed -> logger.debug("Service {} notified", service.getName()),
+                        throwable -> logger.warn("Service notification failed for service: {}. Cause: {}",
+                            service.getName(), throwable.getMessage()),
+                        () -> logger.debug("All services notified"));
             }
         }
     }
 
-    @SuppressWarnings("squid:S2629")
-    private void onSynchronizationError(Ric ric, Throwable t) {
-        logger.warn("Synchronization failed for ric: {}, reason: {}", ric.name(), t.getMessage());
-        // If recovery fails, try to remove all instances
+    private Flux<Object> deleteAllPolicyInstances(Ric ric, Throwable t) {
+        logger.debug("Recreation of policies failed for ric: {}, reason: {}", ric.name(), t.getMessage());
         deleteAllPoliciesInRepository(ric);
 
-        Flux<PolicyType> recoverTypes = this.a1ClientFactory.createA1Client(ric) //
+        Flux<PolicyType> synchronizedTypes = this.a1ClientFactory.createA1Client(ric) //
             .flatMapMany(a1Client -> synchronizePolicyTypes(ric, a1Client));
         Flux<?> deletePoliciesInRic = this.a1ClientFactory.createA1Client(ric) //
-            .flatMapMany(a1Client -> a1Client.deleteAllPolicies()) //
+            .flatMapMany(A1Client::deleteAllPolicies) //
             .doOnComplete(() -> deleteAllPoliciesInRepository(ric));
 
-        Flux.concat(recoverTypes, deletePoliciesInRic) //
-            .subscribe(x -> logger.debug("Brute recover: " + x), //
-                throwable -> onRecoveryError(ric, throwable), //
-                () -> onSynchronizationComplete(ric));
-    }
-
-    @SuppressWarnings("squid:S2629")
-    private void onRecoveryError(Ric ric, Throwable t) {
-        logger.warn("Synchronization failure recovery failed for ric: {}, reason: {}", ric.name(), t.getMessage());
-        ric.setState(RicState.UNDEFINED);
+        return Flux.concat(synchronizedTypes, deletePoliciesInRic);
     }
 
     AsyncRestClient createNotificationClient(final String url) {
-        return new AsyncRestClient(url);
+        return new AsyncRestClient(url, this.a1ClientFactory.getAppConfig().getWebClientConfig());
     }
 
     private Flux<PolicyType> synchronizePolicyTypes(Ric ric, A1Client a1Client) {
@@ -155,7 +168,7 @@ public class RicSynchronizationTask {
             .doOnNext(x -> ric.clearSupportedPolicyTypes()) //
             .flatMapMany(Flux::fromIterable) //
             .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId)) //
-            .flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client)) //
+            .flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client), CONCURRENCY_RIC) //
             .doOnNext(ric::addSupportedPolicyType); //
     }
 
@@ -174,20 +187,28 @@ public class RicSynchronizationTask {
     }
 
     private void deleteAllPoliciesInRepository(Ric ric) {
-        synchronized (policies) {
-            for (Policy policy : policies.getForRic(ric.name())) {
-                this.policies.remove(policy);
-            }
+        for (Policy policy : policies.getForRic(ric.name())) {
+            this.policies.remove(policy);
         }
     }
 
-    private Flux<String> recreateAllPoliciesInRic(Ric ric, A1Client a1Client) {
-        synchronized (policies) {
-            return Flux.fromIterable(new Vector<>(policies.getForRic(ric.name()))) //
-                .doOnNext(
-                    policy -> logger.debug("Recreating policy: {}, for ric: {}", policy.id(), ric.getConfig().name()))
-                .flatMap(a1Client::putPolicy);
+    private Flux<Policy> putPolicy(Policy policy, Ric ric, A1Client a1Client) {
+        logger.debug("Recreating policy: {}, for ric: {}", policy.id(), ric.getConfig().name());
+        return a1Client.putPolicy(policy) //
+            .flatMapMany(notUsed -> Flux.just(policy));
+    }
+
+    private boolean checkTransient(Policy policy) {
+        if (policy.isTransient()) {
+            this.policies.remove(policy);
         }
+        return policy.isTransient();
+    }
+
+    private Flux<Policy> recreateAllPoliciesInRic(Ric ric, A1Client a1Client) {
+        return Flux.fromIterable(policies.getForRic(ric.name())) //
+            .filter(policy -> !checkTransient(policy)) //
+            .flatMap(policy -> putPolicy(policy, ric, a1Client), CONCURRENCY_RIC);
     }
 
 }