2 * ========================LICENSE_START=================================
5 * Copyright (C) 2019 Nordix Foundation
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ========================LICENSE_END===================================
21 package org.oransc.policyagent.repository;
23 import java.util.ArrayList;
24 import java.util.Iterator;
25 import java.util.LinkedList;
26 import java.util.List;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
31 import reactor.core.publisher.Mono;
32 import reactor.core.publisher.MonoSink;
35 * A resource lock. Exclusive means that the caller takes exclusive ownership of
36 * the resource. A non-exclusive lock means that several users can lock the
37 * resource (for shared usage).
// Class-wide SLF4J logger, created for Lock.class.
40 private static final Logger logger = LoggerFactory.getLogger(Lock.class);
// Exclusive-ownership flag: tryLock() checks it first and assigns it from the requested lock type.
42 private boolean isExclusive = false;
// Current number of granted locks (exposed by getLockCounter()); unlockBlocking() forces it
// to -1 when it detects an unlock without a matching lock.
43 private int lockCounter = 0;
// Reactive lock requests waiting to be granted; appended by addToQueue() and scanned by
// processQueuedEntries().
44 private final List<LockRequest> lockRequestQueue = new LinkedList<>();
// Single executor shared by all Lock instances (static); granted requests are handed to it
// through addAll().
// NOTE(review): not reassigned anywhere in the visible code, so it could probably be final - confirm.
45 private static AsynchCallbackExecutor callbackProcessor = new AsynchCallbackExecutor();
// Kind of lock a caller can request. The constants themselves are not visible in this
// listing; EXCLUSIVE is the one referenced by tryLock().
47 public enum LockType {
51 /** The caller thread will be blocked until the lock is granted. */
52 public synchronized void lockBlocking(LockType locktype) {
// Keep retrying until tryLock() reports success.
// NOTE(review): the loop body is not visible in this listing; presumably it calls
// waitForUnlock() - confirm.
53 while (!tryLock(locktype)) {
58 /**
 * Reactive version. The Lock will be emitted when the lock is granted.
 *
 * <p>tryLock() is attempted immediately, inside this call. When it succeeds a Mono that
 * already holds this Lock is returned; otherwise the returned Mono's callback registers
 * its sink through addToQueue().
 *
 * @param lockType the kind of lock requested
 * @return a Mono that emits this Lock
 */
59 public synchronized Mono<Lock> lock(LockType lockType) {
// Fast path: the lock could be taken right away.
60 if (tryLock(lockType)) {
61 return Mono.just(this);
// Slow path: queue the sink together with the requested lock type.
63 return Mono.create(monoSink -> addToQueue(monoSink, lockType));
/**
 * Reactive unlock. Returns a Mono whose callback completes the sink with this Lock.
 * NOTE(review): one statement of the callback is not visible in this listing; presumably
 * it calls unlockBlocking() before signalling success - confirm.
 *
 * @return a Mono that emits this Lock
 */
67 public Mono<Lock> unlock() {
68 return Mono.create(monoSink -> {
70 monoSink.success(this);
/**
 * Synchronous unlock. Guards against unbalanced calls and finally re-examines the queued
 * requests through processQueuedEntries().
 * NOTE(review): several statements of this method (between the visible lines) are not
 * present in this listing - presumably the counter decrement, the reset of the exclusive
 * flag and the wake-up of blocked threads; confirm against the full file.
 */
74 public synchronized void unlockBlocking() {
// Unlock without a matching lock: force the counter to -1 and log the error.
75 if (lockCounter <= 0) {
76 lockCounter = -1; // Might as well stop, to make it easier to find the problem
77 logger.error("Number of unlocks must match the number of locks");
// Counter is zero, i.e. no granted lock remains (branch body not visible here).
80 if (lockCounter == 0) {
// Give queued reactive requests a chance to be granted.
84 this.processQueuedEntries();
/**
 * Builds a diagnostic summary of this lock.
 *
 * @return text containing the lock counter, the exclusive flag and the number of queued requests
 */
88 public synchronized String toString() {
89 return "Lock cnt: " + this.lockCounter + " exclusive: " + this.isExclusive + " queued: "
90 + this.lockRequestQueue.size();
93 /** Returns the current number of granted locks. */
94 public synchronized int getLockCounter() {
95 return this.lockCounter;
/**
 * Walks the queue of pending requests, retries tryLock() for each of them and collects the
 * requests for which it succeeds; the collected requests are then passed to the shared
 * callbackProcessor. The method is not synchronized itself; both visible callers
 * (unlockBlocking() and addToQueue()) are synchronized.
 */
98 private void processQueuedEntries() {
// Requests granted during this pass.
99 List<LockRequest> granted = new ArrayList<>();
100 for (Iterator<LockRequest> i = lockRequestQueue.iterator(); i.hasNext();) {
101 LockRequest request = i.next();
102 if (tryLock(request.lockType)) {
// NOTE(review): one statement before the next line is not visible in this listing;
// presumably the request is removed from the queue through the iterator - confirm.
104 granted.add(request);
// Hand the granted requests to the callback executor.
107 callbackProcessor.addAll(granted);
/**
 * Wraps the sink, the requested lock type and this Lock in a LockRequest, appends it to
 * the queue and immediately re-evaluates the queue.
 *
 * @param callback sink stored in the queued request
 * @param lockType the kind of lock requested
 */
110 private synchronized void addToQueue(MonoSink<Lock> callback, LockType lockType) {
111 lockRequestQueue.add(new LockRequest(callback, lockType, this));
112 processQueuedEntries();
/**
 * Waiting helper. If the wait is interrupted, a warning is logged and the thread's
 * interrupt status is set again.
 * NOTE(review): the try block is not visible in this listing; judging by the suppressed
 * Sonar rule it presumably calls wait() on this object - confirm.
 */
115 @SuppressWarnings("java:S2274") // Always invoke wait() and await() methods inside a loop
116 private synchronized void waitForUnlock() {
119 } catch (InterruptedException e) {
120 logger.warn("waitForUnlock interrupted", e);
// Restore the interrupt status so that callers can observe the interruption.
121 Thread.currentThread().interrupt();
/**
 * Attempts to take the lock without waiting.
 * NOTE(review): the return statements and any counter update are not visible in this
 * listing; the comments below describe only the visible conditions.
 *
 * @param lockType the kind of lock requested
 * @return a boolean that lock() and lockBlocking() treat as "granted" when true
 */
125 private boolean tryLock(LockType lockType) {
// Case: the lock is currently held exclusively.
126 if (this.isExclusive) {
// Case: an exclusive request while at least one lock is already granted.
129 if (lockType == LockType.EXCLUSIVE && lockCounter > 0) {
// Remember whether the request was for exclusive ownership.
133 this.isExclusive = lockType == LockType.EXCLUSIVE;
138 * Represents a queued lock request
140 private static class LockRequest {
// Sink of the waiting caller; AsynchCallbackExecutor calls success() on it.
141 final MonoSink<Lock> callback;
// Kind of lock that was requested; re-checked by processQueuedEntries().
142 final LockType lockType;
// NOTE(review): the executor reads request.lock, so the lock parameter is presumably stored
// in a field as well; neither that field nor its assignment is visible in this listing.
145 LockRequest(MonoSink<Lock> callback, LockType lockType, Lock lock) {
146 this.callback = callback;
147 this.lockType = lockType;
153 * A separate thread that calls a MonoSink to continue. This is done after a
154 * queued lock is granted.
156 private static class AsynchCallbackExecutor implements Runnable {
// Granted requests whose callbacks are still to be invoked; filled by addAll() and
// replaced by a fresh list in consume().
157 private List<LockRequest> lockRequestQueue = new LinkedList<>();
// Creates a thread that uses this executor as its Runnable.
// NOTE(review): the statement that starts the thread is not visible in this listing - confirm.
159 public AsynchCallbackExecutor() {
160 Thread thread = new Thread(this);
// Appends the given granted requests to the pending-callback queue.
// NOTE(review): presumably the waiting worker is notified afterwards; that statement is not
// visible in this listing.
164 public synchronized void addAll(List<LockRequest> requests) {
165 this.lockRequestQueue.addAll(requests);
// Take the pending requests and complete each sink with the lock stored in its request.
173 for (LockRequest request : consume()) {
174 request.callback.success(request.lock);
// On interruption: set the interrupt status again and log the exception message.
178 } catch (InterruptedException e) {
179 Thread.currentThread().interrupt();
180 logger.error("Interrupted {}", e.getMessage());
// Detaches the current pending list (kept in q) and installs a fresh empty one, all while
// holding this executor's monitor.
// NOTE(review): the return statement is not visible in this listing; presumably q is returned.
184 private synchronized List<LockRequest> consume() {
185 List<LockRequest> q = this.lockRequestQueue;
186 this.lockRequestQueue = new LinkedList<>();
190 @SuppressWarnings("java:S2274")
191 private synchronized void waitForNewEntries() throws InterruptedException {
192 if (this.lockRequestQueue.isEmpty()) {