package org.oransc.policyagent.repository;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import reactor.core.publisher.MonoSink;
/**
- * A resource lock. The caller thread will be blocked until the lock is granted.
- * Exclusive means that the caller takes exclusive ownership of the resurce. Non
- * exclusive lock means that several users can lock the resource (for shared
- * usage).
+ * A resource lock. Exclusive means that the caller takes exclusive ownership of
+ * the resurce. Non exclusive lock means that several users can lock the
+ * resource (for shared usage).
*/
public class Lock {
private static final Logger logger = LoggerFactory.getLogger(Lock.class);
private boolean isExclusive = false;
- private int cnt = 0;
+ private int lockCounter = 0;
+ private final List<LockRequest> lockRequestQueue = new LinkedList<>();
+ private static AsynchCallbackExecutor callbackProcessor = new AsynchCallbackExecutor();
- public static enum LockType {
+ private static class AsynchCallbackExecutor implements Runnable {
+ private List<LockRequest> lockRequestQueue = new LinkedList<>();
+
+ public AsynchCallbackExecutor() {
+ Thread thread = new Thread(this);
+ thread.start();
+ }
+
+ public synchronized void addAll(List<LockRequest> requests) {
+ this.lockRequestQueue.addAll(requests);
+ this.notifyAll();
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (true) {
+ for (LockRequest request : consume()) {
+ request.callback.success(request.lock);
+ }
+ waitForNewEntries();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.error("Interrupted {}", e.getMessage());
+ }
+ }
+
+ private synchronized List<LockRequest> consume() {
+ List<LockRequest> q = this.lockRequestQueue;
+ this.lockRequestQueue = new LinkedList<>();
+ return q;
+ }
+
+ @SuppressWarnings("java:S2274")
+ private synchronized void waitForNewEntries() throws InterruptedException {
+ if (this.lockRequestQueue.isEmpty()) {
+ this.wait();
+ }
+ }
+ }
+
+ public enum LockType {
EXCLUSIVE, SHARED
}
+ /** The caller thread will be blocked util the lock is granted. */
public synchronized void lockBlocking(LockType locktype) {
while (!tryLock(locktype)) {
this.waitForUnlock();
}
}
- public synchronized void lockBlocking() {
- lockBlocking(LockType.SHARED);
- }
-
+ /** Reactive version. The Lock will be emitted when the lock is granted */
public synchronized Mono<Lock> lock(LockType lockType) {
if (tryLock(lockType)) {
return Mono.just(this);
}
}
- public synchronized void unlock() {
- if (disable()) {
- return;
- }
- if (cnt <= 0) {
- cnt = -1; // Might as well stop, to make it easier to find the problem
- throw new RuntimeException("Number of unlocks must match the number of locks");
- }
- this.cnt--;
- if (cnt == 0) {
- isExclusive = false;
+ public Mono<Lock> unlock() {
+ return Mono.create(monoSink -> {
+ unlockBlocking();
+ monoSink.success(this);
+ });
+ }
+
+ public void unlockBlocking() {
+ synchronized (this) {
+ if (lockCounter <= 0) {
+ lockCounter = -1; // Might as well stop, to make it easier to find the problem
+ throw new NullPointerException("Number of unlocks must match the number of locks");
+ }
+ this.lockCounter--;
+ if (lockCounter == 0) {
+ isExclusive = false;
+ }
+ this.notifyAll();
}
this.processQueuedEntries();
- this.notifyAll();
+ }
+
+ @Override
+ public String toString() {
+ return "Lock cnt: " + this.lockCounter + " exclusive: " + this.isExclusive;
+ }
+
+ /** returns the current number of granted locks */
+ public synchronized int getLockCounter() {
+ return this.lockCounter;
}
private void processQueuedEntries() {
- for (Iterator<QueueEntry> i = queue.iterator(); i.hasNext();) {
- QueueEntry e = i.next();
- if (tryLock(e.lockType)) {
- i.remove();
- e.callback.success(this);
+ List<LockRequest> granted = new ArrayList<>();
+ synchronized (this) {
+ for (Iterator<LockRequest> i = lockRequestQueue.iterator(); i.hasNext();) {
+ LockRequest request = i.next();
+ if (tryLock(request.lockType)) {
+ i.remove();
+ granted.add(request);
+ }
}
}
+ callbackProcessor.addAll(granted);
}
- static class QueueEntry {
+ private static class LockRequest {
final MonoSink<Lock> callback;
final LockType lockType;
+ final Lock lock;
- QueueEntry(MonoSink<Lock> callback, LockType lockType) {
+ LockRequest(MonoSink<Lock> callback, LockType lockType, Lock lock) {
this.callback = callback;
this.lockType = lockType;
+ this.lock = lock;
}
}
- private final List<QueueEntry> queue = new LinkedList<>();
-
private synchronized void addToQueue(MonoSink<Lock> callback, LockType lockType) {
- queue.add(new QueueEntry(callback, lockType));
+ lockRequestQueue.add(new LockRequest(callback, lockType, this));
}
- private void waitForUnlock() {
+ @SuppressWarnings("java:S2274") // Always invoke wait() and await() methods inside a loop
+ private synchronized void waitForUnlock() {
try {
this.wait();
} catch (InterruptedException e) {
logger.warn("waitForUnlock interrupted", e);
+ Thread.currentThread().interrupt();
}
}
- private boolean disable() {
- return true;
- }
-
private boolean tryLock(LockType lockType) {
- if (disable()) {
- return true;
- }
if (this.isExclusive) {
return false;
}
- if (lockType == LockType.EXCLUSIVE && cnt > 0) {
+ if (lockType == LockType.EXCLUSIVE && lockCounter > 0) {
return false;
}
- cnt++;
+ lockCounter++;
this.isExclusive = lockType == LockType.EXCLUSIVE;
return true;
}
- public synchronized int getLockCounter() {
- return this.cnt;
- }
-
}