Make naming consistent regarding synchronization
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / repository / Lock.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2019 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oransc.policyagent.repository;
22
23 import java.util.ArrayList;
24 import java.util.Iterator;
25 import java.util.LinkedList;
26 import java.util.List;
27
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 import reactor.core.publisher.Mono;
32 import reactor.core.publisher.MonoSink;
33
34 /**
35  * A resource lock. Exclusive means that the caller takes exclusive ownership of
36  * the resurce. Non exclusive lock means that several users can lock the
37  * resource (for shared usage).
38  */
39 public class Lock {
40     private static final Logger logger = LoggerFactory.getLogger(Lock.class);
41
42     private boolean isExclusive = false;
43     private int lockCounter = 0;
44     private final List<LockRequest> lockRequestQueue = new LinkedList<>();
45
46     private static class AsynchCallbackExecutor implements Runnable {
47         private List<LockRequest> lockRequestQueue = new LinkedList<>();
48
49         public AsynchCallbackExecutor() {
50             Thread thread = new Thread(this);
51             thread.start();
52         }
53
54         public synchronized void addAll(List<LockRequest> requests) {
55             this.lockRequestQueue.addAll(requests);
56             this.notifyAll();
57         }
58
59         @Override
60         public void run() {
61             while (true) {
62                 for (LockRequest request : consume()) {
63                     request.callback.success(request.lock);
64                 }
65                 waitForNewEntries();
66             }
67         }
68
69         private synchronized List<LockRequest> consume() {
70             List<LockRequest> q = this.lockRequestQueue;
71             this.lockRequestQueue = new LinkedList<>();
72             return q;
73         }
74
75         private synchronized void waitForNewEntries() {
76             try {
77                 if (this.lockRequestQueue.isEmpty()) {
78                     this.wait();
79                 }
80             } catch (InterruptedException e) {
81                 logger.warn("waitForUnlock interrupted", e);
82             }
83         }
84     }
85
86     private static AsynchCallbackExecutor callbackProcessor = new AsynchCallbackExecutor();
87
88     public static enum LockType {
89         EXCLUSIVE, SHARED
90     }
91
92     /** The caller thread will be blocked util the lock is granted. */
93     public synchronized void lockBlocking(LockType locktype) {
94         while (!tryLock(locktype)) {
95             this.waitForUnlock();
96         }
97     }
98
99     /** Reactive version. The Lock will be emitted when the lock is granted */
100     public synchronized Mono<Lock> lock(LockType lockType) {
101         if (tryLock(lockType)) {
102             return Mono.just(this);
103         } else {
104             return Mono.create(monoSink -> addToQueue(monoSink, lockType));
105         }
106     }
107
108     public Mono<Lock> unlock() {
109         return Mono.create(monoSink -> {
110             unlockBlocking();
111             monoSink.success(this);
112         });
113     }
114
115     public void unlockBlocking() {
116         synchronized (this) {
117             if (lockCounter <= 0) {
118                 lockCounter = -1; // Might as well stop, to make it easier to find the problem
119                 throw new RuntimeException("Number of unlocks must match the number of locks");
120             }
121             this.lockCounter--;
122             if (lockCounter == 0) {
123                 isExclusive = false;
124             }
125             this.notifyAll();
126         }
127         this.processQueuedEntries();
128     }
129
130     @Override
131     public String toString() {
132         return "Lock cnt: " + this.lockCounter + " exclusive: " + this.isExclusive;
133     }
134
135     /** returns the current number of granted locks */
136     public synchronized int getLockCounter() {
137         return this.lockCounter;
138     }
139
140     private void processQueuedEntries() {
141         List<LockRequest> granted = new ArrayList<>();
142         synchronized (this) {
143             for (Iterator<LockRequest> i = lockRequestQueue.iterator(); i.hasNext();) {
144                 LockRequest request = i.next();
145                 if (tryLock(request.lockType)) {
146                     i.remove();
147                     granted.add(request);
148                 }
149             }
150         }
151
152         /*
153          * for (LockRequest request : granted) { request.callback.success(this); }
154          */
155         callbackProcessor.addAll(granted);
156     }
157
158     private static class LockRequest {
159         final MonoSink<Lock> callback;
160         final LockType lockType;
161         final Lock lock;
162
163         LockRequest(MonoSink<Lock> callback, LockType lockType, Lock lock) {
164             this.callback = callback;
165             this.lockType = lockType;
166             this.lock = lock;
167         }
168     }
169
170     private synchronized void addToQueue(MonoSink<Lock> callback, LockType lockType) {
171         lockRequestQueue.add(new LockRequest(callback, lockType, this));
172     }
173
174     private void waitForUnlock() {
175         try {
176             this.wait();
177         } catch (InterruptedException e) {
178             logger.warn("waitForUnlock interrupted", e);
179         }
180     }
181
182     private boolean tryLock(LockType lockType) {
183         if (this.isExclusive) {
184             return false;
185         }
186         if (lockType == LockType.EXCLUSIVE && lockCounter > 0) {
187             return false;
188         }
189         lockCounter++;
190         this.isExclusive = lockType == LockType.EXCLUSIVE;
191         return true;
192     }
193
194 }