X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=policy-agent%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fpolicyagent%2Frepository%2FLock.java;h=ed944924ca037963eea8127962584e4c3150cac0;hb=7de2f355ed1956001d15cb7e57fdd37fdf88cdc5;hp=68ea5a7bdb45a14065ac882bab10a25c9e86d47d;hpb=7c297ddb425a52dae965adc6a83629a14421ea05;p=nonrtric.git diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Lock.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Lock.java index 68ea5a7b..ed944924 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/repository/Lock.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/Lock.java @@ -20,6 +20,7 @@ package org.oransc.policyagent.repository; +import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -31,31 +32,72 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.MonoSink; /** - * A resource lock. The caller thread will be blocked until the lock is granted. - * Exclusive means that the caller takes exclusive ownership of the resurce. Non - * exclusive lock means that several users can lock the resource (for shared - * usage). + * A resource lock. Exclusive means that the caller takes exclusive ownership of + * the resurce. Non exclusive lock means that several users can lock the + * resource (for shared usage). */ public class Lock { private static final Logger logger = LoggerFactory.getLogger(Lock.class); private boolean isExclusive = false; - private int cnt = 0; + private int lockCounter = 0; + private final List lockRequestQueue = new LinkedList<>(); + private static AsynchCallbackExecutor callbackProcessor = new AsynchCallbackExecutor(); - public static enum LockType { + private static class AsynchCallbackExecutor implements Runnable { + private List lockRequestQueue = new LinkedList<>(); + + public AsynchCallbackExecutor() { + Thread thread = new Thread(this); + thread.start(); + } + + public synchronized void addAll(List requests) { + this.lockRequestQueue.addAll(requests); + this.notifyAll(); + } + + @Override + public void run() { + try { + while (true) { + for (LockRequest request : consume()) { + request.callback.success(request.lock); + } + waitForNewEntries(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("Interrupted {}", e.getMessage()); + } + } + + private synchronized List consume() { + List q = this.lockRequestQueue; + this.lockRequestQueue = new LinkedList<>(); + return q; + } + + @SuppressWarnings("java:S2274") + private synchronized void waitForNewEntries() throws InterruptedException { + if (this.lockRequestQueue.isEmpty()) { + this.wait(); + } + } + } + + public enum LockType { EXCLUSIVE, SHARED } + /** The caller thread will be blocked util the lock is granted. */ public synchronized void lockBlocking(LockType locktype) { while (!tryLock(locktype)) { this.waitForUnlock(); } } - public synchronized void lockBlocking() { - lockBlocking(LockType.SHARED); - } - + /** Reactive version. The Lock will be emitted when the lock is granted */ public synchronized Mono lock(LockType lockType) { if (tryLock(lockType)) { return Mono.just(this); @@ -64,77 +106,89 @@ public class Lock { } } - public synchronized void unlock() { - if (disable()) { - return; - } - if (cnt <= 0) { - cnt = -1; // Might as well stop, to make it easier to find the problem - throw new RuntimeException("Number of unlocks must match the number of locks"); - } - this.cnt--; - if (cnt == 0) { - isExclusive = false; + public Mono unlock() { + return Mono.create(monoSink -> { + unlockBlocking(); + monoSink.success(this); + }); + } + + public void unlockBlocking() { + synchronized (this) { + if (lockCounter <= 0) { + lockCounter = -1; // Might as well stop, to make it easier to find the problem + logger.error("Number of unlocks must match the number of locks"); + } + this.lockCounter--; + if (lockCounter == 0) { + isExclusive = false; + } + this.notifyAll(); } this.processQueuedEntries(); - this.notifyAll(); + } + + @Override + public String toString() { + return "Lock cnt: " + this.lockCounter + " exclusive: " + this.isExclusive + " queued: " + + this.lockRequestQueue.size(); + } + + /** returns the current number of granted locks */ + public synchronized int getLockCounter() { + return this.lockCounter; } private void processQueuedEntries() { - for (Iterator i = queue.iterator(); i.hasNext();) { - QueueEntry e = i.next(); - if (tryLock(e.lockType)) { - i.remove(); - e.callback.success(this); + List granted = new ArrayList<>(); + synchronized (this) { + for (Iterator i = lockRequestQueue.iterator(); i.hasNext();) { + LockRequest request = i.next(); + if (tryLock(request.lockType)) { + i.remove(); + granted.add(request); + } } } + callbackProcessor.addAll(granted); } - static class QueueEntry { + private static class LockRequest { final MonoSink callback; final LockType lockType; + final Lock lock; - QueueEntry(MonoSink callback, LockType lockType) { + LockRequest(MonoSink callback, LockType lockType, Lock lock) { this.callback = callback; this.lockType = lockType; + this.lock = lock; } } - private final List queue = new LinkedList<>(); - private synchronized void addToQueue(MonoSink callback, LockType lockType) { - queue.add(new QueueEntry(callback, lockType)); + lockRequestQueue.add(new LockRequest(callback, lockType, this)); } - private void waitForUnlock() { + @SuppressWarnings("java:S2274") // Always invoke wait() and await() methods inside a loop + private synchronized void waitForUnlock() { try { this.wait(); } catch (InterruptedException e) { logger.warn("waitForUnlock interrupted", e); + Thread.currentThread().interrupt(); } } - private boolean disable() { - return true; - } - private boolean tryLock(LockType lockType) { - if (disable()) { - return true; - } if (this.isExclusive) { return false; } - if (lockType == LockType.EXCLUSIVE && cnt > 0) { + if (lockType == LockType.EXCLUSIVE && lockCounter > 0) { return false; } - cnt++; + lockCounter++; this.isExclusive = lockType == LockType.EXCLUSIVE; return true; } - public synchronized int getLockCounter() { - return this.cnt; - } - }