Some minor refacturings of the Lock 66/3966/1
authorPatrikBuhr <patrik.buhr@est.tech>
Wed, 3 Jun 2020 09:59:00 +0000 (11:59 +0200)
committerPatrikBuhr <patrik.buhr@est.tech>
Wed, 3 Jun 2020 11:22:07 +0000 (13:22 +0200)
Private classes removed to the bottom.

Some synchronizations could be simplified.

Issue-ID: NONRTRIC-195
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Change-Id: I46f3bddf4828cf48d4199cf984a3e14295a42340

policy-agent/src/main/java/org/oransc/policyagent/repository/Lock.java

index a19870b..716148f 100644 (file)
@@ -44,48 +44,6 @@ public class Lock {
     private final List<LockRequest> lockRequestQueue = new LinkedList<>();
     private static AsynchCallbackExecutor callbackProcessor = new AsynchCallbackExecutor();
 
-    private static class AsynchCallbackExecutor implements Runnable {
-        private List<LockRequest> lockRequestQueue = new LinkedList<>();
-
-        public AsynchCallbackExecutor() {
-            Thread thread = new Thread(this);
-            thread.start();
-        }
-
-        public synchronized void addAll(List<LockRequest> requests) {
-            this.lockRequestQueue.addAll(requests);
-            this.notifyAll();
-        }
-
-        @Override
-        public void run() {
-            try {
-                while (true) {
-                    for (LockRequest request : consume()) {
-                        request.callback.success(request.lock);
-                    }
-                    waitForNewEntries();
-                }
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                logger.error("Interrupted {}", e.getMessage());
-            }
-        }
-
-        private synchronized List<LockRequest> consume() {
-            List<LockRequest> q = this.lockRequestQueue;
-            this.lockRequestQueue = new LinkedList<>();
-            return q;
-        }
-
-        @SuppressWarnings("java:S2274")
-        private synchronized void waitForNewEntries() throws InterruptedException {
-            if (this.lockRequestQueue.isEmpty()) {
-                this.wait();
-            }
-        }
-    }
-
     public enum LockType {
         EXCLUSIVE, SHARED
     }
@@ -113,23 +71,21 @@ public class Lock {
         });
     }
 
-    public void unlockBlocking() {
-        synchronized (this) {
-            if (lockCounter <= 0) {
-                lockCounter = -1; // Might as well stop, to make it easier to find the problem
-                logger.error("Number of unlocks must match the number of locks");
-            }
-            this.lockCounter--;
-            if (lockCounter == 0) {
-                isExclusive = false;
-            }
-            this.notifyAll();
+    public synchronized void unlockBlocking() {
+        if (lockCounter <= 0) {
+            lockCounter = -1; // Might as well stop, to make it easier to find the problem
+            logger.error("Number of unlocks must match the number of locks");
+        }
+        this.lockCounter--;
+        if (lockCounter == 0) {
+            isExclusive = false;
         }
+        this.notifyAll();
         this.processQueuedEntries();
     }
 
     @Override
-    public String toString() {
+    public synchronized String toString() {
         return "Lock cnt: " + this.lockCounter + " exclusive: " + this.isExclusive + " queued: "
             + this.lockRequestQueue.size();
     }
@@ -141,30 +97,16 @@ public class Lock {
 
     private void processQueuedEntries() {
         List<LockRequest> granted = new ArrayList<>();
-        synchronized (this) {
-            for (Iterator<LockRequest> i = lockRequestQueue.iterator(); i.hasNext();) {
-                LockRequest request = i.next();
-                if (tryLock(request.lockType)) {
-                    i.remove();
-                    granted.add(request);
-                }
+        for (Iterator<LockRequest> i = lockRequestQueue.iterator(); i.hasNext();) {
+            LockRequest request = i.next();
+            if (tryLock(request.lockType)) {
+                i.remove();
+                granted.add(request);
             }
         }
         callbackProcessor.addAll(granted);
     }
 
-    private static class LockRequest {
-        final MonoSink<Lock> callback;
-        final LockType lockType;
-        final Lock lock;
-
-        LockRequest(MonoSink<Lock> callback, LockType lockType, Lock lock) {
-            this.callback = callback;
-            this.lockType = lockType;
-            this.lock = lock;
-        }
-    }
-
     private synchronized void addToQueue(MonoSink<Lock> callback, LockType lockType) {
         lockRequestQueue.add(new LockRequest(callback, lockType, this));
         processQueuedEntries();
@@ -192,4 +134,64 @@ public class Lock {
         return true;
     }
 
+    /**
+     * Represents a queued lock request
+     */
+    private static class LockRequest {
+        final MonoSink<Lock> callback;
+        final LockType lockType;
+        final Lock lock;
+
+        LockRequest(MonoSink<Lock> callback, LockType lockType, Lock lock) {
+            this.callback = callback;
+            this.lockType = lockType;
+            this.lock = lock;
+        }
+    }
+
+    /**
+     * A separate thread that calls a MonoSink to continue. This is done after a
+     * queued lock is granted.
+     */
+    private static class AsynchCallbackExecutor implements Runnable {
+        private List<LockRequest> lockRequestQueue = new LinkedList<>();
+
+        public AsynchCallbackExecutor() {
+            Thread thread = new Thread(this);
+            thread.start();
+        }
+
+        public synchronized void addAll(List<LockRequest> requests) {
+            this.lockRequestQueue.addAll(requests);
+            this.notifyAll();
+        }
+
+        @Override
+        public void run() {
+            try {
+                while (true) {
+                    for (LockRequest request : consume()) {
+                        request.callback.success(request.lock);
+                    }
+                    waitForNewEntries();
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                logger.error("Interrupted {}", e.getMessage());
+            }
+        }
+
+        private synchronized List<LockRequest> consume() {
+            List<LockRequest> q = this.lockRequestQueue;
+            this.lockRequestQueue = new LinkedList<>();
+            return q;
+        }
+
+        @SuppressWarnings("java:S2274")
+        private synchronized void waitForNewEntries() throws InterruptedException {
+            if (this.lockRequestQueue.isEmpty()) {
+                this.wait();
+            }
+        }
+    }
 }