From: PatrikBuhr Date: Wed, 1 Apr 2020 13:43:03 +0000 (+0200) Subject: Concurrency improvements X-Git-Tag: 2.0.0~95 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=ae4206bbd7437adda91fc429efef03a13da2b702;p=nonrtric.git Concurrency improvements The test for concurrency is improved so it involves RIC synchronizations. The principles for synchronization is simplified so that classes in repository always returns copies of collections. RIC syncronization is taking an exclusive lock during syncronization, which leads to that a client will not get an HTTP error when accessing a RIC that is synched. Instead, the client will be kept waiting until the synch is completed. Change-Id: I67568e8ef63b4b559a341ed8136e41119c9b7e6b Issue-ID: NONRTRIC-164 Signed-off-by: PatrikBuhr --- diff --git a/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java b/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java index 0f3b391f..5ae54ef0 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java @@ -95,17 +95,15 @@ public class PolicyController { @ApiResponse(code = 200, message = "Policy schemas", response = Object.class, responseContainer = "List"), // @ApiResponse(code = 404, message = "RIC is not found", response = String.class)}) public ResponseEntity getPolicySchemas(@RequestParam(name = "ric", required = false) String ricName) { - synchronized (this.policyTypes) { - if (ricName == null) { - Collection types = this.policyTypes.getAll(); + if (ricName == null) { + Collection types = this.policyTypes.getAll(); + return new ResponseEntity<>(toPolicyTypeSchemasJson(types), HttpStatus.OK); + } else { + try { + Collection types = rics.getRic(ricName).getSupportedPolicyTypes(); return new ResponseEntity<>(toPolicyTypeSchemasJson(types), HttpStatus.OK); - } else { - try { - Collection types = rics.getRic(ricName).getSupportedPolicyTypes(); - return new ResponseEntity<>(toPolicyTypeSchemasJson(types), HttpStatus.OK); - } catch (ServiceException e) { - return new ResponseEntity<>(e.toString(), HttpStatus.NOT_FOUND); - } + } catch (ServiceException e) { + return new ResponseEntity<>(e.toString(), HttpStatus.NOT_FOUND); } } } @@ -136,17 +134,15 @@ public class PolicyController { responseContainer = "List"), @ApiResponse(code = 404, message = "RIC is not found", response = String.class)}) public ResponseEntity getPolicyTypes(@RequestParam(name = "ric", required = false) String ricName) { - synchronized (this.policyTypes) { - if (ricName == null) { - Collection types = this.policyTypes.getAll(); + if (ricName == null) { + Collection types = this.policyTypes.getAll(); + return new ResponseEntity<>(toPolicyTypeIdsJson(types), HttpStatus.OK); + } else { + try { + Collection types = rics.getRic(ricName).getSupportedPolicyTypes(); return new ResponseEntity<>(toPolicyTypeIdsJson(types), HttpStatus.OK); - } else { - try { - Collection types = rics.getRic(ricName).getSupportedPolicyTypes(); - return new ResponseEntity<>(toPolicyTypeIdsJson(types), HttpStatus.OK); - } catch (ServiceException e) { - return new ResponseEntity<>(e.toString(), HttpStatus.NOT_FOUND); - } + } catch (ServiceException e) { + return new ResponseEntity<>(e.toString(), HttpStatus.NOT_FOUND); } } } @@ -174,19 +170,16 @@ public class PolicyController { value = { // @ApiResponse(code = 204, message = "Policy deleted", response = Object.class), @ApiResponse(code = 404, message = "Policy is not found", response = String.class), - @ApiResponse(code = 423, message = "RIC is locked", response = String.class)}) + @ApiResponse(code = 423, message = "RIC is not operational", response = String.class)}) public Mono> deletePolicy( // @RequestParam(name = "instance", required = true) String id) { - Policy policy; try { - policy = policies.getPolicy(id); + Policy policy = policies.getPolicy(id); keepServiceAlive(policy.ownerServiceName()); - if (policy.ric().getState() != Ric.RicState.IDLE) { - return Mono.just(new ResponseEntity<>("Busy, synchronizing", HttpStatus.LOCKED)); - } Ric ric = policy.ric(); - return ric.getLock().lock(LockType.SHARED) // // - .flatMap(lock -> a1ClientFactory.createA1Client(policy.ric())) // + return ric.getLock().lock(LockType.SHARED) // + .flatMap(notUsed -> assertRicStateIdle(ric)) // + .flatMap(notUsed -> a1ClientFactory.createA1Client(policy.ric())) // .doOnNext(notUsed -> policies.remove(policy)) // .flatMap(client -> client.deletePolicy(policy)) // .doOnNext(notUsed -> ric.getLock().unlockBlocking()) // @@ -204,7 +197,7 @@ public class PolicyController { value = { // @ApiResponse(code = 201, message = "Policy created", response = Object.class), // @ApiResponse(code = 200, message = "Policy updated", response = Object.class), // - @ApiResponse(code = 423, message = "RIC is locked", response = String.class), // + @ApiResponse(code = 423, message = "RIC is not operational", response = String.class), // @ApiResponse(code = 404, message = "RIC or policy type is not found", response = String.class) // }) public Mono> putPolicy( // @@ -218,31 +211,30 @@ public class PolicyController { Ric ric = rics.get(ricName); PolicyType type = policyTypes.get(typeName); keepServiceAlive(service); - if (ric != null && type != null && ric.getState() == Ric.RicState.IDLE) { - Policy policy = ImmutablePolicy.builder() // - .id(instanceId) // - .json(jsonString) // - .type(type) // - .ric(ric) // - .ownerServiceName(service) // - .lastModified(getTimeStampUtc()) // - .build(); - - final boolean isCreate = this.policies.get(policy.id()) == null; - - return ric.getLock().lock(LockType.SHARED) // - .flatMap(p -> validateModifiedPolicy(policy)) // - .flatMap(notUsed -> a1ClientFactory.createA1Client(ric)) // - .flatMap(client -> client.putPolicy(policy)) // - .doOnNext(notUsed -> policies.put(policy)) // - .doOnNext(notUsed -> ric.getLock().unlockBlocking()) // - .doOnError(t -> ric.getLock().unlockBlocking()) // - .flatMap(notUsed -> Mono.just(new ResponseEntity<>(isCreate ? HttpStatus.CREATED : HttpStatus.OK))) // - .onErrorResume(this::handleException); + if (ric == null || type == null) { + return Mono.just(new ResponseEntity<>(HttpStatus.NOT_FOUND)); } - - return ric == null || type == null ? Mono.just(new ResponseEntity<>(HttpStatus.NOT_FOUND)) - : Mono.just(new ResponseEntity<>(HttpStatus.LOCKED)); // Synchronizing + Policy policy = ImmutablePolicy.builder() // + .id(instanceId) // + .json(jsonString) // + .type(type) // + .ric(ric) // + .ownerServiceName(service) // + .lastModified(getTimeStampUtc()) // + .build(); + + final boolean isCreate = this.policies.get(policy.id()) == null; + + return ric.getLock().lock(LockType.SHARED) // + .flatMap(p -> assertRicStateIdle(ric)) // + .flatMap(p -> validateModifiedPolicy(policy)) // + .flatMap(notUsed -> a1ClientFactory.createA1Client(ric)) // + .flatMap(client -> client.putPolicy(policy)) // + .doOnNext(notUsed -> policies.put(policy)) // + .doOnNext(notUsed -> ric.getLock().unlockBlocking()) // + .doOnError(t -> ric.getLock().unlockBlocking()) // + .flatMap(notUsed -> Mono.just(new ResponseEntity<>(isCreate ? HttpStatus.CREATED : HttpStatus.OK))) // + .onErrorResume(this::handleException); } @SuppressWarnings({"unchecked"}) @@ -275,6 +267,16 @@ public class PolicyController { return Mono.just("OK"); } + private Mono assertRicStateIdle(Ric ric) { + if (ric.getState() == Ric.RicState.IDLE) { + return Mono.just("OK"); + } else { + RejectionException e = new RejectionException( + "Ric is not operational, RIC name: " + ric.name() + ", state: " + ric.getState(), HttpStatus.LOCKED); + return Mono.error(e); + } + } + @GetMapping("/policies") @ApiOperation(value = "Query policies") @ApiResponses( @@ -292,10 +294,9 @@ public class PolicyController { if ((ric != null && this.rics.get(ric) == null)) { return new ResponseEntity<>("RIC not found", HttpStatus.NOT_FOUND); } - synchronized (policies) { - String filteredPolicies = policiesToJson(filter(type, ric, service)); - return new ResponseEntity<>(filteredPolicies, HttpStatus.OK); - } + + String filteredPolicies = policiesToJson(filter(type, ric, service)); + return new ResponseEntity<>(filteredPolicies, HttpStatus.OK); } @GetMapping("/policy_ids") @@ -314,10 +315,9 @@ public class PolicyController { if ((ric != null && this.rics.get(ric) == null)) { return new ResponseEntity<>("RIC not found", HttpStatus.NOT_FOUND); } - synchronized (policies) { - String policyIdsJson = toPolicyIdsJson(filter(type, ric, service)); - return new ResponseEntity<>(policyIdsJson, HttpStatus.OK); - } + + String policyIdsJson = toPolicyIdsJson(filter(type, ric, service)); + return new ResponseEntity<>(policyIdsJson, HttpStatus.OK); } @GetMapping("/policy_status") @@ -367,16 +367,14 @@ public class PolicyController { } private Collection filter(String type, String ric, String service) { - synchronized (policies) { - if (type != null) { - return filter(policies.getForType(type), null, ric, service); - } else if (service != null) { - return filter(policies.getForService(service), type, ric, null); - } else if (ric != null) { - return filter(policies.getForRic(ric), type, null, service); - } else { - return policies.getAll(); - } + if (type != null) { + return filter(policies.getForType(type), null, ric, service); + } else if (service != null) { + return filter(policies.getForService(service), type, ric, null); + } else if (ric != null) { + return filter(policies.getForRic(ric), type, null, service); + } else { + return policies.getAll(); } } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/controllers/RicRepositoryController.java b/policy-agent/src/main/java/org/oransc/policyagent/controllers/RicRepositoryController.java index a96766d3..465ee78c 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/controllers/RicRepositoryController.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/controllers/RicRepositoryController.java @@ -95,11 +95,9 @@ public class RicRepositoryController { } List result = new ArrayList<>(); - synchronized (rics) { - for (Ric ric : rics.getRics()) { - if (supportingPolicyType == null || ric.isSupportingType(supportingPolicyType)) { - result.add(new RicInfo(ric.name(), ric.getManagedElementIds(), ric.getSupportedPolicyTypeNames())); - } + for (Ric ric : rics.getRics()) { + if (supportingPolicyType == null || ric.isSupportingType(supportingPolicyType)) { + result.add(new RicInfo(ric.name(), ric.getManagedElementIds(), ric.getSupportedPolicyTypeNames())); } } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceController.java b/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceController.java index 2a346431..578e5c80 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceController.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceController.java @@ -31,7 +31,6 @@ import io.swagger.annotations.ApiResponses; import java.time.Duration; import java.util.ArrayList; import java.util.Collection; -import java.util.List; import org.oransc.policyagent.exceptions.ServiceException; import org.oransc.policyagent.repository.Policies; @@ -79,11 +78,9 @@ public class ServiceController { } Collection servicesStatus = new ArrayList<>(); - synchronized (this.services) { - for (Service s : this.services.getAll()) { - if (name == null || name.equals(s.getName())) { - servicesStatus.add(toServiceStatus(s)); - } + for (Service s : this.services.getAll()) { + if (name == null || name.equals(s.getName())) { + servicesStatus.add(toServiceStatus(s)); } } @@ -157,19 +154,15 @@ public class ServiceController { } private Service removeService(String name) throws ServiceException { - synchronized (this.services) { - Service service = this.services.getService(name); - this.services.remove(service.getName()); - return service; - } + Service service = this.services.getService(name); // Just to verify that it exists + this.services.remove(service.getName()); + return service; } private void removePolicies(Service service) { - synchronized (this.policies) { - List policyList = new ArrayList<>(this.policies.getForService(service.getName())); - for (Policy policy : policyList) { - this.policies.remove(policy); - } + Collection policyList = this.policies.getForService(service.getName()); + for (Policy policy : policyList) { + this.policies.remove(policy); } } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Policies.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Policies.java index 62dd1407..4e2ebfa0 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/repository/Policies.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/Policies.java @@ -25,6 +25,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.Vector; import org.oransc.policyagent.exceptions.ServiceException; @@ -60,7 +61,7 @@ public class Policies { if (map == null) { return Collections.emptyList(); } - return Collections.unmodifiableCollection(map.values()); + return new Vector<>(map.values()); } public synchronized boolean containsPolicy(String id) { @@ -80,7 +81,7 @@ public class Policies { } public synchronized Collection getAll() { - return Collections.unmodifiableCollection(policiesId.values()); + return new Vector<>(policiesId.values()); } public synchronized Collection getForService(String service) { diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/PolicyTypes.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/PolicyTypes.java index 77982310..2897a502 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/repository/PolicyTypes.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/PolicyTypes.java @@ -21,9 +21,9 @@ package org.oransc.policyagent.repository; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Vector; import org.oransc.policyagent.exceptions.ServiceException; @@ -51,7 +51,7 @@ public class PolicyTypes { } public synchronized Collection getAll() { - return Collections.unmodifiableCollection(types.values()); + return new Vector<>(types.values()); } public synchronized int size() { diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Rics.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Rics.java index 3b8e587e..66cecd3d 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/repository/Rics.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/Rics.java @@ -23,6 +23,7 @@ package org.oransc.policyagent.repository; import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.Vector; import org.oransc.policyagent.exceptions.ServiceException; @@ -37,7 +38,7 @@ public class Rics { } public synchronized Iterable getRics() { - return registeredRics.values(); + return new Vector<>(registeredRics.values()); } public synchronized Ric getRic(String name) throws ServiceException { diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Services.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Services.java index f6c55dc4..f829c7c8 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/repository/Services.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/Services.java @@ -20,9 +20,9 @@ package org.oransc.policyagent.repository; -import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Vector; import org.oransc.policyagent.exceptions.ServiceException; import org.slf4j.Logger; @@ -51,7 +51,7 @@ public class Services { } public synchronized Iterable getAll() { - return Collections.unmodifiableCollection(registeredServices.values()); + return new Vector<>(registeredServices.values()); } public synchronized void remove(String name) { diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSupervision.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSupervision.java index 52780d7e..ba050df7 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSupervision.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSupervision.java @@ -82,12 +82,11 @@ public class RicSupervision { } private Flux createTask() { - synchronized (this.rics) { - return Flux.fromIterable(rics.getRics()) // - .flatMap(this::createRicData) // - .flatMap(this::checkOneRic) // - .onErrorResume(throwable -> Mono.empty()); - } + return Flux.fromIterable(rics.getRics()) // + .flatMap(this::createRicData) // + .flatMap(this::checkOneRic) // + .onErrorResume(throwable -> Mono.empty()); + } private Mono checkOneRic(RicData ricData) { diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java index 3879fd68..00ca0edc 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java @@ -22,15 +22,10 @@ package org.oransc.policyagent.tasks; import static org.oransc.policyagent.repository.Ric.RicState; -import java.util.ArrayList; -import java.util.List; -import java.util.Vector; - import org.oransc.policyagent.clients.A1Client; import org.oransc.policyagent.clients.A1ClientFactory; import org.oransc.policyagent.clients.AsyncRestClient; import org.oransc.policyagent.repository.ImmutablePolicyType; -import org.oransc.policyagent.repository.Lock; import org.oransc.policyagent.repository.Lock.LockType; import org.oransc.policyagent.repository.Policies; import org.oransc.policyagent.repository.Policy; @@ -45,6 +40,7 @@ import org.slf4j.LoggerFactory; import reactor.core.publisher.BaseSubscriber; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.SignalType; /** * Synchronizes the content of a RIC with the content in the repository. This @@ -76,36 +72,51 @@ public class RicSynchronizationTask { this.services = services; } - @SuppressWarnings("squid:S2445") // Blocks should be synchronized on "private final" fields public void run(Ric ric) { logger.debug("Handling ric: {}", ric.getConfig().name()); - synchronized (ric) { - if (ric.getState() == RicState.SYNCHRONIZING) { - logger.debug("Ric: {} is already being synchronized", ric.getConfig().name()); - return; - } - ric.setState(RicState.SYNCHRONIZING); + if (ric.getState() == RicState.SYNCHRONIZING) { + logger.debug("Ric: {} is already being synchronized", ric.getConfig().name()); + return; } - ric.getLock().lock(LockType.EXCLUSIVE) // Make sure no NBI updates are running - .flatMap(Lock::unlock) // + ric.getLock().lock(LockType.EXCLUSIVE) // + .flatMap(notUsed -> setRicState(ric)) // .flatMap(lock -> this.a1ClientFactory.createA1Client(ric)) // - .flatMapMany(client -> startSynchronization(ric, client)) // + .flatMapMany(client -> runSynchronization(ric, client)) // + .onErrorResume(throwable -> deleteAllPolicyInstances(ric, throwable)) .subscribe(new BaseSubscriber() { @Override protected void hookOnError(Throwable throwable) { - startDeleteAllPolicyInstances(ric, throwable); + logger.warn("Synchronization failure for ric: {}, reason: {}", ric.name(), throwable.getMessage()); + ric.setState(RicState.UNDEFINED); } @Override protected void hookOnComplete() { onSynchronizationComplete(ric); } + + @Override + protected void hookFinally(SignalType type) { + ric.getLock().unlockBlocking(); + } }); } - private Flux startSynchronization(Ric ric, A1Client a1Client) { + @SuppressWarnings("squid:S2445") // Blocks should be synchronized on "private final" fields + private Mono setRicState(Ric ric) { + synchronized (ric) { + if (ric.getState() == RicState.SYNCHRONIZING) { + logger.debug("Ric: {} is already being synchronized", ric.getConfig().name()); + return Mono.empty(); + } + ric.setState(RicState.SYNCHRONIZING); + return Mono.just(ric); + } + } + + private Flux runSynchronization(Ric ric, A1Client a1Client) { Flux synchronizedTypes = synchronizePolicyTypes(ric, a1Client); Flux policiesDeletedInRic = a1Client.deleteAllPolicies(); Flux policiesRecreatedInRic = recreateAllPoliciesInRic(ric, a1Client); @@ -114,30 +125,27 @@ public class RicSynchronizationTask { } private void onSynchronizationComplete(Ric ric) { - logger.info("Synchronization completed for: {}", ric.name()); + logger.debug("Synchronization completed for: {}", ric.name()); ric.setState(RicState.IDLE); notifyAllServices("Synchronization completed for:" + ric.name()); } private void notifyAllServices(String body) { - synchronized (services) { - for (Service service : services.getAll()) { - String url = service.getCallbackUrl(); - if (service.getCallbackUrl().length() > 0) { - createNotificationClient(url) // - .put("", body) // - .subscribe( // - notUsed -> logger.debug("Service {} notified", service.getName()), throwable -> logger - .warn("Service notification failed for service: {}", service.getName(), throwable), - () -> logger.debug("All services notified")); - } + for (Service service : services.getAll()) { + String url = service.getCallbackUrl(); + if (service.getCallbackUrl().length() > 0) { + createNotificationClient(url) // + .put("", body) // + .subscribe( // + notUsed -> logger.debug("Service {} notified", service.getName()), throwable -> logger + .warn("Service notification failed for service: {}", service.getName(), throwable), + () -> logger.debug("All services notified")); } } } - private void startDeleteAllPolicyInstances(Ric ric, Throwable t) { - logger.warn("Synchronization failed for ric: {}, reason: {}", ric.name(), t.getMessage()); - // If synchronization fails, try to remove all instances + private Flux deleteAllPolicyInstances(Ric ric, Throwable t) { + logger.warn("Recreation of policies failed for ric: {}, reason: {}", ric.name(), t.getMessage()); deleteAllPoliciesInRepository(ric); Flux synchronizedTypes = this.a1ClientFactory.createA1Client(ric) // @@ -146,15 +154,7 @@ public class RicSynchronizationTask { .flatMapMany(A1Client::deleteAllPolicies) // .doOnComplete(() -> deleteAllPoliciesInRepository(ric)); - Flux.concat(synchronizedTypes, deletePoliciesInRic) // - .subscribe(x -> logger.debug("Brute recovery of failed synchronization: {}", x), // - throwable -> onDeleteAllPolicyInstancesError(ric, throwable), // - () -> onSynchronizationComplete(ric)); - } - - private void onDeleteAllPolicyInstancesError(Ric ric, Throwable t) { - logger.warn("Synchronization failure recovery failed for ric: {}, reason: {}", ric.name(), t.getMessage()); - ric.setState(RicState.UNDEFINED); + return Flux.concat(synchronizedTypes, deletePoliciesInRic); } AsyncRestClient createNotificationClient(final String url) { @@ -185,11 +185,8 @@ public class RicSynchronizationTask { } private void deleteAllPoliciesInRepository(Ric ric) { - synchronized (policies) { - List ricPolicies = new ArrayList<>(policies.getForRic(ric.name())); - for (Policy policy : ricPolicies) { - this.policies.remove(policy); - } + for (Policy policy : policies.getForRic(ric.name())) { + this.policies.remove(policy); } } @@ -200,10 +197,8 @@ public class RicSynchronizationTask { } private Flux recreateAllPoliciesInRic(Ric ric, A1Client a1Client) { - synchronized (policies) { - return Flux.fromIterable(new Vector<>(policies.getForRic(ric.name()))) // - .flatMap(policy -> putPolicy(policy, ric, a1Client)); - } + return Flux.fromIterable(policies.getForRic(ric.name())) // + .flatMap(policy -> putPolicy(policy, ric, a1Client)); } } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/ServiceSupervision.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/ServiceSupervision.java index 26509925..50e990cf 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/ServiceSupervision.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/ServiceSupervision.java @@ -105,9 +105,7 @@ public class ServiceSupervision { } private Flux getAllPoliciesForService(Service service) { - synchronized (policies) { - return Flux.fromIterable(policies.getForService(service.getName())); - } + return Flux.fromIterable(policies.getForService(service.getName())); } private Mono deletePolicyInRic(Policy policy) { diff --git a/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java b/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java index 71bd9a5e..f701ee5b 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java @@ -38,7 +38,6 @@ import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -76,13 +75,9 @@ import org.springframework.boot.test.context.TestConfiguration; import org.springframework.boot.web.server.LocalServerPort; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; -import org.springframework.http.HttpEntity; -import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; -import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.test.context.junit.jupiter.SpringExtension; -import org.springframework.web.client.RestTemplate; import org.springframework.web.reactive.function.client.WebClientResponseException; import reactor.core.publisher.Mono; @@ -180,6 +175,7 @@ public class ApplicationTest { policies.clear(); policyTypes.clear(); services.clear(); + a1ClientFactory.reset(); } @AfterEach @@ -454,7 +450,6 @@ public class ApplicationTest { @Test public void testGetPolicies() throws Exception { - reset(); addPolicy("id1", "type1", "service1"); String url = "/policies"; @@ -642,51 +637,19 @@ public class ApplicationTest { return "{\n \"servingCellNrcgi\": \"1\"\n }"; } - private static class ConcurrencyTestRunnable implements Runnable { - private final RestTemplate restTemplate = new RestTemplate(); - private final String baseUrl; - static AtomicInteger nextCount = new AtomicInteger(0); - private final int count; - private final RicSupervision supervision; - - ConcurrencyTestRunnable(String baseUrl, RicSupervision supervision) { - this.baseUrl = baseUrl; - this.count = nextCount.incrementAndGet(); - this.supervision = supervision; - } - - @Override - public void run() { - for (int i = 0; i < 100; ++i) { - if (i % 10 == 0) { - this.supervision.checkAllRics(); - } - String name = "policy:" + count + ":" + i; - putPolicy(name); - deletePolicy(name); - } - } - - private void putPolicy(String name) { - String putUrl = baseUrl + "/policy?type=type1&instance=" + name + "&ric=ric1&service=service1"; - restTemplate.put(putUrl, createJsonHttpEntity("{}")); - } - - private void deletePolicy(String name) { - String deleteUrl = baseUrl + "/policy?instance=" + name; - restTemplate.delete(deleteUrl); - } - } - @Test public void testConcurrency() throws Exception { final Instant startTime = Instant.now(); List threads = new ArrayList<>(); - addRic("ric1"); - addPolicyType("type1", "ric1"); + a1ClientFactory.setResponseDelay(Duration.ofMillis(1)); + addRic("ric"); + addPolicyType("type1", "ric"); + addPolicyType("type2", "ric"); for (int i = 0; i < 100; ++i) { - Thread t = new Thread(new ConcurrencyTestRunnable(baseUrl(), this.supervision), "TestThread_" + i); + Thread t = + new Thread(new ConcurrencyTestRunnable(baseUrl(), supervision, a1ClientFactory, rics, policyTypes), + "TestThread_" + i); t.start(); threads.add(t); } @@ -758,12 +721,6 @@ public class ApplicationTest { return ric; } - private static HttpEntity createJsonHttpEntity(String content) { - HttpHeaders headers = new HttpHeaders(); - headers.setContentType(MediaType.APPLICATION_JSON); - return new HttpEntity(content, headers); - } - private static List parseList(String jsonString, Class clazz) { List result = new ArrayList<>(); JsonArray jsonArr = JsonParser.parseString(jsonString).getAsJsonArray(); diff --git a/policy-agent/src/test/java/org/oransc/policyagent/ConcurrencyTestRunnable.java b/policy-agent/src/test/java/org/oransc/policyagent/ConcurrencyTestRunnable.java new file mode 100644 index 00000000..26e20912 --- /dev/null +++ b/policy-agent/src/test/java/org/oransc/policyagent/ConcurrencyTestRunnable.java @@ -0,0 +1,133 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2020 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oransc.policyagent; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.oransc.policyagent.repository.ImmutablePolicy; +import org.oransc.policyagent.repository.Policy; +import org.oransc.policyagent.repository.PolicyType; +import org.oransc.policyagent.repository.PolicyTypes; +import org.oransc.policyagent.repository.Ric; +import org.oransc.policyagent.repository.Rics; +import org.oransc.policyagent.tasks.RicSupervision; +import org.oransc.policyagent.utils.MockA1Client; +import org.oransc.policyagent.utils.MockA1ClientFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.MediaType; +import org.springframework.web.client.RestTemplate; + +/** + * Invoke operations over the NBI and start synchronizations in a separate + * thread. For test of robustness using concurrent clients. + */ +class ConcurrencyTestRunnable implements Runnable { + private static final Logger logger = LoggerFactory.getLogger(ConcurrencyTestRunnable.class); + private final RestTemplate restTemplate = new RestTemplate(); + private final String baseUrl; + static AtomicInteger nextCount = new AtomicInteger(0); + private final int count; + private final RicSupervision supervision; + private final MockA1ClientFactory a1ClientFactory; + private final Rics rics; + private final PolicyTypes types; + + ConcurrencyTestRunnable(String baseUrl, RicSupervision supervision, MockA1ClientFactory a1ClientFactory, Rics rics, + PolicyTypes types) { + this.baseUrl = baseUrl; + this.count = nextCount.incrementAndGet(); + this.supervision = supervision; + this.a1ClientFactory = a1ClientFactory; + this.rics = rics; + this.types = types; + } + + @Override + public void run() { + try { + for (int i = 0; i < 100; ++i) { + if (i % 10 == 0) { + createInconsistency(); + this.supervision.checkAllRics(); + } + String name = "policy:" + count + ":" + i; + putPolicy(name); + putPolicy(name + "-"); + listPolicies(); + listTypes(); + deletePolicy(name); + deletePolicy(name + "-"); + } + } catch (Exception e) { + logger.error("Concurrency exception " + e.toString()); + } + } + + private Policy createPolicyObject(String id) { + Ric ric = this.rics.get("ric"); + PolicyType type = this.types.get("type1"); + return ImmutablePolicy.builder() // + .id(id) // + .json("{}") // + .type(type) // + .ric(ric) // + .ownerServiceName("") // + .lastModified("") // + .build(); + } + + private void createInconsistency() { + MockA1Client client = a1ClientFactory.getOrCreateA1Client("ric"); + Policy policy = createPolicyObject("junk"); + client.putPolicy(policy).block(); + + } + + private void listPolicies() { + String uri = baseUrl + "/policies"; + restTemplate.getForObject(uri, String.class); + } + + private void listTypes() { + String uri = baseUrl + "/policy_types"; + restTemplate.getForObject(uri, String.class); + } + + private void putPolicy(String name) { + String putUrl = baseUrl + "/policy?type=type1&instance=" + name + "&ric=ric&service=service1"; + restTemplate.put(putUrl, createJsonHttpEntity("{}")); + } + + private void deletePolicy(String name) { + String deleteUrl = baseUrl + "/policy?instance=" + name; + restTemplate.delete(deleteUrl); + } + + private static HttpEntity createJsonHttpEntity(String content) { + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + return new HttpEntity(content, headers); + } + +} diff --git a/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java b/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java index 481a1fe1..2b66a353 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java @@ -131,11 +131,9 @@ public class MockPolicyAgent { private void keepServerAlive() throws InterruptedException { logger.info("Keeping server alive!"); - synchronized (this) { this.wait(); } - } private static String title(String jsonSchema) { diff --git a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSynchronizationTaskTest.java b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSynchronizationTaskTest.java index eae00ea1..e0ce2efd 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSynchronizationTaskTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSynchronizationTaskTest.java @@ -264,9 +264,7 @@ public class RicSynchronizationTaskTest { synchronizerUnderTest.run(RIC_1); verifyCorrectLogMessage(0, logAppender, - "Synchronization failed for ric: " + RIC_1_NAME + ", reason: " + originalErrorMessage); - verifyCorrectLogMessage(1, logAppender, - "Synchronization failure recovery failed for ric: " + RIC_1_NAME + ", reason: " + originalErrorMessage); + "Recreation of policies failed for ric: " + RIC_1_NAME + ", reason: " + originalErrorMessage); verify(a1ClientMock, times(2)).deleteAllPolicies(); verifyNoMoreInteractions(a1ClientMock); diff --git a/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1Client.java b/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1Client.java index 3265d763..0fa5be4f 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1Client.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1Client.java @@ -46,25 +46,21 @@ public class MockA1Client implements A1Client { @Override public Mono> getPolicyTypeIdentities() { - synchronized (this.policyTypes) { - List result = new Vector<>(); - for (PolicyType p : this.policyTypes.getAll()) { - result.add(p.name()); - } - return mono(result); + List result = new Vector<>(); + for (PolicyType p : this.policyTypes.getAll()) { + result.add(p.name()); } + return mono(result); } @Override public Mono> getPolicyIdentities() { - synchronized (this.policies) { - Vector result = new Vector<>(); - for (Policy policy : policies.getAll()) { - result.add(policy.id()); - } - - return mono(result); + Vector result = new Vector<>(); + for (Policy policy : policies.getAll()) { + result.add(policy.id()); } + + return mono(result); } @Override diff --git a/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1ClientFactory.java b/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1ClientFactory.java index c1fd8c31..99679583 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1ClientFactory.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1ClientFactory.java @@ -72,4 +72,9 @@ public class MockA1ClientFactory extends A1ClientFactory { this.asynchDelay = delay; } + public void reset() { + this.asynchDelay = Duration.ofSeconds(0); + clients.clear(); + } + }