/*- * ========================LICENSE_START================================= * O-RAN-SC * %% * Copyright (C) 2019 Nordix Foundation * %% * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ========================LICENSE_END=================================== */ package org.oransc.policyagent.repository; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoSink; /** * A resource lock. The caller thread will be blocked until the lock is granted. * Exclusive means that the caller takes exclusive ownership of the resurce. Non * exclusive lock means that several users can lock the resource (for shared * usage). */ public class Lock { private static final Logger logger = LoggerFactory.getLogger(Lock.class); private boolean isExclusive = false; private int cnt = 0; public static enum LockType { EXCLUSIVE, SHARED } public synchronized void lockBlocking(LockType locktype) { while (!tryLock(locktype)) { this.waitForUnlock(); } } public synchronized void lockBlocking() { lockBlocking(LockType.SHARED); } public synchronized Mono lock(LockType lockType) { if (tryLock(lockType)) { return Mono.just(this); } else { return Mono.create(monoSink -> addToQueue(monoSink, lockType)); } } public synchronized void unlock() { if (disable()) { return; } if (cnt <= 0) { cnt = -1; // Might as well stop, to make it easier to find the problem throw new RuntimeException("Number of unlocks must match the number of locks"); } this.cnt--; if (cnt == 0) { isExclusive = false; } this.processQueuedEntries(); this.notifyAll(); } private void processQueuedEntries() { for (Iterator i = queue.iterator(); i.hasNext();) { QueueEntry e = i.next(); if (tryLock(e.lockType)) { i.remove(); e.callback.success(this); } } } static class QueueEntry { final MonoSink callback; final LockType lockType; QueueEntry(MonoSink callback, LockType lockType) { this.callback = callback; this.lockType = lockType; } } private final List queue = new LinkedList<>(); private synchronized void addToQueue(MonoSink callback, LockType lockType) { queue.add(new QueueEntry(callback, lockType)); } private void waitForUnlock() { try { this.wait(); } catch (InterruptedException e) { logger.warn("waitForUnlock interrupted", e); } } private boolean disable() { return true; } private boolean tryLock(LockType lockType) { if (disable()) { return true; } if (this.isExclusive) { return false; } if (lockType == LockType.EXCLUSIVE && cnt > 0) { return false; } cnt++; this.isExclusive = lockType == LockType.EXCLUSIVE; return true; } public synchronized int getLockCounter() { return this.cnt; } }