Concurrency improvements 97/3097/3
authorPatrikBuhr <patrik.buhr@est.tech>
Wed, 1 Apr 2020 13:43:03 +0000 (15:43 +0200)
committerPatrikBuhr <patrik.buhr@est.tech>
Thu, 2 Apr 2020 06:08:01 +0000 (08:08 +0200)
The test for concurrency is improved so it involves RIC
synchronizations.

The principles for synchronization is simplified so that
classes in repository always returns copies of collections.

RIC syncronization is taking an exclusive lock during syncronization,
which leads to that a client will not get an HTTP error when accessing
a RIC that is synched. Instead, the client will be kept waiting until
the synch is completed.

Change-Id: I67568e8ef63b4b559a341ed8136e41119c9b7e6b
Issue-ID: NONRTRIC-164
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
16 files changed:
policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java
policy-agent/src/main/java/org/oransc/policyagent/controllers/RicRepositoryController.java
policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceController.java
policy-agent/src/main/java/org/oransc/policyagent/repository/Policies.java
policy-agent/src/main/java/org/oransc/policyagent/repository/PolicyTypes.java
policy-agent/src/main/java/org/oransc/policyagent/repository/Rics.java
policy-agent/src/main/java/org/oransc/policyagent/repository/Services.java
policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSupervision.java
policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java
policy-agent/src/main/java/org/oransc/policyagent/tasks/ServiceSupervision.java
policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java
policy-agent/src/test/java/org/oransc/policyagent/ConcurrencyTestRunnable.java [new file with mode: 0644]
policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java
policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSynchronizationTaskTest.java
policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1Client.java
policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1ClientFactory.java

