private final List<LockRequest> lockRequestQueue = new LinkedList<>();
private static AsynchCallbackExecutor callbackProcessor = new AsynchCallbackExecutor();
- private static class AsynchCallbackExecutor implements Runnable {
- private List<LockRequest> lockRequestQueue = new LinkedList<>();
-
- public AsynchCallbackExecutor() {
- Thread thread = new Thread(this);
- thread.start();
- }
-
- public synchronized void addAll(List<LockRequest> requests) {
- this.lockRequestQueue.addAll(requests);
- this.notifyAll();
- }
-
- @Override
- public void run() {
- try {
- while (true) {
- for (LockRequest request : consume()) {
- request.callback.success(request.lock);
- }
- waitForNewEntries();
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- logger.error("Interrupted {}", e.getMessage());
- }
- }
-
- private synchronized List<LockRequest> consume() {
- List<LockRequest> q = this.lockRequestQueue;
- this.lockRequestQueue = new LinkedList<>();
- return q;
- }
-
- @SuppressWarnings("java:S2274")
- private synchronized void waitForNewEntries() throws InterruptedException {
- if (this.lockRequestQueue.isEmpty()) {
- this.wait();
- }
- }
- }
-
public enum LockType {
EXCLUSIVE, SHARED
}
});
}
- public void unlockBlocking() {
- synchronized (this) {
- if (lockCounter <= 0) {
- lockCounter = -1; // Might as well stop, to make it easier to find the problem
- logger.error("Number of unlocks must match the number of locks");
- }
- this.lockCounter--;
- if (lockCounter == 0) {
- isExclusive = false;
- }
- this.notifyAll();
+ public synchronized void unlockBlocking() {
+ if (lockCounter <= 0) {
+ lockCounter = -1; // Might as well stop, to make it easier to find the problem
+ logger.error("Number of unlocks must match the number of locks");
+ }
+ this.lockCounter--;
+ if (lockCounter == 0) {
+ isExclusive = false;
}
+ this.notifyAll();
this.processQueuedEntries();
}
@Override
- public String toString() {
+ public synchronized String toString() {
return "Lock cnt: " + this.lockCounter + " exclusive: " + this.isExclusive + " queued: "
+ this.lockRequestQueue.size();
}
private void processQueuedEntries() {
List<LockRequest> granted = new ArrayList<>();
- synchronized (this) {
- for (Iterator<LockRequest> i = lockRequestQueue.iterator(); i.hasNext();) {
- LockRequest request = i.next();
- if (tryLock(request.lockType)) {
- i.remove();
- granted.add(request);
- }
+ for (Iterator<LockRequest> i = lockRequestQueue.iterator(); i.hasNext();) {
+ LockRequest request = i.next();
+ if (tryLock(request.lockType)) {
+ i.remove();
+ granted.add(request);
}
}
callbackProcessor.addAll(granted);
}
- private static class LockRequest {
- final MonoSink<Lock> callback;
- final LockType lockType;
- final Lock lock;
-
- LockRequest(MonoSink<Lock> callback, LockType lockType, Lock lock) {
- this.callback = callback;
- this.lockType = lockType;
- this.lock = lock;
- }
- }
-
private synchronized void addToQueue(MonoSink<Lock> callback, LockType lockType) {
lockRequestQueue.add(new LockRequest(callback, lockType, this));
processQueuedEntries();
return true;
}
+ /**
+ * Represents a queued lock request
+ */
+ private static class LockRequest {
+ final MonoSink<Lock> callback;
+ final LockType lockType;
+ final Lock lock;
+
+ LockRequest(MonoSink<Lock> callback, LockType lockType, Lock lock) {
+ this.callback = callback;
+ this.lockType = lockType;
+ this.lock = lock;
+ }
+ }
+
+ /**
+ * A separate thread that calls a MonoSink to continue. This is done after a
+ * queued lock is granted.
+ */
+ private static class AsynchCallbackExecutor implements Runnable {
+ private List<LockRequest> lockRequestQueue = new LinkedList<>();
+
+ public AsynchCallbackExecutor() {
+ Thread thread = new Thread(this);
+ thread.start();
+ }
+
+ public synchronized void addAll(List<LockRequest> requests) {
+ this.lockRequestQueue.addAll(requests);
+ this.notifyAll();
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (true) {
+ for (LockRequest request : consume()) {
+ request.callback.success(request.lock);
+ }
+ waitForNewEntries();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.error("Interrupted {}", e.getMessage());
+ }
+ }
+
+ private synchronized List<LockRequest> consume() {
+ List<LockRequest> q = this.lockRequestQueue;
+ this.lockRequestQueue = new LinkedList<>();
+ return q;
+ }
+
+ @SuppressWarnings("java:S2274")
+ private synchronized void waitForNewEntries() throws InterruptedException {
+ if (this.lockRequestQueue.isEmpty()) {
+ this.wait();
+ }
+ }
+ }
}