Fixed concurrency problems
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / repository / Lock.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2019 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oransc.policyagent.repository;
22
23 import java.util.Iterator;
24 import java.util.LinkedList;
25 import java.util.List;
26
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 import reactor.core.publisher.Mono;
31 import reactor.core.publisher.MonoSink;
32
33 /**
34  * A resource lock. The caller thread will be blocked until the lock is granted.
35  * Exclusive means that the caller takes exclusive ownership of the resurce. Non
36  * exclusive lock means that several users can lock the resource (for shared
37  * usage).
38  */
39 public class Lock {
40     private static final Logger logger = LoggerFactory.getLogger(Lock.class);
41
42     private boolean isExclusive = false;
43     private int cnt = 0;
44
45     public static enum LockType {
46         EXCLUSIVE, SHARED
47     }
48
49     public synchronized void lockBlocking(LockType locktype) {
50         while (!tryLock(locktype)) {
51             this.waitForUnlock();
52         }
53     }
54
55     public synchronized void lockBlocking() {
56         lockBlocking(LockType.SHARED);
57     }
58
59     public synchronized Mono<Lock> lock(LockType lockType) {
60         if (tryLock(lockType)) {
61             return Mono.just(this);
62         } else {
63             return Mono.create(monoSink -> addToQueue(monoSink, lockType));
64         }
65     }
66
67     public synchronized void unlock() {
68         if (disable()) {
69             return;
70         }
71         if (cnt <= 0) {
72             cnt = -1; // Might as well stop, to make it easier to find the problem
73             throw new RuntimeException("Number of unlocks must match the number of locks");
74         }
75         this.cnt--;
76         if (cnt == 0) {
77             isExclusive = false;
78         }
79         this.processQueuedEntries();
80         this.notifyAll();
81     }
82
83     private void processQueuedEntries() {
84         for (Iterator<QueueEntry> i = queue.iterator(); i.hasNext();) {
85             QueueEntry e = i.next();
86             if (tryLock(e.lockType)) {
87                 i.remove();
88                 e.callback.success(this);
89             }
90         }
91     }
92
93     static class QueueEntry {
94         final MonoSink<Lock> callback;
95         final LockType lockType;
96
97         QueueEntry(MonoSink<Lock> callback, LockType lockType) {
98             this.callback = callback;
99             this.lockType = lockType;
100         }
101     }
102
103     private final List<QueueEntry> queue = new LinkedList<>();
104
105     private synchronized void addToQueue(MonoSink<Lock> callback, LockType lockType) {
106         queue.add(new QueueEntry(callback, lockType));
107     }
108
109     private void waitForUnlock() {
110         try {
111             this.wait();
112         } catch (InterruptedException e) {
113             logger.warn("waitForUnlock interrupted", e);
114         }
115     }
116
117     private boolean disable() {
118         return true;
119     }
120
121     private boolean tryLock(LockType lockType) {
122         if (disable()) {
123             return true;
124         }
125         if (this.isExclusive) {
126             return false;
127         }
128         if (lockType == LockType.EXCLUSIVE && cnt > 0) {
129             return false;
130         }
131         cnt++;
132         this.isExclusive = lockType == LockType.EXCLUSIVE;
133         return true;
134     }
135
136     public synchronized int getLockCounter() {
137         return this.cnt;
138     }
139
140 }