X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=policy-agent%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fpolicyagent%2Frepository%2FLock.java;fp=policy-agent%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fpolicyagent%2Frepository%2FLock.java;h=0000000000000000000000000000000000000000;hb=13c62d122c38b98cbdc76a4a775c6f6187e40e39;hp=716148fde8d14a5a4fb720772ac573a776cb3d36;hpb=6a39814272307d0207222c9229b0d765ac062bf0;p=nonrtric.git diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Lock.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Lock.java deleted file mode 100644 index 716148fd..00000000 --- a/policy-agent/src/main/java/org/oransc/policyagent/repository/Lock.java +++ /dev/null @@ -1,197 +0,0 @@ -/*- - * ========================LICENSE_START================================= - * O-RAN-SC - * %% - * Copyright (C) 2019 Nordix Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================LICENSE_END=================================== - */ - -package org.oransc.policyagent.repository; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoSink; - -/** - * A resource lock. Exclusive means that the caller takes exclusive ownership of - * the resurce. Non exclusive lock means that several users can lock the - * resource (for shared usage). - */ -public class Lock { - private static final Logger logger = LoggerFactory.getLogger(Lock.class); - - private boolean isExclusive = false; - private int lockCounter = 0; - private final List lockRequestQueue = new LinkedList<>(); - private static AsynchCallbackExecutor callbackProcessor = new AsynchCallbackExecutor(); - - public enum LockType { - EXCLUSIVE, SHARED - } - - /** The caller thread will be blocked util the lock is granted. */ - public synchronized void lockBlocking(LockType locktype) { - while (!tryLock(locktype)) { - this.waitForUnlock(); - } - } - - /** Reactive version. The Lock will be emitted when the lock is granted */ - public synchronized Mono lock(LockType lockType) { - if (tryLock(lockType)) { - return Mono.just(this); - } else { - return Mono.create(monoSink -> addToQueue(monoSink, lockType)); - } - } - - public Mono unlock() { - return Mono.create(monoSink -> { - unlockBlocking(); - monoSink.success(this); - }); - } - - public synchronized void unlockBlocking() { - if (lockCounter <= 0) { - lockCounter = -1; // Might as well stop, to make it easier to find the problem - logger.error("Number of unlocks must match the number of locks"); - } - this.lockCounter--; - if (lockCounter == 0) { - isExclusive = false; - } - this.notifyAll(); - this.processQueuedEntries(); - } - - @Override - public synchronized String toString() { - return "Lock cnt: " + this.lockCounter + " exclusive: " + this.isExclusive + " queued: " - + this.lockRequestQueue.size(); - } - - /** returns the current number of granted locks */ - public synchronized int getLockCounter() { - return this.lockCounter; - } - - private void processQueuedEntries() { - List granted = new ArrayList<>(); - for (Iterator i = lockRequestQueue.iterator(); i.hasNext();) { - LockRequest request = i.next(); - if (tryLock(request.lockType)) { - i.remove(); - granted.add(request); - } - } - callbackProcessor.addAll(granted); - } - - private synchronized void addToQueue(MonoSink callback, LockType lockType) { - lockRequestQueue.add(new LockRequest(callback, lockType, this)); - processQueuedEntries(); - } - - @SuppressWarnings("java:S2274") // Always invoke wait() and await() methods inside a loop - private synchronized void waitForUnlock() { - try { - this.wait(); - } catch (InterruptedException e) { - logger.warn("waitForUnlock interrupted", e); - Thread.currentThread().interrupt(); - } - } - - private boolean tryLock(LockType lockType) { - if (this.isExclusive) { - return false; - } - if (lockType == LockType.EXCLUSIVE && lockCounter > 0) { - return false; - } - lockCounter++; - this.isExclusive = lockType == LockType.EXCLUSIVE; - return true; - } - - /** - * Represents a queued lock request - */ - private static class LockRequest { - final MonoSink callback; - final LockType lockType; - final Lock lock; - - LockRequest(MonoSink callback, LockType lockType, Lock lock) { - this.callback = callback; - this.lockType = lockType; - this.lock = lock; - } - } - - /** - * A separate thread that calls a MonoSink to continue. This is done after a - * queued lock is granted. - */ - private static class AsynchCallbackExecutor implements Runnable { - private List lockRequestQueue = new LinkedList<>(); - - public AsynchCallbackExecutor() { - Thread thread = new Thread(this); - thread.start(); - } - - public synchronized void addAll(List requests) { - this.lockRequestQueue.addAll(requests); - this.notifyAll(); - } - - @Override - public void run() { - try { - while (true) { - for (LockRequest request : consume()) { - request.callback.success(request.lock); - } - waitForNewEntries(); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - logger.error("Interrupted {}", e.getMessage()); - } - } - - private synchronized List consume() { - List q = this.lockRequestQueue; - this.lockRequestQueue = new LinkedList<>(); - return q; - } - - @SuppressWarnings("java:S2274") - private synchronized void waitForNewEntries() throws InterruptedException { - if (this.lockRequestQueue.isEmpty()) { - this.wait(); - } - } - } -}