716148fde8d14a5a4fb720772ac573a776cb3d36
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / repository / Lock.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2019 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oransc.policyagent.repository;
22
23 import java.util.ArrayList;
24 import java.util.Iterator;
25 import java.util.LinkedList;
26 import java.util.List;
27
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 import reactor.core.publisher.Mono;
32 import reactor.core.publisher.MonoSink;
33
34 /**
35  * A resource lock. Exclusive means that the caller takes exclusive ownership of
36  * the resurce. Non exclusive lock means that several users can lock the
37  * resource (for shared usage).
38  */
39 public class Lock {
40     private static final Logger logger = LoggerFactory.getLogger(Lock.class);
41
42     private boolean isExclusive = false;
43     private int lockCounter = 0;
44     private final List<LockRequest> lockRequestQueue = new LinkedList<>();
45     private static AsynchCallbackExecutor callbackProcessor = new AsynchCallbackExecutor();
46
47     public enum LockType {
48         EXCLUSIVE, SHARED
49     }
50
51     /** The caller thread will be blocked util the lock is granted. */
52     public synchronized void lockBlocking(LockType locktype) {
53         while (!tryLock(locktype)) {
54             this.waitForUnlock();
55         }
56     }
57
58     /** Reactive version. The Lock will be emitted when the lock is granted */
59     public synchronized Mono<Lock> lock(LockType lockType) {
60         if (tryLock(lockType)) {
61             return Mono.just(this);
62         } else {
63             return Mono.create(monoSink -> addToQueue(monoSink, lockType));
64         }
65     }
66
67     public Mono<Lock> unlock() {
68         return Mono.create(monoSink -> {
69             unlockBlocking();
70             monoSink.success(this);
71         });
72     }
73
74     public synchronized void unlockBlocking() {
75         if (lockCounter <= 0) {
76             lockCounter = -1; // Might as well stop, to make it easier to find the problem
77             logger.error("Number of unlocks must match the number of locks");
78         }
79         this.lockCounter--;
80         if (lockCounter == 0) {
81             isExclusive = false;
82         }
83         this.notifyAll();
84         this.processQueuedEntries();
85     }
86
87     @Override
88     public synchronized String toString() {
89         return "Lock cnt: " + this.lockCounter + " exclusive: " + this.isExclusive + " queued: "
90             + this.lockRequestQueue.size();
91     }
92
93     /** returns the current number of granted locks */
94     public synchronized int getLockCounter() {
95         return this.lockCounter;
96     }
97
98     private void processQueuedEntries() {
99         List<LockRequest> granted = new ArrayList<>();
100         for (Iterator<LockRequest> i = lockRequestQueue.iterator(); i.hasNext();) {
101             LockRequest request = i.next();
102             if (tryLock(request.lockType)) {
103                 i.remove();
104                 granted.add(request);
105             }
106         }
107         callbackProcessor.addAll(granted);
108     }
109
110     private synchronized void addToQueue(MonoSink<Lock> callback, LockType lockType) {
111         lockRequestQueue.add(new LockRequest(callback, lockType, this));
112         processQueuedEntries();
113     }
114
115     @SuppressWarnings("java:S2274") // Always invoke wait() and await() methods inside a loop
116     private synchronized void waitForUnlock() {
117         try {
118             this.wait();
119         } catch (InterruptedException e) {
120             logger.warn("waitForUnlock interrupted", e);
121             Thread.currentThread().interrupt();
122         }
123     }
124
125     private boolean tryLock(LockType lockType) {
126         if (this.isExclusive) {
127             return false;
128         }
129         if (lockType == LockType.EXCLUSIVE && lockCounter > 0) {
130             return false;
131         }
132         lockCounter++;
133         this.isExclusive = lockType == LockType.EXCLUSIVE;
134         return true;
135     }
136
137     /**
138      * Represents a queued lock request
139      */
140     private static class LockRequest {
141         final MonoSink<Lock> callback;
142         final LockType lockType;
143         final Lock lock;
144
145         LockRequest(MonoSink<Lock> callback, LockType lockType, Lock lock) {
146             this.callback = callback;
147             this.lockType = lockType;
148             this.lock = lock;
149         }
150     }
151
152     /**
153      * A separate thread that calls a MonoSink to continue. This is done after a
154      * queued lock is granted.
155      */
156     private static class AsynchCallbackExecutor implements Runnable {
157         private List<LockRequest> lockRequestQueue = new LinkedList<>();
158
159         public AsynchCallbackExecutor() {
160             Thread thread = new Thread(this);
161             thread.start();
162         }
163
164         public synchronized void addAll(List<LockRequest> requests) {
165             this.lockRequestQueue.addAll(requests);
166             this.notifyAll();
167         }
168
169         @Override
170         public void run() {
171             try {
172                 while (true) {
173                     for (LockRequest request : consume()) {
174                         request.callback.success(request.lock);
175                     }
176                     waitForNewEntries();
177                 }
178             } catch (InterruptedException e) {
179                 Thread.currentThread().interrupt();
180                 logger.error("Interrupted {}", e.getMessage());
181             }
182         }
183
184         private synchronized List<LockRequest> consume() {
185             List<LockRequest> q = this.lockRequestQueue;
186             this.lockRequestQueue = new LinkedList<>();
187             return q;
188         }
189
190         @SuppressWarnings("java:S2274")
191         private synchronized void waitForNewEntries() throws InterruptedException {
192             if (this.lockRequestQueue.isEmpty()) {
193                 this.wait();
194             }
195         }
196     }
197 }