+ /**
+ * Represents a queued lock request
+ */
+ private static class LockRequest {
+ final MonoSink<Lock> callback;
+ final LockType lockType;
+ final Lock lock;
+
+ LockRequest(MonoSink<Lock> callback, LockType lockType, Lock lock) {
+ this.callback = callback;
+ this.lockType = lockType;
+ this.lock = lock;
+ }
+ }
+
+ /**
+ * A separate thread that calls a MonoSink to continue. This is done after a
+ * queued lock is granted.
+ */
+ private static class AsynchCallbackExecutor implements Runnable {
+ private List<LockRequest> lockRequestQueue = new LinkedList<>();
+
+ public AsynchCallbackExecutor() {
+ Thread thread = new Thread(this);
+ thread.start();
+ }
+
+ public synchronized void addAll(List<LockRequest> requests) {
+ this.lockRequestQueue.addAll(requests);
+ this.notifyAll();
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (true) {
+ for (LockRequest request : consume()) {
+ request.callback.success(request.lock);
+ }
+ waitForNewEntries();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.error("Interrupted {}", e.getMessage());
+ }
+ }
+
+ private synchronized List<LockRequest> consume() {
+ List<LockRequest> q = this.lockRequestQueue;
+ this.lockRequestQueue = new LinkedList<>();
+ return q;
+ }
+
+ @SuppressWarnings("java:S2274")
+ private synchronized void waitForNewEntries() throws InterruptedException {
+ if (this.lockRequestQueue.isEmpty()) {
+ this.wait();
+ }
+ }
+ }