Bugfix, only one RIC was synched
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / repository / Lock.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2019 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oransc.policyagent.repository;
22
23 import java.util.ArrayList;
24 import java.util.Iterator;
25 import java.util.LinkedList;
26 import java.util.List;
27
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 import reactor.core.publisher.Mono;
32 import reactor.core.publisher.MonoSink;
33
34 /**
35  * A resource lock. Exclusive means that the caller takes exclusive ownership of
36  * the resurce. Non exclusive lock means that several users can lock the
37  * resource (for shared usage).
38  */
39 public class Lock {
40     private static final Logger logger = LoggerFactory.getLogger(Lock.class);
41
42     private boolean isExclusive = false;
43     private int lockCounter = 0;
44     private final List<LockRequest> lockRequestQueue = new LinkedList<>();
45     private static AsynchCallbackExecutor callbackProcessor = new AsynchCallbackExecutor();
46
47     private static class AsynchCallbackExecutor implements Runnable {
48         private List<LockRequest> lockRequestQueue = new LinkedList<>();
49
50         public AsynchCallbackExecutor() {
51             Thread thread = new Thread(this);
52             thread.start();
53         }
54
55         public synchronized void addAll(List<LockRequest> requests) {
56             this.lockRequestQueue.addAll(requests);
57             this.notifyAll();
58         }
59
60         @Override
61         public void run() {
62             try {
63                 while (true) {
64                     for (LockRequest request : consume()) {
65                         request.callback.success(request.lock);
66                     }
67                     waitForNewEntries();
68                 }
69             } catch (InterruptedException e) {
70                 Thread.currentThread().interrupt();
71                 logger.error("Interrupted {}", e.getMessage());
72             }
73         }
74
75         private synchronized List<LockRequest> consume() {
76             List<LockRequest> q = this.lockRequestQueue;
77             this.lockRequestQueue = new LinkedList<>();
78             return q;
79         }
80
81         @SuppressWarnings("java:S2274")
82         private synchronized void waitForNewEntries() throws InterruptedException {
83             if (this.lockRequestQueue.isEmpty()) {
84                 this.wait();
85             }
86         }
87     }
88
89     public enum LockType {
90         EXCLUSIVE, SHARED
91     }
92
93     /** The caller thread will be blocked util the lock is granted. */
94     public synchronized void lockBlocking(LockType locktype) {
95         while (!tryLock(locktype)) {
96             this.waitForUnlock();
97         }
98     }
99
100     /** Reactive version. The Lock will be emitted when the lock is granted */
101     public synchronized Mono<Lock> lock(LockType lockType) {
102         if (tryLock(lockType)) {
103             return Mono.just(this);
104         } else {
105             return Mono.create(monoSink -> addToQueue(monoSink, lockType));
106         }
107     }
108
109     public Mono<Lock> unlock() {
110         return Mono.create(monoSink -> {
111             unlockBlocking();
112             monoSink.success(this);
113         });
114     }
115
116     public void unlockBlocking() {
117         synchronized (this) {
118             if (lockCounter <= 0) {
119                 lockCounter = -1; // Might as well stop, to make it easier to find the problem
120                 logger.error("Number of unlocks must match the number of locks");
121             }
122             this.lockCounter--;
123             if (lockCounter == 0) {
124                 isExclusive = false;
125             }
126             this.notifyAll();
127         }
128         this.processQueuedEntries();
129     }
130
131     @Override
132     public String toString() {
133         return "Lock cnt: " + this.lockCounter + " exclusive: " + this.isExclusive + " queued: "
134             + this.lockRequestQueue.size();
135     }
136
137     /** returns the current number of granted locks */
138     public synchronized int getLockCounter() {
139         return this.lockCounter;
140     }
141
142     private void processQueuedEntries() {
143         List<LockRequest> granted = new ArrayList<>();
144         synchronized (this) {
145             for (Iterator<LockRequest> i = lockRequestQueue.iterator(); i.hasNext();) {
146                 LockRequest request = i.next();
147                 if (tryLock(request.lockType)) {
148                     i.remove();
149                     granted.add(request);
150                 }
151             }
152         }
153         callbackProcessor.addAll(granted);
154     }
155
156     private static class LockRequest {
157         final MonoSink<Lock> callback;
158         final LockType lockType;
159         final Lock lock;
160
161         LockRequest(MonoSink<Lock> callback, LockType lockType, Lock lock) {
162             this.callback = callback;
163             this.lockType = lockType;
164             this.lock = lock;
165         }
166     }
167
168     private synchronized void addToQueue(MonoSink<Lock> callback, LockType lockType) {
169         lockRequestQueue.add(new LockRequest(callback, lockType, this));
170     }
171
172     @SuppressWarnings("java:S2274") // Always invoke wait() and await() methods inside a loop
173     private synchronized void waitForUnlock() {
174         try {
175             this.wait();
176         } catch (InterruptedException e) {
177             logger.warn("waitForUnlock interrupted", e);
178             Thread.currentThread().interrupt();
179         }
180     }
181
182     private boolean tryLock(LockType lockType) {
183         if (this.isExclusive) {
184             return false;
185         }
186         if (lockType == LockType.EXCLUSIVE && lockCounter > 0) {
187             return false;
188         }
189         lockCounter++;
190         this.isExclusive = lockType == LockType.EXCLUSIVE;
191         return true;
192     }
193
194 }