2 * ========================LICENSE_START=================================
5 * Copyright (C) 2019 Nordix Foundation
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ========================LICENSE_END===================================
21 package org.oransc.policyagent.repository;
23 import java.util.Iterator;
24 import java.util.LinkedList;
25 import java.util.List;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
30 import reactor.core.publisher.Mono;
31 import reactor.core.publisher.MonoSink;
34 * A resource lock. The caller thread will be blocked until the lock is granted.
35 * Exclusive means that the caller takes exclusive ownership of the resurce. Non
36 * exclusive lock means that several users can lock the resource (for shared
40 private static final Logger logger = LoggerFactory.getLogger(Lock.class);
42 private boolean isExclusive = false;
45 public static enum LockType {
49 public synchronized void lockBlocking(LockType locktype) {
50 while (!tryLock(locktype)) {
55 public synchronized void lockBlocking() {
56 lockBlocking(LockType.SHARED);
59 public synchronized Mono<Lock> lock(LockType lockType) {
60 if (tryLock(lockType)) {
61 return Mono.just(this);
63 return Mono.create(monoSink -> addToQueue(monoSink, lockType));
67 public synchronized void unlock() {
72 cnt = -1; // Might as well stop, to make it easier to find the problem
73 throw new RuntimeException("Number of unlocks must match the number of locks");
79 this.processQueuedEntries();
83 private void processQueuedEntries() {
84 for (Iterator<QueueEntry> i = queue.iterator(); i.hasNext();) {
85 QueueEntry e = i.next();
86 if (tryLock(e.lockType)) {
88 e.callback.success(this);
93 static class QueueEntry {
94 final MonoSink<Lock> callback;
95 final LockType lockType;
97 QueueEntry(MonoSink<Lock> callback, LockType lockType) {
98 this.callback = callback;
99 this.lockType = lockType;
103 private final List<QueueEntry> queue = new LinkedList<>();
105 private synchronized void addToQueue(MonoSink<Lock> callback, LockType lockType) {
106 queue.add(new QueueEntry(callback, lockType));
109 private void waitForUnlock() {
112 } catch (InterruptedException e) {
113 logger.warn("waitForUnlock interrupted", e);
117 private boolean disable() {
121 private boolean tryLock(LockType lockType) {
125 if (this.isExclusive) {
128 if (lockType == LockType.EXCLUSIVE && cnt > 0) {
132 this.isExclusive = lockType == LockType.EXCLUSIVE;
136 public synchronized int getLockCounter() {