index 0f3b391..5ae54ef 100644 (file)
@@ -95,17 +95,15 @@ public class PolicyController {
             @ApiResponse(code = 200, message = "Policy schemas", response = Object.class, responseContainer = "List"), //
             @ApiResponse(code = 404, message = "RIC is not found", response = String.class)})
     public ResponseEntity<String> getPolicySchemas(@RequestParam(name = "ric", required = false) String ricName) {
-        synchronized (this.policyTypes) {
-            if (ricName == null) {
-                Collection<PolicyType> types = this.policyTypes.getAll();
+        if (ricName == null) {
+            Collection<PolicyType> types = this.policyTypes.getAll();
+            return new ResponseEntity<>(toPolicyTypeSchemasJson(types), HttpStatus.OK);
+        } else {
+            try {
+                Collection<PolicyType> types = rics.getRic(ricName).getSupportedPolicyTypes();
                 return new ResponseEntity<>(toPolicyTypeSchemasJson(types), HttpStatus.OK);
-            } else {
-                try {
-                    Collection<PolicyType> types = rics.getRic(ricName).getSupportedPolicyTypes();
-                    return new ResponseEntity<>(toPolicyTypeSchemasJson(types), HttpStatus.OK);
-                } catch (ServiceException e) {
-                    return new ResponseEntity<>(e.toString(), HttpStatus.NOT_FOUND);
-                }
+            } catch (ServiceException e) {
+                return new ResponseEntity<>(e.toString(), HttpStatus.NOT_FOUND);
             }
         }
     }
@@ -136,17 +134,15 @@ public class PolicyController {
                 responseContainer = "List"),
             @ApiResponse(code = 404, message = "RIC is not found", response = String.class)})
     public ResponseEntity<String> getPolicyTypes(@RequestParam(name = "ric", required = false) String ricName) {
-        synchronized (this.policyTypes) {
-            if (ricName == null) {
-                Collection<PolicyType> types = this.policyTypes.getAll();
+        if (ricName == null) {
+            Collection<PolicyType> types = this.policyTypes.getAll();
+            return new ResponseEntity<>(toPolicyTypeIdsJson(types), HttpStatus.OK);
+        } else {
+            try {
+                Collection<PolicyType> types = rics.getRic(ricName).getSupportedPolicyTypes();
                 return new ResponseEntity<>(toPolicyTypeIdsJson(types), HttpStatus.OK);
-            } else {
-                try {
-                    Collection<PolicyType> types = rics.getRic(ricName).getSupportedPolicyTypes();
-                    return new ResponseEntity<>(toPolicyTypeIdsJson(types), HttpStatus.OK);
-                } catch (ServiceException e) {
-                    return new ResponseEntity<>(e.toString(), HttpStatus.NOT_FOUND);
-                }
+            } catch (ServiceException e) {
+                return new ResponseEntity<>(e.toString(), HttpStatus.NOT_FOUND);
             }
         }
     }
@@ -174,19 +170,16 @@ public class PolicyController {
         value = { //
             @ApiResponse(code = 204, message = "Policy deleted", response = Object.class),
             @ApiResponse(code = 404, message = "Policy is not found", response = String.class),
-            @ApiResponse(code = 423, message = "RIC is locked", response = String.class)})
+            @ApiResponse(code = 423, message = "RIC is not operational", response = String.class)})
     public Mono<ResponseEntity<Object>> deletePolicy( //
         @RequestParam(name = "instance", required = true) String id) {
-        Policy policy;
         try {
-            policy = policies.getPolicy(id);
+            Policy policy = policies.getPolicy(id);
             keepServiceAlive(policy.ownerServiceName());
-            if (policy.ric().getState() != Ric.RicState.IDLE) {
-                return Mono.just(new ResponseEntity<>("Busy, synchronizing", HttpStatus.LOCKED));
-            }
             Ric ric = policy.ric();
-            return ric.getLock().lock(LockType.SHARED) // //
-                .flatMap(lock -> a1ClientFactory.createA1Client(policy.ric())) //
+            return ric.getLock().lock(LockType.SHARED) //
+                .flatMap(notUsed -> assertRicStateIdle(ric)) //
+                .flatMap(notUsed -> a1ClientFactory.createA1Client(policy.ric())) //
                 .doOnNext(notUsed -> policies.remove(policy)) //
                 .flatMap(client -> client.deletePolicy(policy)) //
                 .doOnNext(notUsed -> ric.getLock().unlockBlocking()) //
@@ -204,7 +197,7 @@ public class PolicyController {
         value = { //
             @ApiResponse(code = 201, message = "Policy created", response = Object.class), //
             @ApiResponse(code = 200, message = "Policy updated", response = Object.class), //
-            @ApiResponse(code = 423, message = "RIC is locked", response = String.class), //
+            @ApiResponse(code = 423, message = "RIC is not operational", response = String.class), //
             @ApiResponse(code = 404, message = "RIC or policy type is not found", response = String.class) //
         })
     public Mono<ResponseEntity<Object>> putPolicy( //
@@ -218,31 +211,30 @@ public class PolicyController {
         Ric ric = rics.get(ricName);
         PolicyType type = policyTypes.get(typeName);
         keepServiceAlive(service);
-        if (ric != null && type != null && ric.getState() == Ric.RicState.IDLE) {
-            Policy policy = ImmutablePolicy.builder() //
-                .id(instanceId) //
-                .json(jsonString) //
-                .type(type) //
-                .ric(ric) //
-                .ownerServiceName(service) //
-                .lastModified(getTimeStampUtc()) //
-                .build();
-
-            final boolean isCreate = this.policies.get(policy.id()) == null;
-
-            return ric.getLock().lock(LockType.SHARED) //
-                .flatMap(p -> validateModifiedPolicy(policy)) //
-                .flatMap(notUsed -> a1ClientFactory.createA1Client(ric)) //
-                .flatMap(client -> client.putPolicy(policy)) //
-                .doOnNext(notUsed -> policies.put(policy)) //
-                .doOnNext(notUsed -> ric.getLock().unlockBlocking()) //
-                .doOnError(t -> ric.getLock().unlockBlocking()) //
-                .flatMap(notUsed -> Mono.just(new ResponseEntity<>(isCreate ? HttpStatus.CREATED : HttpStatus.OK))) //
-                .onErrorResume(this::handleException);
+        if (ric == null || type == null) {
+            return Mono.just(new ResponseEntity<>(HttpStatus.NOT_FOUND));
         }
-
-        return ric == null || type == null ? Mono.just(new ResponseEntity<>(HttpStatus.NOT_FOUND))
-            : Mono.just(new ResponseEntity<>(HttpStatus.LOCKED)); // Synchronizing
+        Policy policy = ImmutablePolicy.builder() //
+            .id(instanceId) //
+            .json(jsonString) //
+            .type(type) //
+            .ric(ric) //
+            .ownerServiceName(service) //
+            .lastModified(getTimeStampUtc()) //
+            .build();
+
+        final boolean isCreate = this.policies.get(policy.id()) == null;
+
+        return ric.getLock().lock(LockType.SHARED) //
+            .flatMap(p -> assertRicStateIdle(ric)) //
+            .flatMap(p -> validateModifiedPolicy(policy)) //
+            .flatMap(notUsed -> a1ClientFactory.createA1Client(ric)) //
+            .flatMap(client -> client.putPolicy(policy)) //
+            .doOnNext(notUsed -> policies.put(policy)) //
+            .doOnNext(notUsed -> ric.getLock().unlockBlocking()) //
+            .doOnError(t -> ric.getLock().unlockBlocking()) //
+            .flatMap(notUsed -> Mono.just(new ResponseEntity<>(isCreate ? HttpStatus.CREATED : HttpStatus.OK))) //
+            .onErrorResume(this::handleException);
     }
 
     @SuppressWarnings({"unchecked"})
@@ -275,6 +267,16 @@ public class PolicyController {
         return Mono.just("OK");
     }
 
+    private Mono<Object> assertRicStateIdle(Ric ric) {
+        if (ric.getState() == Ric.RicState.IDLE) {
+            return Mono.just("OK");
+        } else {
+            RejectionException e = new RejectionException(
+                "Ric is not operational, RIC name: " + ric.name() + ", state: " + ric.getState(), HttpStatus.LOCKED);
+            return Mono.error(e);
+        }
+    }
+
     @GetMapping("/policies")
     @ApiOperation(value = "Query policies")
     @ApiResponses(
@@ -292,10 +294,9 @@ public class PolicyController {
         if ((ric != null && this.rics.get(ric) == null)) {
             return new ResponseEntity<>("RIC not found", HttpStatus.NOT_FOUND);
         }
-        synchronized (policies) {
-            String filteredPolicies = policiesToJson(filter(type, ric, service));
-            return new ResponseEntity<>(filteredPolicies, HttpStatus.OK);
-        }
+
+        String filteredPolicies = policiesToJson(filter(type, ric, service));
+        return new ResponseEntity<>(filteredPolicies, HttpStatus.OK);
     }
 
     @GetMapping("/policy_ids")
@@ -314,10 +315,9 @@ public class PolicyController {
         if ((ric != null && this.rics.get(ric) == null)) {
             return new ResponseEntity<>("RIC not found", HttpStatus.NOT_FOUND);
         }
-        synchronized (policies) {
-            String policyIdsJson = toPolicyIdsJson(filter(type, ric, service));
-            return new ResponseEntity<>(policyIdsJson, HttpStatus.OK);
-        }
+
+        String policyIdsJson = toPolicyIdsJson(filter(type, ric, service));
+        return new ResponseEntity<>(policyIdsJson, HttpStatus.OK);
     }
 
     @GetMapping("/policy_status")
@@ -367,16 +367,14 @@ public class PolicyController {
     }
 
     private Collection<Policy> filter(String type, String ric, String service) {
-        synchronized (policies) {
-            if (type != null) {
-                return filter(policies.getForType(type), null, ric, service);
-            } else if (service != null) {
-                return filter(policies.getForService(service), type, ric, null);
-            } else if (ric != null) {
-                return filter(policies.getForRic(ric), type, null, service);
-            } else {
-                return policies.getAll();
-            }
+        if (type != null) {
+            return filter(policies.getForType(type), null, ric, service);
+        } else if (service != null) {
+            return filter(policies.getForService(service), type, ric, null);
+        } else if (ric != null) {
+            return filter(policies.getForRic(ric), type, null, service);
+        } else {
+            return policies.getAll();
         }
     }
 
index a96766d..465ee78 100644 (file)
@@ -95,11 +95,9 @@ public class RicRepositoryController {
         }
 
         List<RicInfo> result = new ArrayList<>();
-        synchronized (rics) {
-            for (Ric ric : rics.getRics()) {
-                if (supportingPolicyType == null || ric.isSupportingType(supportingPolicyType)) {
-                    result.add(new RicInfo(ric.name(), ric.getManagedElementIds(), ric.getSupportedPolicyTypeNames()));
-                }
+        for (Ric ric : rics.getRics()) {
+            if (supportingPolicyType == null || ric.isSupportingType(supportingPolicyType)) {
+                result.add(new RicInfo(ric.name(), ric.getManagedElementIds(), ric.getSupportedPolicyTypeNames()));
             }
         }
 
index 2a34643..578e5c8 100644 (file)
@@ -31,7 +31,6 @@ import io.swagger.annotations.ApiResponses;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.List;
 
 import org.oransc.policyagent.exceptions.ServiceException;
 import org.oransc.policyagent.repository.Policies;
@@ -79,11 +78,9 @@ public class ServiceController {
         }
 
         Collection<ServiceStatus> servicesStatus = new ArrayList<>();
-        synchronized (this.services) {
-            for (Service s : this.services.getAll()) {
-                if (name == null || name.equals(s.getName())) {
-                    servicesStatus.add(toServiceStatus(s));
-                }
+        for (Service s : this.services.getAll()) {
+            if (name == null || name.equals(s.getName())) {
+                servicesStatus.add(toServiceStatus(s));
             }
         }
 
@@ -157,19 +154,15 @@ public class ServiceController {
     }
 
     private Service removeService(String name) throws ServiceException {
-        synchronized (this.services) {
-            Service service = this.services.getService(name);
-            this.services.remove(service.getName());
-            return service;
-        }
+        Service service = this.services.getService(name); // Just to verify that it exists
+        this.services.remove(service.getName());
+        return service;
     }
 
     private void removePolicies(Service service) {
-        synchronized (this.policies) {
-            List<Policy> policyList = new ArrayList<>(this.policies.getForService(service.getName()));
-            for (Policy policy : policyList) {
-                this.policies.remove(policy);
-            }
+        Collection<Policy> policyList = this.policies.getForService(service.getName());
+        for (Policy policy : policyList) {
+            this.policies.remove(policy);
         }
     }
 
index 62dd140..4e2ebfa 100644 (file)
@@ -25,6 +25,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.Vector;
 
 import org.oransc.policyagent.exceptions.ServiceException;
 
@@ -60,7 +61,7 @@ public class Policies {
         if (map == null) {
             return Collections.emptyList();
         }
-        return Collections.unmodifiableCollection(map.values());
+        return new Vector<>(map.values());
     }
 
     public synchronized boolean containsPolicy(String id) {
@@ -80,7 +81,7 @@ public class Policies {
     }
 
     public synchronized Collection<Policy> getAll() {
-        return Collections.unmodifiableCollection(policiesId.values());
+        return new Vector<>(policiesId.values());
     }
 
     public synchronized Collection<Policy> getForService(String service) {
index 7798231..2897a50 100644 (file)
@@ -21,9 +21,9 @@
 package org.oransc.policyagent.repository;
 
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Vector;
 
 import org.oransc.policyagent.exceptions.ServiceException;
 
@@ -51,7 +51,7 @@ public class PolicyTypes {
     }
 
     public synchronized Collection<PolicyType> getAll() {
-        return Collections.unmodifiableCollection(types.values());
+        return new Vector<>(types.values());
     }
 
     public synchronized int size() {
index 3b8e587..66cecd3 100644 (file)
@@ -23,6 +23,7 @@ package org.oransc.policyagent.repository;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Vector;
 
 import org.oransc.policyagent.exceptions.ServiceException;
 
@@ -37,7 +38,7 @@ public class Rics {
     }
 
     public synchronized Iterable<Ric> getRics() {
-        return registeredRics.values();
+        return new Vector<>(registeredRics.values());
     }
 
     public synchronized Ric getRic(String name) throws ServiceException {
index f6c55dc..f829c7c 100644 (file)
@@ -20,9 +20,9 @@
 
 package org.oransc.policyagent.repository;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Vector;
 
 import org.oransc.policyagent.exceptions.ServiceException;
 import org.slf4j.Logger;
@@ -51,7 +51,7 @@ public class Services {
     }
 
     public synchronized Iterable<Service> getAll() {
-        return Collections.unmodifiableCollection(registeredServices.values());
+        return new Vector<>(registeredServices.values());
     }
 
     public synchronized void remove(String name) {
index 52780d7..ba050df 100644 (file)
@@ -82,12 +82,11 @@ public class RicSupervision {
     }
 
     private Flux<RicData> createTask() {
-        synchronized (this.rics) {
-            return Flux.fromIterable(rics.getRics()) //
-                .flatMap(this::createRicData) //
-                .flatMap(this::checkOneRic) //
-                .onErrorResume(throwable -> Mono.empty());
-        }
+        return Flux.fromIterable(rics.getRics()) //
+            .flatMap(this::createRicData) //
+            .flatMap(this::checkOneRic) //
+            .onErrorResume(throwable -> Mono.empty());
+
     }
 
     private Mono<RicData> checkOneRic(RicData ricData) {
index 3879fd6..00ca0ed 100644 (file)
@@ -22,15 +22,10 @@ package org.oransc.policyagent.tasks;
 
 import static org.oransc.policyagent.repository.Ric.RicState;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Vector;
-
 import org.oransc.policyagent.clients.A1Client;
 import org.oransc.policyagent.clients.A1ClientFactory;
 import org.oransc.policyagent.clients.AsyncRestClient;
 import org.oransc.policyagent.repository.ImmutablePolicyType;
-import org.oransc.policyagent.repository.Lock;
 import org.oransc.policyagent.repository.Lock.LockType;
 import org.oransc.policyagent.repository.Policies;
 import org.oransc.policyagent.repository.Policy;
@@ -45,6 +40,7 @@ import org.slf4j.LoggerFactory;
 import reactor.core.publisher.BaseSubscriber;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.publisher.SignalType;
 
 /**
  * Synchronizes the content of a RIC with the content in the repository. This
@@ -76,36 +72,51 @@ public class RicSynchronizationTask {
         this.services = services;
     }
 
-    @SuppressWarnings("squid:S2445") // Blocks should be synchronized on "private final" fields
     public void run(Ric ric) {
         logger.debug("Handling ric: {}", ric.getConfig().name());
 
-        synchronized (ric) {
-            if (ric.getState() == RicState.SYNCHRONIZING) {
-                logger.debug("Ric: {} is already being synchronized", ric.getConfig().name());
-                return;
-            }
-            ric.setState(RicState.SYNCHRONIZING);
+        if (ric.getState() == RicState.SYNCHRONIZING) {
+            logger.debug("Ric: {} is already being synchronized", ric.getConfig().name());
+            return;
         }
 
-        ric.getLock().lock(LockType.EXCLUSIVE) // Make sure no NBI updates are running
-            .flatMap(Lock::unlock) //
+        ric.getLock().lock(LockType.EXCLUSIVE) //
+            .flatMap(notUsed -> setRicState(ric)) //
             .flatMap(lock -> this.a1ClientFactory.createA1Client(ric)) //
-            .flatMapMany(client -> startSynchronization(ric, client)) //
+            .flatMapMany(client -> runSynchronization(ric, client)) //
+            .onErrorResume(throwable -> deleteAllPolicyInstances(ric, throwable))
             .subscribe(new BaseSubscriber<Object>() {
                 @Override
                 protected void hookOnError(Throwable throwable) {
-                    startDeleteAllPolicyInstances(ric, throwable);
+                    logger.warn("Synchronization failure for ric: {}, reason: {}", ric.name(), throwable.getMessage());
+                    ric.setState(RicState.UNDEFINED);
                 }
 
                 @Override
                 protected void hookOnComplete() {
                     onSynchronizationComplete(ric);
                 }
+
+                @Override
+                protected void hookFinally(SignalType type) {
+                    ric.getLock().unlockBlocking();
+                }
             });
     }
 
-    private Flux<Object> startSynchronization(Ric ric, A1Client a1Client) {
+    @SuppressWarnings("squid:S2445") // Blocks should be synchronized on "private final" fields
+    private Mono<Ric> setRicState(Ric ric) {
+        synchronized (ric) {
+            if (ric.getState() == RicState.SYNCHRONIZING) {
+                logger.debug("Ric: {} is already being synchronized", ric.getConfig().name());
+                return Mono.empty();
+            }
+            ric.setState(RicState.SYNCHRONIZING);
+            return Mono.just(ric);
+        }
+    }
+
+    private Flux<Object> runSynchronization(Ric ric, A1Client a1Client) {
         Flux<PolicyType> synchronizedTypes = synchronizePolicyTypes(ric, a1Client);
         Flux<?> policiesDeletedInRic = a1Client.deleteAllPolicies();
         Flux<Policy> policiesRecreatedInRic = recreateAllPoliciesInRic(ric, a1Client);
@@ -114,30 +125,27 @@ public class RicSynchronizationTask {
     }
 
     private void onSynchronizationComplete(Ric ric) {
-        logger.info("Synchronization completed for: {}", ric.name());
+        logger.debug("Synchronization completed for: {}", ric.name());
         ric.setState(RicState.IDLE);
         notifyAllServices("Synchronization completed for:" + ric.name());
     }
 
     private void notifyAllServices(String body) {
-        synchronized (services) {
-            for (Service service : services.getAll()) {
-                String url = service.getCallbackUrl();
-                if (service.getCallbackUrl().length() > 0) {
-                    createNotificationClient(url) //
-                        .put("", body) //
-                        .subscribe( //
-                            notUsed -> logger.debug("Service {} notified", service.getName()), throwable -> logger
-                                .warn("Service notification failed for service: {}", service.getName(), throwable),
-                            () -> logger.debug("All services notified"));
-                }
+        for (Service service : services.getAll()) {
+            String url = service.getCallbackUrl();
+            if (service.getCallbackUrl().length() > 0) {
+                createNotificationClient(url) //
+                    .put("", body) //
+                    .subscribe( //
+                        notUsed -> logger.debug("Service {} notified", service.getName()), throwable -> logger
+                            .warn("Service notification failed for service: {}", service.getName(), throwable),
+                        () -> logger.debug("All services notified"));
             }
         }
     }
 
-    private void startDeleteAllPolicyInstances(Ric ric, Throwable t) {
-        logger.warn("Synchronization failed for ric: {}, reason: {}", ric.name(), t.getMessage());
-        // If synchronization fails, try to remove all instances
+    private Flux<Object> deleteAllPolicyInstances(Ric ric, Throwable t) {
+        logger.warn("Recreation of policies failed for ric: {}, reason: {}", ric.name(), t.getMessage());
         deleteAllPoliciesInRepository(ric);
 
         Flux<PolicyType> synchronizedTypes = this.a1ClientFactory.createA1Client(ric) //
@@ -146,15 +154,7 @@ public class RicSynchronizationTask {
             .flatMapMany(A1Client::deleteAllPolicies) //
             .doOnComplete(() -> deleteAllPoliciesInRepository(ric));
 
-        Flux.concat(synchronizedTypes, deletePoliciesInRic) //
-            .subscribe(x -> logger.debug("Brute recovery of failed synchronization: {}", x), //
-                throwable -> onDeleteAllPolicyInstancesError(ric, throwable), //
-                () -> onSynchronizationComplete(ric));
-    }
-
-    private void onDeleteAllPolicyInstancesError(Ric ric, Throwable t) {
-        logger.warn("Synchronization failure recovery failed for ric: {}, reason: {}", ric.name(), t.getMessage());
-        ric.setState(RicState.UNDEFINED);
+        return Flux.concat(synchronizedTypes, deletePoliciesInRic);
     }
 
     AsyncRestClient createNotificationClient(final String url) {
@@ -185,11 +185,8 @@ public class RicSynchronizationTask {
     }
 
     private void deleteAllPoliciesInRepository(Ric ric) {
-        synchronized (policies) {
-            List<Policy> ricPolicies = new ArrayList<>(policies.getForRic(ric.name()));
-            for (Policy policy : ricPolicies) {
-                this.policies.remove(policy);
-            }
+        for (Policy policy : policies.getForRic(ric.name())) {
+            this.policies.remove(policy);
         }
     }
 
@@ -200,10 +197,8 @@ public class RicSynchronizationTask {
     }
 
     private Flux<Policy> recreateAllPoliciesInRic(Ric ric, A1Client a1Client) {
-        synchronized (policies) {
-            return Flux.fromIterable(new Vector<>(policies.getForRic(ric.name()))) //
-                .flatMap(policy -> putPolicy(policy, ric, a1Client));
-        }
+        return Flux.fromIterable(policies.getForRic(ric.name())) //
+            .flatMap(policy -> putPolicy(policy, ric, a1Client));
     }
 
 }
index 2650992..50e990c 100644 (file)
@@ -105,9 +105,7 @@ public class ServiceSupervision {
     }
 
     private Flux<Policy> getAllPoliciesForService(Service service) {
-        synchronized (policies) {
-            return Flux.fromIterable(policies.getForService(service.getName()));
-        }
+        return Flux.fromIterable(policies.getForService(service.getName()));
     }
 
     private Mono<Policy> deletePolicyInRic(Policy policy) {
index 71bd9a5..f701ee5 100644 (file)
@@ -38,7 +38,6 @@ import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -76,13 +75,9 @@ import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.boot.web.server.LocalServerPort;
 import org.springframework.context.ApplicationContext;
 import org.springframework.context.annotation.Bean;
-import org.springframework.http.HttpEntity;
-import org.springframework.http.HttpHeaders;
 import org.springframework.http.HttpStatus;
-import org.springframework.http.MediaType;
 import org.springframework.http.ResponseEntity;
 import org.springframework.test.context.junit.jupiter.SpringExtension;
-import org.springframework.web.client.RestTemplate;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 
 import reactor.core.publisher.Mono;
@@ -180,6 +175,7 @@ public class ApplicationTest {
         policies.clear();
         policyTypes.clear();
         services.clear();
+        a1ClientFactory.reset();
     }
 
     @AfterEach
@@ -454,7 +450,6 @@ public class ApplicationTest {
 
     @Test
     public void testGetPolicies() throws Exception {
-        reset();
         addPolicy("id1", "type1", "service1");
 
         String url = "/policies";
@@ -642,51 +637,19 @@ public class ApplicationTest {
         return "{\n  \"servingCellNrcgi\": \"1\"\n }";
     }
 
-    private static class ConcurrencyTestRunnable implements Runnable {
-        private final RestTemplate restTemplate = new RestTemplate();
-        private final String baseUrl;
-        static AtomicInteger nextCount = new AtomicInteger(0);
-        private final int count;
-        private final RicSupervision supervision;
-
-        ConcurrencyTestRunnable(String baseUrl, RicSupervision supervision) {
-            this.baseUrl = baseUrl;
-            this.count = nextCount.incrementAndGet();
-            this.supervision = supervision;
-        }
-
-        @Override
-        public void run() {
-            for (int i = 0; i < 100; ++i) {
-                if (i % 10 == 0) {
-                    this.supervision.checkAllRics();
-                }
-                String name = "policy:" + count + ":" + i;
-                putPolicy(name);
-                deletePolicy(name);
-            }
-        }
-
-        private void putPolicy(String name) {
-            String putUrl = baseUrl + "/policy?type=type1&instance=" + name + "&ric=ric1&service=service1";
-            restTemplate.put(putUrl, createJsonHttpEntity("{}"));
-        }
-
-        private void deletePolicy(String name) {
-            String deleteUrl = baseUrl + "/policy?instance=" + name;
-            restTemplate.delete(deleteUrl);
-        }
-    }
-
     @Test
     public void testConcurrency() throws Exception {
         final Instant startTime = Instant.now();
         List<Thread> threads = new ArrayList<>();
-        addRic("ric1");
-        addPolicyType("type1", "ric1");
+        a1ClientFactory.setResponseDelay(Duration.ofMillis(1));
+        addRic("ric");
+        addPolicyType("type1", "ric");
+        addPolicyType("type2", "ric");
 
         for (int i = 0; i < 100; ++i) {
-            Thread t = new Thread(new ConcurrencyTestRunnable(baseUrl(), this.supervision), "TestThread_" + i);
+            Thread t =
+                new Thread(new ConcurrencyTestRunnable(baseUrl(), supervision, a1ClientFactory, rics, policyTypes),
+                    "TestThread_" + i);
             t.start();
             threads.add(t);
         }
@@ -758,12 +721,6 @@ public class ApplicationTest {
         return ric;
     }
 
-    private static HttpEntity<String> createJsonHttpEntity(String content) {
-        HttpHeaders headers = new HttpHeaders();
-        headers.setContentType(MediaType.APPLICATION_JSON);
-        return new HttpEntity<String>(content, headers);
-    }
-
     private static <T> List<T> parseList(String jsonString, Class<T> clazz) {
         List<T> result = new ArrayList<>();
         JsonArray jsonArr = JsonParser.parseString(jsonString).getAsJsonArray();
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/ConcurrencyTestRunnable.java b/policy-agent/src/test/java/org/oransc/policyagent/ConcurrencyTestRunnable.java
new file mode 100644 (file)
index 0000000..26e2091
--- /dev/null
@@ -0,0 +1,133 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2020 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.policyagent;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.oransc.policyagent.repository.ImmutablePolicy;
+import org.oransc.policyagent.repository.Policy;
+import org.oransc.policyagent.repository.PolicyType;
+import org.oransc.policyagent.repository.PolicyTypes;
+import org.oransc.policyagent.repository.Ric;
+import org.oransc.policyagent.repository.Rics;
+import org.oransc.policyagent.tasks.RicSupervision;
+import org.oransc.policyagent.utils.MockA1Client;
+import org.oransc.policyagent.utils.MockA1ClientFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.MediaType;
+import org.springframework.web.client.RestTemplate;
+
+/**
+ * Invoke operations over the NBI and start synchronizations in a separate
+ * thread. For test of robustness using concurrent clients.
+ */
+class ConcurrencyTestRunnable implements Runnable {
+    private static final Logger logger = LoggerFactory.getLogger(ConcurrencyTestRunnable.class);
+    private final RestTemplate restTemplate = new RestTemplate();
+    private final String baseUrl;
+    static AtomicInteger nextCount = new AtomicInteger(0);
+    private final int count;
+    private final RicSupervision supervision;
+    private final MockA1ClientFactory a1ClientFactory;
+    private final Rics rics;
+    private final PolicyTypes types;
+
+    ConcurrencyTestRunnable(String baseUrl, RicSupervision supervision, MockA1ClientFactory a1ClientFactory, Rics rics,
+        PolicyTypes types) {
+        this.baseUrl = baseUrl;
+        this.count = nextCount.incrementAndGet();
+        this.supervision = supervision;
+        this.a1ClientFactory = a1ClientFactory;
+        this.rics = rics;
+        this.types = types;
+    }
+
+    @Override
+    public void run() {
+        try {
+            for (int i = 0; i < 100; ++i) {
+                if (i % 10 == 0) {
+                    createInconsistency();
+                    this.supervision.checkAllRics();
+                }
+                String name = "policy:" + count + ":" + i;
+                putPolicy(name);
+                putPolicy(name + "-");
+                listPolicies();
+                listTypes();
+                deletePolicy(name);
+                deletePolicy(name + "-");
+            }
+        } catch (Exception e) {
+            logger.error("Concurrency exception " + e.toString());
+        }
+    }
+
+    private Policy createPolicyObject(String id) {
+        Ric ric = this.rics.get("ric");
+        PolicyType type = this.types.get("type1");
+        return ImmutablePolicy.builder() //
+            .id(id) //
+            .json("{}") //
+            .type(type) //
+            .ric(ric) //
+            .ownerServiceName("") //
+            .lastModified("") //
+            .build();
+    }
+
+    private void createInconsistency() {
+        MockA1Client client = a1ClientFactory.getOrCreateA1Client("ric");
+        Policy policy = createPolicyObject("junk");
+        client.putPolicy(policy).block();
+
+    }
+
+    private void listPolicies() {
+        String uri = baseUrl + "/policies";
+        restTemplate.getForObject(uri, String.class);
+    }
+
+    private void listTypes() {
+        String uri = baseUrl + "/policy_types";
+        restTemplate.getForObject(uri, String.class);
+    }
+
+    private void putPolicy(String name) {
+        String putUrl = baseUrl + "/policy?type=type1&instance=" + name + "&ric=ric&service=service1";
+        restTemplate.put(putUrl, createJsonHttpEntity("{}"));
+    }
+
+    private void deletePolicy(String name) {
+        String deleteUrl = baseUrl + "/policy?instance=" + name;
+        restTemplate.delete(deleteUrl);
+    }
+
+    private static HttpEntity<String> createJsonHttpEntity(String content) {
+        HttpHeaders headers = new HttpHeaders();
+        headers.setContentType(MediaType.APPLICATION_JSON);
+        return new HttpEntity<String>(content, headers);
+    }
+
+}
index 481a1fe..2b66a35 100644 (file)
@@ -131,11 +131,9 @@ public class MockPolicyAgent {
 
     private void keepServerAlive() throws InterruptedException {
         logger.info("Keeping server alive!");
-
         synchronized (this) {
             this.wait();
         }
-
     }
 
     private static String title(String jsonSchema) {
index eae00ea..e0ce2ef 100644 (file)
@@ -264,9 +264,7 @@ public class RicSynchronizationTaskTest {
         synchronizerUnderTest.run(RIC_1);
 
         verifyCorrectLogMessage(0, logAppender,
-            "Synchronization failed for ric: " + RIC_1_NAME + ", reason: " + originalErrorMessage);
-        verifyCorrectLogMessage(1, logAppender,
-            "Synchronization failure recovery failed for ric: " + RIC_1_NAME + ", reason: " + originalErrorMessage);
+            "Recreation of policies failed for ric: " + RIC_1_NAME + ", reason: " + originalErrorMessage);
 
         verify(a1ClientMock, times(2)).deleteAllPolicies();
         verifyNoMoreInteractions(a1ClientMock);
index 3265d76..0fa5be4 100644 (file)
@@ -46,25 +46,21 @@ public class MockA1Client implements A1Client {
 
     @Override
     public Mono<List<String>> getPolicyTypeIdentities() {
-        synchronized (this.policyTypes) {
-            List<String> result = new Vector<>();
-            for (PolicyType p : this.policyTypes.getAll()) {
-                result.add(p.name());
-            }
-            return mono(result);
+        List<String> result = new Vector<>();
+        for (PolicyType p : this.policyTypes.getAll()) {
+            result.add(p.name());
         }
+        return mono(result);
     }
 
     @Override
     public Mono<List<String>> getPolicyIdentities() {
-        synchronized (this.policies) {
-            Vector<String> result = new Vector<>();
-            for (Policy policy : policies.getAll()) {
-                result.add(policy.id());
-            }
-
-            return mono(result);
+        Vector<String> result = new Vector<>();
+        for (Policy policy : policies.getAll()) {
+            result.add(policy.id());
         }
+
+        return mono(result);
     }
 
     @Override
index c1fd8c3..9967958 100644 (file)
@@ -72,4 +72,9 @@ public class MockA1ClientFactory extends A1ClientFactory {
         this.asynchDelay = delay;
     }
 
+    public void reset() {
+        this.asynchDelay = Duration.ofSeconds(0);
+        clients.clear();
+    }
+
 }