X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=policy-agent%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fpolicyagent%2Frepository%2FLock.java;h=716148fde8d14a5a4fb720772ac573a776cb3d36;hb=6a39814272307d0207222c9229b0d765ac062bf0;hp=a19870be75fa16be3060eeae5d9745cf8b4c3526;hpb=bd30ef4a1c87d27491bcbf8b60bd6627bec85a48;p=nonrtric.git diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Lock.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Lock.java index a19870be..716148fd 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/repository/Lock.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/Lock.java @@ -44,48 +44,6 @@ public class Lock { private final List lockRequestQueue = new LinkedList<>(); private static AsynchCallbackExecutor callbackProcessor = new AsynchCallbackExecutor(); - private static class AsynchCallbackExecutor implements Runnable { - private List lockRequestQueue = new LinkedList<>(); - - public AsynchCallbackExecutor() { - Thread thread = new Thread(this); - thread.start(); - } - - public synchronized void addAll(List requests) { - this.lockRequestQueue.addAll(requests); - this.notifyAll(); - } - - @Override - public void run() { - try { - while (true) { - for (LockRequest request : consume()) { - request.callback.success(request.lock); - } - waitForNewEntries(); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - logger.error("Interrupted {}", e.getMessage()); - } - } - - private synchronized List consume() { - List q = this.lockRequestQueue; - this.lockRequestQueue = new LinkedList<>(); - return q; - } - - @SuppressWarnings("java:S2274") - private synchronized void waitForNewEntries() throws InterruptedException { - if (this.lockRequestQueue.isEmpty()) { - this.wait(); - } - } - } - public enum LockType { EXCLUSIVE, SHARED } @@ -113,23 +71,21 @@ public class Lock { }); } - public void unlockBlocking() { - synchronized (this) { - if (lockCounter <= 0) { - lockCounter = -1; // Might as well stop, to make it easier to find the problem - logger.error("Number of unlocks must match the number of locks"); - } - this.lockCounter--; - if (lockCounter == 0) { - isExclusive = false; - } - this.notifyAll(); + public synchronized void unlockBlocking() { + if (lockCounter <= 0) { + lockCounter = -1; // Might as well stop, to make it easier to find the problem + logger.error("Number of unlocks must match the number of locks"); + } + this.lockCounter--; + if (lockCounter == 0) { + isExclusive = false; } + this.notifyAll(); this.processQueuedEntries(); } @Override - public String toString() { + public synchronized String toString() { return "Lock cnt: " + this.lockCounter + " exclusive: " + this.isExclusive + " queued: " + this.lockRequestQueue.size(); } @@ -141,30 +97,16 @@ public class Lock { private void processQueuedEntries() { List granted = new ArrayList<>(); - synchronized (this) { - for (Iterator i = lockRequestQueue.iterator(); i.hasNext();) { - LockRequest request = i.next(); - if (tryLock(request.lockType)) { - i.remove(); - granted.add(request); - } + for (Iterator i = lockRequestQueue.iterator(); i.hasNext();) { + LockRequest request = i.next(); + if (tryLock(request.lockType)) { + i.remove(); + granted.add(request); } } callbackProcessor.addAll(granted); } - private static class LockRequest { - final MonoSink callback; - final LockType lockType; - final Lock lock; - - LockRequest(MonoSink callback, LockType lockType, Lock lock) { - this.callback = callback; - this.lockType = lockType; - this.lock = lock; - } - } - private synchronized void addToQueue(MonoSink callback, LockType lockType) { lockRequestQueue.add(new LockRequest(callback, lockType, this)); processQueuedEntries(); @@ -192,4 +134,64 @@ public class Lock { return true; } + /** + * Represents a queued lock request + */ + private static class LockRequest { + final MonoSink callback; + final LockType lockType; + final Lock lock; + + LockRequest(MonoSink callback, LockType lockType, Lock lock) { + this.callback = callback; + this.lockType = lockType; + this.lock = lock; + } + } + + /** + * A separate thread that calls a MonoSink to continue. This is done after a + * queued lock is granted. + */ + private static class AsynchCallbackExecutor implements Runnable { + private List lockRequestQueue = new LinkedList<>(); + + public AsynchCallbackExecutor() { + Thread thread = new Thread(this); + thread.start(); + } + + public synchronized void addAll(List requests) { + this.lockRequestQueue.addAll(requests); + this.notifyAll(); + } + + @Override + public void run() { + try { + while (true) { + for (LockRequest request : consume()) { + request.callback.success(request.lock); + } + waitForNewEntries(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("Interrupted {}", e.getMessage()); + } + } + + private synchronized List consume() { + List q = this.lockRequestQueue; + this.lockRequestQueue = new LinkedList<>(); + return q; + } + + @SuppressWarnings("java:S2274") + private synchronized void waitForNewEntries() throws InterruptedException { + if (this.lockRequestQueue.isEmpty()) { + this.wait(); + } + } + } }