Bugfix, only one RIC was synched
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / repository / Lock.java
index 68ea5a7..ed94492 100644 (file)
@@ -20,6 +20,7 @@
 
 package org.oransc.policyagent.repository;
 
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -31,31 +32,72 @@ import reactor.core.publisher.Mono;
 import reactor.core.publisher.MonoSink;
 
 /**
- * A resource lock. The caller thread will be blocked until the lock is granted.
- * Exclusive means that the caller takes exclusive ownership of the resurce. Non
- * exclusive lock means that several users can lock the resource (for shared
- * usage).
+ * A resource lock. Exclusive means that the caller takes exclusive ownership of
+ * the resurce. Non exclusive lock means that several users can lock the
+ * resource (for shared usage).
  */
 public class Lock {
     private static final Logger logger = LoggerFactory.getLogger(Lock.class);
 
     private boolean isExclusive = false;
-    private int cnt = 0;
+    private int lockCounter = 0;
+    private final List<LockRequest> lockRequestQueue = new LinkedList<>();
+    private static AsynchCallbackExecutor callbackProcessor = new AsynchCallbackExecutor();
 
-    public static enum LockType {
+    private static class AsynchCallbackExecutor implements Runnable {
+        private List<LockRequest> lockRequestQueue = new LinkedList<>();
+
+        public AsynchCallbackExecutor() {
+            Thread thread = new Thread(this);
+            thread.start();
+        }
+
+        public synchronized void addAll(List<LockRequest> requests) {
+            this.lockRequestQueue.addAll(requests);
+            this.notifyAll();
+        }
+
+        @Override
+        public void run() {
+            try {
+                while (true) {
+                    for (LockRequest request : consume()) {
+                        request.callback.success(request.lock);
+                    }
+                    waitForNewEntries();
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                logger.error("Interrupted {}", e.getMessage());
+            }
+        }
+
+        private synchronized List<LockRequest> consume() {
+            List<LockRequest> q = this.lockRequestQueue;
+            this.lockRequestQueue = new LinkedList<>();
+            return q;
+        }
+
+        @SuppressWarnings("java:S2274")
+        private synchronized void waitForNewEntries() throws InterruptedException {
+            if (this.lockRequestQueue.isEmpty()) {
+                this.wait();
+            }
+        }
+    }
+
+    public enum LockType {
         EXCLUSIVE, SHARED
     }
 
+    /** The caller thread will be blocked util the lock is granted. */
     public synchronized void lockBlocking(LockType locktype) {
         while (!tryLock(locktype)) {
             this.waitForUnlock();
         }
     }
 
-    public synchronized void lockBlocking() {
-        lockBlocking(LockType.SHARED);
-    }
-
+    /** Reactive version. The Lock will be emitted when the lock is granted */
     public synchronized Mono<Lock> lock(LockType lockType) {
         if (tryLock(lockType)) {
             return Mono.just(this);
@@ -64,77 +106,89 @@ public class Lock {
         }
     }
 
-    public synchronized void unlock() {
-        if (disable()) {
-            return;
-        }
-        if (cnt <= 0) {
-            cnt = -1; // Might as well stop, to make it easier to find the problem
-            throw new RuntimeException("Number of unlocks must match the number of locks");
-        }
-        this.cnt--;
-        if (cnt == 0) {
-            isExclusive = false;
+    public Mono<Lock> unlock() {
+        return Mono.create(monoSink -> {
+            unlockBlocking();
+            monoSink.success(this);
+        });
+    }
+
+    public void unlockBlocking() {
+        synchronized (this) {
+            if (lockCounter <= 0) {
+                lockCounter = -1; // Might as well stop, to make it easier to find the problem
+                logger.error("Number of unlocks must match the number of locks");
+            }
+            this.lockCounter--;
+            if (lockCounter == 0) {
+                isExclusive = false;
+            }
+            this.notifyAll();
         }
         this.processQueuedEntries();
-        this.notifyAll();
+    }
+
+    @Override
+    public String toString() {
+        return "Lock cnt: " + this.lockCounter + " exclusive: " + this.isExclusive + " queued: "
+            + this.lockRequestQueue.size();
+    }
+
+    /** returns the current number of granted locks */
+    public synchronized int getLockCounter() {
+        return this.lockCounter;
     }
 
     private void processQueuedEntries() {
-        for (Iterator<QueueEntry> i = queue.iterator(); i.hasNext();) {
-            QueueEntry e = i.next();
-            if (tryLock(e.lockType)) {
-                i.remove();
-                e.callback.success(this);
+        List<LockRequest> granted = new ArrayList<>();
+        synchronized (this) {
+            for (Iterator<LockRequest> i = lockRequestQueue.iterator(); i.hasNext();) {
+                LockRequest request = i.next();
+                if (tryLock(request.lockType)) {
+                    i.remove();
+                    granted.add(request);
+                }
             }
         }
+        callbackProcessor.addAll(granted);
     }
 
-    static class QueueEntry {
+    private static class LockRequest {
         final MonoSink<Lock> callback;
         final LockType lockType;
+        final Lock lock;
 
-        QueueEntry(MonoSink<Lock> callback, LockType lockType) {
+        LockRequest(MonoSink<Lock> callback, LockType lockType, Lock lock) {
             this.callback = callback;
             this.lockType = lockType;
+            this.lock = lock;
         }
     }
 
-    private final List<QueueEntry> queue = new LinkedList<>();
-
     private synchronized void addToQueue(MonoSink<Lock> callback, LockType lockType) {
-        queue.add(new QueueEntry(callback, lockType));
+        lockRequestQueue.add(new LockRequest(callback, lockType, this));
     }
 
-    private void waitForUnlock() {
+    @SuppressWarnings("java:S2274") // Always invoke wait() and await() methods inside a loop
+    private synchronized void waitForUnlock() {
         try {
             this.wait();
         } catch (InterruptedException e) {
             logger.warn("waitForUnlock interrupted", e);
+            Thread.currentThread().interrupt();
         }
     }
 
-    private boolean disable() {
-        return true;
-    }
-
     private boolean tryLock(LockType lockType) {
-        if (disable()) {
-            return true;
-        }
         if (this.isExclusive) {
             return false;
         }
-        if (lockType == LockType.EXCLUSIVE && cnt > 0) {
+        if (lockType == LockType.EXCLUSIVE && lockCounter > 0) {
             return false;
         }
-        cnt++;
+        lockCounter++;
         this.isExclusive = lockType == LockType.EXCLUSIVE;
         return true;
     }
 
-    public synchronized int getLockCounter() {
-        return this.cnt;
-    }
-
 }