Simplified startup
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / repository / Lock.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2019 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oransc.policyagent.repository;
22
23 import java.util.ArrayList;
24 import java.util.Iterator;
25 import java.util.LinkedList;
26 import java.util.List;
27
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 import reactor.core.publisher.Mono;
32 import reactor.core.publisher.MonoSink;
33
34 /**
35  * A resource lock. Exclusive means that the caller takes exclusive ownership of
36  * the resurce. Non exclusive lock means that several users can lock the
37  * resource (for shared usage).
38  */
39 public class Lock {
40     private static final Logger logger = LoggerFactory.getLogger(Lock.class);
41
42     private boolean isExclusive = false;
43     private int lockCounter = 0;
44     private final List<LockRequest> lockRequestQueue = new LinkedList<>();
45     private static AsynchCallbackExecutor callbackProcessor = new AsynchCallbackExecutor();
46
47     private static class AsynchCallbackExecutor implements Runnable {
48         private List<LockRequest> lockRequestQueue = new LinkedList<>();
49
50         public AsynchCallbackExecutor() {
51             Thread thread = new Thread(this);
52             thread.start();
53         }
54
55         public synchronized void addAll(List<LockRequest> requests) {
56             this.lockRequestQueue.addAll(requests);
57             this.notifyAll();
58         }
59
60         @Override
61         public void run() {
62             while (true) {
63                 for (LockRequest request : consume()) {
64                     request.callback.success(request.lock);
65                 }
66                 waitForNewEntries();
67             }
68         }
69
70         private synchronized List<LockRequest> consume() {
71             List<LockRequest> q = this.lockRequestQueue;
72             this.lockRequestQueue = new LinkedList<>();
73             return q;
74         }
75
76         @SuppressWarnings("java:S2274")
77         private synchronized void waitForNewEntries() {
78             try {
79                 if (this.lockRequestQueue.isEmpty()) {
80                     this.wait();
81                 }
82             } catch (InterruptedException e) {
83                 logger.warn("waitForUnlock interrupted", e);
84                 Thread.currentThread().interrupt();
85             }
86         }
87     }
88
89     public enum LockType {
90         EXCLUSIVE, SHARED
91     }
92
93     /** The caller thread will be blocked util the lock is granted. */
94     public synchronized void lockBlocking(LockType locktype) {
95         while (!tryLock(locktype)) {
96             this.waitForUnlock();
97         }
98     }
99
100     /** Reactive version. The Lock will be emitted when the lock is granted */
101     public synchronized Mono<Lock> lock(LockType lockType) {
102         if (tryLock(lockType)) {
103             return Mono.just(this);
104         } else {
105             return Mono.create(monoSink -> addToQueue(monoSink, lockType));
106         }
107     }
108
109     public Mono<Lock> unlock() {
110         return Mono.create(monoSink -> {
111             unlockBlocking();
112             monoSink.success(this);
113         });
114     }
115
116     public void unlockBlocking() {
117         synchronized (this) {
118             if (lockCounter <= 0) {
119                 lockCounter = -1; // Might as well stop, to make it easier to find the problem
120                 throw new NullPointerException("Number of unlocks must match the number of locks");
121             }
122             this.lockCounter--;
123             if (lockCounter == 0) {
124                 isExclusive = false;
125             }
126             this.notifyAll();
127         }
128         this.processQueuedEntries();
129     }
130
131     @Override
132     public String toString() {
133         return "Lock cnt: " + this.lockCounter + " exclusive: " + this.isExclusive;
134     }
135
136     /** returns the current number of granted locks */
137     public synchronized int getLockCounter() {
138         return this.lockCounter;
139     }
140
141     private void processQueuedEntries() {
142         List<LockRequest> granted = new ArrayList<>();
143         synchronized (this) {
144             for (Iterator<LockRequest> i = lockRequestQueue.iterator(); i.hasNext();) {
145                 LockRequest request = i.next();
146                 if (tryLock(request.lockType)) {
147                     i.remove();
148                     granted.add(request);
149                 }
150             }
151         }
152         callbackProcessor.addAll(granted);
153     }
154
155     private static class LockRequest {
156         final MonoSink<Lock> callback;
157         final LockType lockType;
158         final Lock lock;
159
160         LockRequest(MonoSink<Lock> callback, LockType lockType, Lock lock) {
161             this.callback = callback;
162             this.lockType = lockType;
163             this.lock = lock;
164         }
165     }
166
167     private synchronized void addToQueue(MonoSink<Lock> callback, LockType lockType) {
168         lockRequestQueue.add(new LockRequest(callback, lockType, this));
169     }
170
171     @SuppressWarnings("java:S2274")
172     private synchronized void waitForUnlock() {
173         try {
174             this.wait();
175         } catch (InterruptedException e) {
176             logger.warn("waitForUnlock interrupted", e);
177             Thread.currentThread().interrupt();
178         }
179     }
180
181     private boolean tryLock(LockType lockType) {
182         if (this.isExclusive) {
183             return false;
184         }
185         if (lockType == LockType.EXCLUSIVE && lockCounter > 0) {
186             return false;
187         }
188         lockCounter++;
189         this.isExclusive = lockType == LockType.EXCLUSIVE;
190         return true;
191     }
192
193 }