Concurrent modification exception when allpolicies for a RIC was deleted.
Fixed MockA1Client so it can simulate network delays
Change-Id: I1883b42a7afac303770084625a1e45acbf89c28f
Issue-ID: NONRTRIC-164
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
import static org.oransc.policyagent.repository.Ric.RicState;
import static org.oransc.policyagent.repository.Ric.RicState;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Vector;
import org.oransc.policyagent.clients.A1Client;
import java.util.Vector;
import org.oransc.policyagent.clients.A1Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
.flatMap(Lock::unlock) //
.flatMap(lock -> this.a1ClientFactory.createA1Client(ric)) //
.flatMapMany(client -> startSynchronization(ric, client)) //
.flatMap(Lock::unlock) //
.flatMap(lock -> this.a1ClientFactory.createA1Client(ric)) //
.flatMapMany(client -> startSynchronization(ric, client)) //
- .subscribe(x -> logger.debug("Synchronize: {}", x), //
- throwable -> onSynchronizationError(ric, throwable), //
- () -> onSynchronizationComplete(ric));
+ .subscribe(new BaseSubscriber<Object>() {
+ @Override
+ protected void hookOnError(Throwable throwable) {
+ startDeleteAllPolicyInstances(ric, throwable);
+ }
+
+ @Override
+ protected void hookOnComplete() {
+ onSynchronizationComplete(ric);
+ }
+ });
}
private Flux<Object> startSynchronization(Ric ric, A1Client a1Client) {
Flux<PolicyType> synchronizedTypes = synchronizePolicyTypes(ric, a1Client);
Flux<?> policiesDeletedInRic = a1Client.deleteAllPolicies();
}
private Flux<Object> startSynchronization(Ric ric, A1Client a1Client) {
Flux<PolicyType> synchronizedTypes = synchronizePolicyTypes(ric, a1Client);
Flux<?> policiesDeletedInRic = a1Client.deleteAllPolicies();
- Flux<?> policiesRecreatedInRic = recreateAllPoliciesInRic(ric, a1Client);
+ Flux<Policy> policiesRecreatedInRic = recreateAllPoliciesInRic(ric, a1Client);
return Flux.concat(synchronizedTypes, policiesDeletedInRic, policiesRecreatedInRic);
}
return Flux.concat(synchronizedTypes, policiesDeletedInRic, policiesRecreatedInRic);
}
if (service.getCallbackUrl().length() > 0) {
createNotificationClient(url) //
.put("", body) //
if (service.getCallbackUrl().length() > 0) {
createNotificationClient(url) //
.put("", body) //
notUsed -> logger.debug("Service {} notified", service.getName()), throwable -> logger
.warn("Service notification failed for service: {}", service.getName(), throwable),
() -> logger.debug("All services notified"));
notUsed -> logger.debug("Service {} notified", service.getName()), throwable -> logger
.warn("Service notification failed for service: {}", service.getName(), throwable),
() -> logger.debug("All services notified"));
- private void onSynchronizationError(Ric ric, Throwable t) {
+ private void startDeleteAllPolicyInstances(Ric ric, Throwable t) {
logger.warn("Synchronization failed for ric: {}, reason: {}", ric.name(), t.getMessage());
// If synchronization fails, try to remove all instances
deleteAllPoliciesInRepository(ric);
logger.warn("Synchronization failed for ric: {}, reason: {}", ric.name(), t.getMessage());
// If synchronization fails, try to remove all instances
deleteAllPoliciesInRepository(ric);
Flux.concat(synchronizedTypes, deletePoliciesInRic) //
.subscribe(x -> logger.debug("Brute recovery of failed synchronization: {}", x), //
Flux.concat(synchronizedTypes, deletePoliciesInRic) //
.subscribe(x -> logger.debug("Brute recovery of failed synchronization: {}", x), //
- throwable -> onSynchronizationRecoveryError(ric, throwable), //
+ throwable -> onDeleteAllPolicyInstancesError(ric, throwable), //
() -> onSynchronizationComplete(ric));
}
() -> onSynchronizationComplete(ric));
}
- private void onSynchronizationRecoveryError(Ric ric, Throwable t) {
+ private void onDeleteAllPolicyInstancesError(Ric ric, Throwable t) {
logger.warn("Synchronization failure recovery failed for ric: {}, reason: {}", ric.name(), t.getMessage());
ric.setState(RicState.UNDEFINED);
}
logger.warn("Synchronization failure recovery failed for ric: {}, reason: {}", ric.name(), t.getMessage());
ric.setState(RicState.UNDEFINED);
}
private void deleteAllPoliciesInRepository(Ric ric) {
synchronized (policies) {
private void deleteAllPoliciesInRepository(Ric ric) {
synchronized (policies) {
- for (Policy policy : policies.getForRic(ric.name())) {
+ List<Policy> ricPolicies = new ArrayList<>(policies.getForRic(ric.name()));
+ for (Policy policy : ricPolicies) {
this.policies.remove(policy);
}
}
}
this.policies.remove(policy);
}
}
}
- private Flux<String> recreateAllPoliciesInRic(Ric ric, A1Client a1Client) {
+ private Flux<Policy> putPolicy(Policy policy, Ric ric, A1Client a1Client) {
+ logger.debug("Recreating policy: {}, for ric: {}", policy.id(), ric.getConfig().name());
+ return a1Client.putPolicy(policy) //
+ .flatMapMany(notUsed -> Flux.just(policy));
+ }
+
+ private Flux<Policy> recreateAllPoliciesInRic(Ric ric, A1Client a1Client) {
synchronized (policies) {
return Flux.fromIterable(new Vector<>(policies.getForRic(ric.name()))) //
synchronized (policies) {
return Flux.fromIterable(new Vector<>(policies.getForRic(ric.name()))) //
- .doOnNext(
- policy -> logger.debug("Recreating policy: {}, for ric: {}", policy.id(), ric.getConfig().name()))
- .flatMap(a1Client::putPolicy);
+ .flatMap(policy -> putPolicy(policy, ric, a1Client));
package org.oransc.policyagent.utils;
package org.oransc.policyagent.utils;
+import java.time.Duration;
import java.util.List;
import java.util.Vector;
import java.util.List;
import java.util.Vector;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoSink;
public class MockA1Client implements A1Client {
Policies policies = new Policies();
private final PolicyTypes policyTypes;
public class MockA1Client implements A1Client {
Policies policies = new Policies();
private final PolicyTypes policyTypes;
+ private final Duration asynchDelay;
- public MockA1Client(PolicyTypes policyTypes) {
+ public MockA1Client(PolicyTypes policyTypes, Duration asynchDelay) {
this.policyTypes = policyTypes;
this.policyTypes = policyTypes;
+ this.asynchDelay = asynchDelay;
for (PolicyType p : this.policyTypes.getAll()) {
result.add(p.name());
}
for (PolicyType p : this.policyTypes.getAll()) {
result.add(p.name());
}
- return Mono.just(result);
result.add(policy.id());
}
result.add(policy.id());
}
- return Mono.just(result);
}
}
@Override
public Mono<String> getPolicyTypeSchema(String policyTypeId) {
try {
}
}
@Override
public Mono<String> getPolicyTypeSchema(String policyTypeId) {
try {
- return Mono.just(this.policyTypes.getType(policyTypeId).schema());
+ return mono(this.policyTypes.getType(policyTypeId).schema());
} catch (Exception e) {
return Mono.error(e);
}
} catch (Exception e) {
return Mono.error(e);
}
@Override
public Mono<String> putPolicy(Policy p) {
this.policies.put(p);
@Override
public Mono<String> putPolicy(Policy p) {
this.policies.put(p);
- return Mono.just("OK");
}
@Override
public Mono<String> deletePolicy(Policy policy) {
this.policies.remove(policy);
}
@Override
public Mono<String> deletePolicy(Policy policy) {
this.policies.remove(policy);
- return Mono.just("OK");
}
public Policies getPolicies() {
}
public Policies getPolicies() {
@Override
public Mono<A1ProtocolType> getProtocolVersion() {
@Override
public Mono<A1ProtocolType> getProtocolVersion() {
- return Mono.just(A1ProtocolType.STD_V1_1);
+ return mono(A1ProtocolType.STD_V1_1);
}
@Override
public Flux<String> deleteAllPolicies() {
this.policies.clear();
}
@Override
public Flux<String> deleteAllPolicies() {
this.policies.clear();
+ return mono("OK") //
+ .flatMapMany(Flux::just);
}
@Override
public Mono<String> getPolicyStatus(Policy policy) {
}
@Override
public Mono<String> getPolicyStatus(Policy policy) {
- return Mono.just("OK");
+ return mono("OK");
+ }
+
+ private <T> Mono<T> mono(T value) {
+ if (this.asynchDelay.isZero()) {
+ return Mono.just(value);
+ } else {
+ return Mono.create(monoSink -> asynchResponse(monoSink, value));
+ }
+ }
+
+ @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
+ private void sleep() {
+ try {
+ Thread.sleep(this.asynchDelay.toMillis());
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private <T> void asynchResponse(MonoSink<T> callback, T str) {
+ Thread thread = new Thread(() -> {
+ sleep(); // Simulate a network delay
+ callback.success(str);
+ });
+ thread.start();
import static org.mockito.Mockito.spy;
import java.lang.invoke.MethodHandles;
import static org.mockito.Mockito.spy;
import java.lang.invoke.MethodHandles;
+import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.HashMap;
import java.util.Map;
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final Map<String, MockA1Client> clients = new HashMap<>();
private final PolicyTypes policyTypes;
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final Map<String, MockA1Client> clients = new HashMap<>();
private final PolicyTypes policyTypes;
+ private Duration asynchDelay = Duration.ofSeconds(0);
public MockA1ClientFactory(PolicyTypes policyTypes) {
super(mock(ApplicationConfig.class));
public MockA1ClientFactory(PolicyTypes policyTypes) {
super(mock(ApplicationConfig.class));
public MockA1Client getOrCreateA1Client(String ricName) {
if (!clients.containsKey(ricName)) {
logger.debug("Creating client for RIC: {}", ricName);
public MockA1Client getOrCreateA1Client(String ricName) {
if (!clients.containsKey(ricName)) {
logger.debug("Creating client for RIC: {}", ricName);
- MockA1Client client = spy(new MockA1Client(policyTypes));
+ MockA1Client client = spy(new MockA1Client(policyTypes, asynchDelay));
clients.put(ricName, client);
}
return clients.get(ricName);
}
clients.put(ricName, client);
}
return clients.get(ricName);
}
+ /**
+ * Simulate network latency. The REST responses will be generated by separate
+ * threads
+ *
+ * @param delay the delay between the request and the response
+ */
+ public void setResponseDelay(Duration delay) {
+ this.asynchDelay = delay;
+ }
+