The resource locking is enabled and reactive.
Change-Id: Ibf83dddaa41cde9224ea52dbfda9dcbc9e4361a6
Issue-ID: NONRTRIC-107
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
import org.oransc.policyagent.configuration.ApplicationConfig;
import org.oransc.policyagent.exceptions.ServiceException;
import org.oransc.policyagent.repository.ImmutablePolicy;
+import org.oransc.policyagent.repository.Lock.LockType;
import org.oransc.policyagent.repository.Policies;
import org.oransc.policyagent.repository.Policy;
import org.oransc.policyagent.repository.PolicyType;
Policy policy = policies.get(id);
if (policy != null && policy.ric().getState() == Ric.RicState.IDLE) {
Ric ric = policy.ric();
- return a1ClientFactory.createA1Client(policy.ric()) //
- .doOnNext(notUsed -> ric.getLock().lockBlocking()) //
+ return ric.getLock().lock(LockType.SHARED) // //
+ .flatMap(lock -> a1ClientFactory.createA1Client(policy.ric())) //
.doOnNext(notUsed -> policies.remove(policy)) //
.flatMap(client -> client.deletePolicy(policy)) //
- .doOnNext(notUsed -> ric.getLock().unlock()) //
- .doOnError(notUsed -> ric.getLock().unlock()) //
+ .doOnNext(notUsed -> ric.getLock().unlockBlocking()) //
+ .doOnError(notUsed -> ric.getLock().unlockBlocking()) //
.flatMap(notUsed -> Mono.just(new ResponseEntity<>(HttpStatus.NO_CONTENT)));
} else if (policy != null) {
return Mono.just(new ResponseEntity<>("Busy, recovering", HttpStatus.LOCKED));
final boolean isCreate = this.policies.get(policy.id()) == null;
- return Mono.just(policy) //
- .doOnNext(notUsed -> ric.getLock().lockBlocking()) //
+ return ric.getLock().lock(LockType.SHARED) //
.flatMap(p -> validateModifiedPolicy(policy)) //
.flatMap(notUsed -> a1ClientFactory.createA1Client(ric)) //
.flatMap(client -> client.putPolicy(policy)) //
.doOnNext(notUsed -> policies.put(policy)) //
- .doOnNext(notUsed -> ric.getLock().unlock()) //
- .doOnError(t -> ric.getLock().unlock()) //
+ .doOnNext(notUsed -> ric.getLock().unlockBlocking()) //
+ .doOnError(t -> ric.getLock().unlockBlocking()) //
.flatMap(notUsed -> Mono.just(new ResponseEntity<>(isCreate ? HttpStatus.CREATED : HttpStatus.OK))) //
.onErrorResume(t -> Mono.just(new ResponseEntity<>(t.getMessage(), HttpStatus.METHOD_NOT_ALLOWED)));
}
package org.oransc.policyagent.dmaap;
-import java.util.Optional;
-
import com.google.gson.JsonObject;
+import java.util.Optional;
+
import org.immutables.gson.Gson;
import org.immutables.value.Value;
package org.oransc.policyagent.repository;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import reactor.core.publisher.MonoSink;
/**
- * A resource lock. The caller thread will be blocked until the lock is granted.
- * Exclusive means that the caller takes exclusive ownership of the resurce. Non
- * exclusive lock means that several users can lock the resource (for shared
- * usage).
+ * A resource lock. Exclusive means that the caller takes exclusive ownership of
+ * the resurce. Non exclusive lock means that several users can lock the
+ * resource (for shared usage).
*/
public class Lock {
private static final Logger logger = LoggerFactory.getLogger(Lock.class);
private boolean isExclusive = false;
- private int cnt = 0;
+ private int lockCounter = 0;
+ private final List<LockRequest> lockRequestQueue = new LinkedList<>();
+
+ private static class AsynchCallbackExecutor implements Runnable {
+ private List<LockRequest> lockRequestQueue = new LinkedList<>();
+
+ public AsynchCallbackExecutor() {
+ Thread thread = new Thread(this);
+ thread.start();
+ }
+
+ public synchronized void addAll(List<LockRequest> requests) {
+ this.lockRequestQueue.addAll(requests);
+ this.notifyAll();
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ for (LockRequest request : consume()) {
+ request.callback.success(request.lock);
+ }
+ waitForNewEntries();
+ }
+ }
+
+ private synchronized List<LockRequest> consume() {
+ List<LockRequest> q = this.lockRequestQueue;
+ this.lockRequestQueue = new LinkedList<>();
+ return q;
+ }
+
+ private synchronized void waitForNewEntries() {
+ try {
+ if (this.lockRequestQueue.isEmpty()) {
+ this.wait();
+ }
+ } catch (InterruptedException e) {
+ logger.warn("waitForUnlock interrupted", e);
+ }
+ }
+ }
+
+ private static AsynchCallbackExecutor callbackProcessor = new AsynchCallbackExecutor();
public static enum LockType {
EXCLUSIVE, SHARED
}
+ /** The caller thread will be blocked util the lock is granted. */
public synchronized void lockBlocking(LockType locktype) {
while (!tryLock(locktype)) {
this.waitForUnlock();
}
}
- public synchronized void lockBlocking() {
- lockBlocking(LockType.SHARED);
- }
-
+ /** Reactive version. The Lock will be emitted when the lock is granted */
public synchronized Mono<Lock> lock(LockType lockType) {
if (tryLock(lockType)) {
return Mono.just(this);
}
}
- public synchronized void unlock() {
- if (disable()) {
- return;
- }
- if (cnt <= 0) {
- cnt = -1; // Might as well stop, to make it easier to find the problem
- throw new RuntimeException("Number of unlocks must match the number of locks");
- }
- this.cnt--;
- if (cnt == 0) {
- isExclusive = false;
+ public Mono<Lock> unlock() {
+ return Mono.create(monoSink -> {
+ unlockBlocking();
+ monoSink.success(this);
+ });
+ }
+
+ public void unlockBlocking() {
+ synchronized (this) {
+ if (lockCounter <= 0) {
+ lockCounter = -1; // Might as well stop, to make it easier to find the problem
+ throw new RuntimeException("Number of unlocks must match the number of locks");
+ }
+ this.lockCounter--;
+ if (lockCounter == 0) {
+ isExclusive = false;
+ }
+ this.notifyAll();
}
this.processQueuedEntries();
- this.notifyAll();
+ }
+
+ @Override
+ public String toString() {
+ return "Lock cnt: " + this.lockCounter + " exclusive: " + this.isExclusive;
+ }
+
+ /** returns the current number of granted locks */
+ public synchronized int getLockCounter() {
+ return this.lockCounter;
}
private void processQueuedEntries() {
- for (Iterator<QueueEntry> i = queue.iterator(); i.hasNext();) {
- QueueEntry e = i.next();
- if (tryLock(e.lockType)) {
- i.remove();
- e.callback.success(this);
+ List<LockRequest> granted = new ArrayList<>();
+ synchronized (this) {
+ for (Iterator<LockRequest> i = lockRequestQueue.iterator(); i.hasNext();) {
+ LockRequest request = i.next();
+ if (tryLock(request.lockType)) {
+ i.remove();
+ granted.add(request);
+ }
}
}
+
+ /*
+ * for (LockRequest request : granted) { request.callback.success(this); }
+ */
+ callbackProcessor.addAll(granted);
}
- static class QueueEntry {
+ private static class LockRequest {
final MonoSink<Lock> callback;
final LockType lockType;
+ final Lock lock;
- QueueEntry(MonoSink<Lock> callback, LockType lockType) {
+ LockRequest(MonoSink<Lock> callback, LockType lockType, Lock lock) {
this.callback = callback;
this.lockType = lockType;
+ this.lock = lock;
}
}
- private final List<QueueEntry> queue = new LinkedList<>();
-
private synchronized void addToQueue(MonoSink<Lock> callback, LockType lockType) {
- queue.add(new QueueEntry(callback, lockType));
+ lockRequestQueue.add(new LockRequest(callback, lockType, this));
}
private void waitForUnlock() {
}
}
- private boolean disable() {
- return true;
- }
-
private boolean tryLock(LockType lockType) {
- if (disable()) {
- return true;
- }
if (this.isExclusive) {
return false;
}
- if (lockType == LockType.EXCLUSIVE && cnt > 0) {
+ if (lockType == LockType.EXCLUSIVE && lockCounter > 0) {
return false;
}
- cnt++;
+ lockCounter++;
this.isExclusive = lockType == LockType.EXCLUSIVE;
return true;
}
- public synchronized int getLockCounter() {
- return this.cnt;
- }
-
}
private final Lock lock = new Lock();
/**
- * Creates the Ric. Initial state is {@link RicState.NOT_INITIATED}.
+ * Creates the Ric. Initial state is {@link RicState.UNDEFINED}.
*
* @param ricConfig The {@link RicConfig} for this Ric.
*/
synchronized (this.rics) {
return Flux.fromIterable(rics.getRics()) //
.flatMap(this::createRicData) //
- .flatMap(this::checkRicState) //
- .doOnNext(ricData -> ricData.ric.getLock().lockBlocking(LockType.EXCLUSIVE)) //
- .flatMap(this::checkRicPolicies) //
- .doOnNext(ricData -> ricData.ric.getLock().unlock()) //
- .flatMap(this::checkRicPolicyTypes); //
+ .flatMap(this::checkOneRic) //
+ .onErrorResume(throwable -> Mono.empty());
}
}
+ private Mono<RicData> checkOneRic(RicData ricData) {
+ return checkRicState(ricData) //
+ .flatMap(x -> ricData.ric.getLock().lock(LockType.EXCLUSIVE)) //
+ .flatMap(x -> checkRicPolicies(ricData)) //
+ .flatMap(x -> ricData.ric.getLock().unlock()) //
+ .doOnError(throwable -> ricData.ric.getLock().unlockBlocking()) //
+ .flatMap(x -> checkRicPolicyTypes(ricData)); //
+ }
+
private static class RicData {
RicData(Ric ric, A1Client a1Client) {
this.ric = ric;
private Mono<RicData> checkRicState(RicData ric) {
if (ric.ric.getState() == RicState.UNDEFINED) {
- return startSynchronization(ric);
+ return startSynchronization(ric) //
+ .onErrorResume(t -> Mono.empty());
} else if (ric.ric.getState() == RicState.SYNCHRONIZING) {
return Mono.empty();
} else {
private Mono<RicData> checkRicPolicies(RicData ric) {
return ric.a1Client.getPolicyIdentities() //
- .onErrorResume(t -> {
- ric.ric.getLock().unlock();
- return Mono.empty();
- }) //
.flatMap(ricP -> validateInstances(ricP, ric));
}
private Mono<RicData> validateInstances(Collection<String> ricPolicies, RicData ric) {
synchronized (this.policies) {
if (ricPolicies.size() != policies.getForRic(ric.ric.name()).size()) {
- ric.ric.getLock().unlock();
return startSynchronization(ric);
}
for (String policyId : ricPolicies) {
if (!policies.containsPolicy(policyId)) {
- ric.ric.getLock().unlock();
return startSynchronization(ric);
}
}
private Mono<RicData> checkRicPolicyTypes(RicData ric) {
return ric.a1Client.getPolicyTypeIdentities() //
- .onErrorResume(notUsed -> Mono.empty()) //
.flatMap(ricTypes -> validateTypes(ricTypes, ric));
}
private Mono<RicData> startSynchronization(RicData ric) {
RicSynchronizationTask recovery = createSynchronizationTask();
recovery.run(ric.ric);
- return Mono.empty();
+ return Mono.error(new Exception("Syncronization started"));
}
@SuppressWarnings("squid:S2629")
}
ric.setState(RicState.SYNCHRONIZING);
}
- ric.getLock().lockBlocking(LockType.EXCLUSIVE); // Make sure no NBI updates are running
- ric.getLock().unlock();
- this.a1ClientFactory.createA1Client(ric)//
+
+ ric.getLock().lock(LockType.EXCLUSIVE) // Make sure no NBI updates are running
+ .flatMap(lock -> lock.unlock()) //
+ .flatMap(lock -> this.a1ClientFactory.createA1Client(ric)) //
.flatMapMany(client -> startSynchronization(ric, client)) //
.subscribe(x -> logger.debug("Synchronize: {}", x), //
throwable -> onSynchronizationError(ric, throwable), //
Services services;
private static Gson gson = new GsonBuilder() //
- .serializeNulls() //
- .create(); //
+ .serializeNulls() //
+ .create(); //
public static class MockApplicationConfig extends ApplicationConfig {
@Override
@Override
public boolean hasError(ClientHttpResponse httpResponse) throws IOException {
return (httpResponse.getStatusCode().series() == Series.CLIENT_ERROR
- || httpResponse.getStatusCode().series() == Series.SERVER_ERROR);
+ || httpResponse.getStatusCode().series() == Series.SERVER_ERROR);
}
@Override
public void verifyNoRicLocks() {
for (Ric ric : this.rics.getRics()) {
ric.getLock().lockBlocking(LockType.EXCLUSIVE);
- ric.getLock().unlock();
+ ric.getLock().unlockBlocking();
assertThat(ric.getLock().getLockCounter()).isEqualTo(0);
assertThat(ric.getState()).isEqualTo(Ric.RicState.IDLE);
}
@Test
public void testGetRicForManagedElement_thenReturnCorrectRic() throws Exception {
- addRic("notCorrectRic1");
- addRic("notCorrectRic2");
- addRic("notCorrectRic3");
- addRic("notCorrectRic4");
- addRic("notCorrectRic5");
- addRic("notCorrectRic6");
-
String ricName = "ric1";
Ric ric = addRic(ricName);
String managedElementId = "kista_1";
addPolicyType(policyTypeName, ricName);
String url = baseUrl() + "/policy?type=" + policyTypeName + "&instance=" + policyInstanceId + "&ric=" + ricName
- + "&service=" + serviceName;
+ + "&service=" + serviceName;
final String json = jsonString();
this.rics.getRic(ricName).setState(Ric.RicState.IDLE);
private Policy addPolicy(String id, String typeName, String service, String ric) throws ServiceException {
addRic(ric);
Policy p = ImmutablePolicy.builder().id(id) //
- .json(jsonString()) //
- .ownerServiceName(service) //
- .ric(rics.getRic(ric)) //
- .type(addPolicyType(typeName, ric)) //
- .lastModified("lastModified").build();
+ .json(jsonString()) //
+ .ownerServiceName(service) //
+ .ric(rics.getRic(ric)) //
+ .type(addPolicyType(typeName, ric)) //
+ .lastModified("lastModified").build();
policies.put(p);
return p;
}
private PolicyType addPolicyType(String policyTypeName, String ricName) {
PolicyType type = ImmutablePolicyType.builder() //
- .name(policyTypeName) //
- .schema("{\"title\":\"" + policyTypeName + "\"}") //
- .build();
+ .name(policyTypeName) //
+ .schema("{\"title\":\"" + policyTypeName + "\"}") //
+ .build();
policyTypes.put(type);
addRic(ricName).addSupportedPolicyType(type);
}
Vector<String> mes = new Vector<>();
RicConfig conf = ImmutableRicConfig.builder() //
- .name(ricName) //
- .baseUrl(ricName) //
- .managedElementIds(mes) //
- .build();
+ .name(ricName) //
+ .baseUrl(ricName) //
+ .managedElementIds(mes) //
+ .build();
Ric ric = new Ric(conf);
ric.setState(Ric.RicState.IDLE);
this.rics.put(ric);
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2019 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.policyagent.repository;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.oransc.policyagent.exceptions.ServiceException;
+import org.oransc.policyagent.repository.Lock.LockType;
+
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+@ExtendWith(MockitoExtension.class)
+public class LockTest {
+
+ private void sleep() {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ }
+ }
+
+ private void asynchUnlock(Lock lock) {
+ Thread t = new Thread(() -> {
+ sleep();
+ lock.unlockBlocking();
+ });
+ t.start();
+ }
+
+ @Test
+ public void testLock() throws IOException, ServiceException {
+ Lock lock = new Lock();
+ lock.lockBlocking(LockType.SHARED);
+ lock.unlockBlocking();
+
+ lock.lockBlocking(LockType.EXCLUSIVE);
+ asynchUnlock(lock);
+
+ lock.lockBlocking(LockType.SHARED);
+ lock.unlockBlocking();
+
+ assertThat(lock.getLockCounter()).isEqualTo(0);
+ }
+
+ @Test
+ public void testReactiveLock() {
+ Lock lock = new Lock();
+
+ Mono<Lock> seq = lock.lock(LockType.EXCLUSIVE) //
+ .doOnNext(l -> System.out.println("1 " + l)) //
+ .flatMap(l -> lock.lock(LockType.EXCLUSIVE)) //
+ .flatMap(l -> lock.unlock()) //
+ .doOnNext(l -> System.out.println("2 " + l)); //
+
+ asynchUnlock(lock);
+ StepVerifier.create(seq) //
+ .expectSubscription() //
+ .expectNext(lock) //
+ .verifyComplete();
+
+ assertThat(lock.getLockCounter()).isEqualTo(0);
+
+ }
+
+}
package org.oransc.policyagent.tasks;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import java.util.List;
import java.util.Vector;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.oransc.policyagent.configuration.ImmutableRicConfig;
import org.oransc.policyagent.repository.ImmutablePolicy;
import org.oransc.policyagent.repository.ImmutablePolicyType;
+import org.oransc.policyagent.repository.Lock.LockType;
import org.oransc.policyagent.repository.Policies;
import org.oransc.policyagent.repository.Policy;
import org.oransc.policyagent.repository.PolicyType;
.build();
private static final Ric RIC_1 = new Ric(ImmutableRicConfig.builder() //
- .name("ric1") //
+ .name("RIC_1") //
.baseUrl("baseUrl1") //
.managedElementIds(new Vector<String>(Arrays.asList("kista_1", "kista_2"))) //
.build());
@Mock
private RicSynchronizationTask recoveryTaskMock;
- private PolicyTypes types;
- private Policies policies;
- private Rics rics;
+ private final PolicyTypes types = new PolicyTypes();
+ private Policies policies = new Policies();
+ private Rics rics = new Rics();
@BeforeEach
public void init() {
doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class));
- types = new PolicyTypes();
- policies = new Policies();
- rics = new Rics();
+ types.clear();
+ policies.clear();
+ rics.clear();
RIC_1.setState(RicState.UNDEFINED);
RIC_1.clearSupportedPolicyTypes();
}
+ @AfterEach
+ public void verifyNoRicLocks() {
+ for (Ric ric : this.rics.getRics()) {
+ ric.getLock().lockBlocking(LockType.EXCLUSIVE);
+ ric.getLock().unlockBlocking();
+ assertThat(ric.getLock().getLockCounter()).isEqualTo(0);
+ }
+ }
+
@Test
public void whenRicIdleAndNoChangedPoliciesOrPolicyTypes_thenNoRecovery() {
RIC_1.setState(RicState.IDLE);