Adding ONAP PMS as a submodule.
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / repository / Lock.java
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Lock.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Lock.java
deleted file mode 100644 (file)
index 716148f..0000000
+++ /dev/null
@@ -1,197 +0,0 @@
-/*-
- * ========================LICENSE_START=================================
- * O-RAN-SC
- * %%
- * Copyright (C) 2019 Nordix Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================LICENSE_END===================================
- */
-
-package org.oransc.policyagent.repository;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import reactor.core.publisher.Mono;
-import reactor.core.publisher.MonoSink;
-
-/**
- * A resource lock. Exclusive means that the caller takes exclusive ownership of
- * the resurce. Non exclusive lock means that several users can lock the
- * resource (for shared usage).
- */
-public class Lock {
-    private static final Logger logger = LoggerFactory.getLogger(Lock.class);
-
-    private boolean isExclusive = false;
-    private int lockCounter = 0;
-    private final List<LockRequest> lockRequestQueue = new LinkedList<>();
-    private static AsynchCallbackExecutor callbackProcessor = new AsynchCallbackExecutor();
-
-    public enum LockType {
-        EXCLUSIVE, SHARED
-    }
-
-    /** The caller thread will be blocked util the lock is granted. */
-    public synchronized void lockBlocking(LockType locktype) {
-        while (!tryLock(locktype)) {
-            this.waitForUnlock();
-        }
-    }
-
-    /** Reactive version. The Lock will be emitted when the lock is granted */
-    public synchronized Mono<Lock> lock(LockType lockType) {
-        if (tryLock(lockType)) {
-            return Mono.just(this);
-        } else {
-            return Mono.create(monoSink -> addToQueue(monoSink, lockType));
-        }
-    }
-
-    public Mono<Lock> unlock() {
-        return Mono.create(monoSink -> {
-            unlockBlocking();
-            monoSink.success(this);
-        });
-    }
-
-    public synchronized void unlockBlocking() {
-        if (lockCounter <= 0) {
-            lockCounter = -1; // Might as well stop, to make it easier to find the problem
-            logger.error("Number of unlocks must match the number of locks");
-        }
-        this.lockCounter--;
-        if (lockCounter == 0) {
-            isExclusive = false;
-        }
-        this.notifyAll();
-        this.processQueuedEntries();
-    }
-
-    @Override
-    public synchronized String toString() {
-        return "Lock cnt: " + this.lockCounter + " exclusive: " + this.isExclusive + " queued: "
-            + this.lockRequestQueue.size();
-    }
-
-    /** returns the current number of granted locks */
-    public synchronized int getLockCounter() {
-        return this.lockCounter;
-    }
-
-    private void processQueuedEntries() {
-        List<LockRequest> granted = new ArrayList<>();
-        for (Iterator<LockRequest> i = lockRequestQueue.iterator(); i.hasNext();) {
-            LockRequest request = i.next();
-            if (tryLock(request.lockType)) {
-                i.remove();
-                granted.add(request);
-            }
-        }
-        callbackProcessor.addAll(granted);
-    }
-
-    private synchronized void addToQueue(MonoSink<Lock> callback, LockType lockType) {
-        lockRequestQueue.add(new LockRequest(callback, lockType, this));
-        processQueuedEntries();
-    }
-
-    @SuppressWarnings("java:S2274") // Always invoke wait() and await() methods inside a loop
-    private synchronized void waitForUnlock() {
-        try {
-            this.wait();
-        } catch (InterruptedException e) {
-            logger.warn("waitForUnlock interrupted", e);
-            Thread.currentThread().interrupt();
-        }
-    }
-
-    private boolean tryLock(LockType lockType) {
-        if (this.isExclusive) {
-            return false;
-        }
-        if (lockType == LockType.EXCLUSIVE && lockCounter > 0) {
-            return false;
-        }
-        lockCounter++;
-        this.isExclusive = lockType == LockType.EXCLUSIVE;
-        return true;
-    }
-
-    /**
-     * Represents a queued lock request
-     */
-    private static class LockRequest {
-        final MonoSink<Lock> callback;
-        final LockType lockType;
-        final Lock lock;
-
-        LockRequest(MonoSink<Lock> callback, LockType lockType, Lock lock) {
-            this.callback = callback;
-            this.lockType = lockType;
-            this.lock = lock;
-        }
-    }
-
-    /**
-     * A separate thread that calls a MonoSink to continue. This is done after a
-     * queued lock is granted.
-     */
-    private static class AsynchCallbackExecutor implements Runnable {
-        private List<LockRequest> lockRequestQueue = new LinkedList<>();
-
-        public AsynchCallbackExecutor() {
-            Thread thread = new Thread(this);
-            thread.start();
-        }
-
-        public synchronized void addAll(List<LockRequest> requests) {
-            this.lockRequestQueue.addAll(requests);
-            this.notifyAll();
-        }
-
-        @Override
-        public void run() {
-            try {
-                while (true) {
-                    for (LockRequest request : consume()) {
-                        request.callback.success(request.lock);
-                    }
-                    waitForNewEntries();
-                }
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                logger.error("Interrupted {}", e.getMessage());
-            }
-        }
-
-        private synchronized List<LockRequest> consume() {
-            List<LockRequest> q = this.lockRequestQueue;
-            this.lockRequestQueue = new LinkedList<>();
-            return q;
-        }
-
-        @SuppressWarnings("java:S2274")
-        private synchronized void waitForNewEntries() throws InterruptedException {
-            if (this.lockRequestQueue.isEmpty()) {
-                this.wait();
-            }
-        }
-    }
-}