@ApiResponse(code = 200, message = "Policy schemas", response = Object.class, responseContainer = "List"), //
@ApiResponse(code = 404, message = "RIC is not found", response = String.class)})
public ResponseEntity<String> getPolicySchemas(@RequestParam(name = "ric", required = false) String ricName) {
- synchronized (this.policyTypes) {
- if (ricName == null) {
- Collection<PolicyType> types = this.policyTypes.getAll();
+ if (ricName == null) {
+ Collection<PolicyType> types = this.policyTypes.getAll();
+ return new ResponseEntity<>(toPolicyTypeSchemasJson(types), HttpStatus.OK);
+ } else {
+ try {
+ Collection<PolicyType> types = rics.getRic(ricName).getSupportedPolicyTypes();
return new ResponseEntity<>(toPolicyTypeSchemasJson(types), HttpStatus.OK);
- } else {
- try {
- Collection<PolicyType> types = rics.getRic(ricName).getSupportedPolicyTypes();
- return new ResponseEntity<>(toPolicyTypeSchemasJson(types), HttpStatus.OK);
- } catch (ServiceException e) {
- return new ResponseEntity<>(e.toString(), HttpStatus.NOT_FOUND);
- }
+ } catch (ServiceException e) {
+ return new ResponseEntity<>(e.toString(), HttpStatus.NOT_FOUND);
}
}
}
responseContainer = "List"),
@ApiResponse(code = 404, message = "RIC is not found", response = String.class)})
public ResponseEntity<String> getPolicyTypes(@RequestParam(name = "ric", required = false) String ricName) {
- synchronized (this.policyTypes) {
- if (ricName == null) {
- Collection<PolicyType> types = this.policyTypes.getAll();
+ if (ricName == null) {
+ Collection<PolicyType> types = this.policyTypes.getAll();
+ return new ResponseEntity<>(toPolicyTypeIdsJson(types), HttpStatus.OK);
+ } else {
+ try {
+ Collection<PolicyType> types = rics.getRic(ricName).getSupportedPolicyTypes();
return new ResponseEntity<>(toPolicyTypeIdsJson(types), HttpStatus.OK);
- } else {
- try {
- Collection<PolicyType> types = rics.getRic(ricName).getSupportedPolicyTypes();
- return new ResponseEntity<>(toPolicyTypeIdsJson(types), HttpStatus.OK);
- } catch (ServiceException e) {
- return new ResponseEntity<>(e.toString(), HttpStatus.NOT_FOUND);
- }
+ } catch (ServiceException e) {
+ return new ResponseEntity<>(e.toString(), HttpStatus.NOT_FOUND);
}
}
}
value = { //
@ApiResponse(code = 204, message = "Policy deleted", response = Object.class),
@ApiResponse(code = 404, message = "Policy is not found", response = String.class),
- @ApiResponse(code = 423, message = "RIC is locked", response = String.class)})
+ @ApiResponse(code = 423, message = "RIC is not operational", response = String.class)})
public Mono<ResponseEntity<Object>> deletePolicy( //
@RequestParam(name = "instance", required = true) String id) {
- Policy policy;
try {
- policy = policies.getPolicy(id);
+ Policy policy = policies.getPolicy(id);
keepServiceAlive(policy.ownerServiceName());
- if (policy.ric().getState() != Ric.RicState.IDLE) {
- return Mono.just(new ResponseEntity<>("Busy, synchronizing", HttpStatus.LOCKED));
- }
Ric ric = policy.ric();
- return ric.getLock().lock(LockType.SHARED) // //
- .flatMap(lock -> a1ClientFactory.createA1Client(policy.ric())) //
+ return ric.getLock().lock(LockType.SHARED) //
+ .flatMap(notUsed -> assertRicStateIdle(ric)) //
+ .flatMap(notUsed -> a1ClientFactory.createA1Client(policy.ric())) //
.doOnNext(notUsed -> policies.remove(policy)) //
.flatMap(client -> client.deletePolicy(policy)) //
.doOnNext(notUsed -> ric.getLock().unlockBlocking()) //
value = { //
@ApiResponse(code = 201, message = "Policy created", response = Object.class), //
@ApiResponse(code = 200, message = "Policy updated", response = Object.class), //
- @ApiResponse(code = 423, message = "RIC is locked", response = String.class), //
+ @ApiResponse(code = 423, message = "RIC is not operational", response = String.class), //
@ApiResponse(code = 404, message = "RIC or policy type is not found", response = String.class) //
})
public Mono<ResponseEntity<Object>> putPolicy( //
Ric ric = rics.get(ricName);
PolicyType type = policyTypes.get(typeName);
keepServiceAlive(service);
- if (ric != null && type != null && ric.getState() == Ric.RicState.IDLE) {
- Policy policy = ImmutablePolicy.builder() //
- .id(instanceId) //
- .json(jsonString) //
- .type(type) //
- .ric(ric) //
- .ownerServiceName(service) //
- .lastModified(getTimeStampUtc()) //
- .build();
-
- final boolean isCreate = this.policies.get(policy.id()) == null;
-
- return ric.getLock().lock(LockType.SHARED) //
- .flatMap(p -> validateModifiedPolicy(policy)) //
- .flatMap(notUsed -> a1ClientFactory.createA1Client(ric)) //
- .flatMap(client -> client.putPolicy(policy)) //
- .doOnNext(notUsed -> policies.put(policy)) //
- .doOnNext(notUsed -> ric.getLock().unlockBlocking()) //
- .doOnError(t -> ric.getLock().unlockBlocking()) //
- .flatMap(notUsed -> Mono.just(new ResponseEntity<>(isCreate ? HttpStatus.CREATED : HttpStatus.OK))) //
- .onErrorResume(this::handleException);
+ if (ric == null || type == null) {
+ return Mono.just(new ResponseEntity<>(HttpStatus.NOT_FOUND));
}
-
- return ric == null || type == null ? Mono.just(new ResponseEntity<>(HttpStatus.NOT_FOUND))
- : Mono.just(new ResponseEntity<>(HttpStatus.LOCKED)); // Synchronizing
+ Policy policy = ImmutablePolicy.builder() //
+ .id(instanceId) //
+ .json(jsonString) //
+ .type(type) //
+ .ric(ric) //
+ .ownerServiceName(service) //
+ .lastModified(getTimeStampUtc()) //
+ .build();
+
+ final boolean isCreate = this.policies.get(policy.id()) == null;
+
+ return ric.getLock().lock(LockType.SHARED) //
+ .flatMap(p -> assertRicStateIdle(ric)) //
+ .flatMap(p -> validateModifiedPolicy(policy)) //
+ .flatMap(notUsed -> a1ClientFactory.createA1Client(ric)) //
+ .flatMap(client -> client.putPolicy(policy)) //
+ .doOnNext(notUsed -> policies.put(policy)) //
+ .doOnNext(notUsed -> ric.getLock().unlockBlocking()) //
+ .doOnError(t -> ric.getLock().unlockBlocking()) //
+ .flatMap(notUsed -> Mono.just(new ResponseEntity<>(isCreate ? HttpStatus.CREATED : HttpStatus.OK))) //
+ .onErrorResume(this::handleException);
}
@SuppressWarnings({"unchecked"})
return Mono.just("OK");
}
+ private Mono<Object> assertRicStateIdle(Ric ric) {
+ if (ric.getState() == Ric.RicState.IDLE) {
+ return Mono.just("OK");
+ } else {
+ RejectionException e = new RejectionException(
+ "Ric is not operational, RIC name: " + ric.name() + ", state: " + ric.getState(), HttpStatus.LOCKED);
+ return Mono.error(e);
+ }
+ }
+
@GetMapping("/policies")
@ApiOperation(value = "Query policies")
@ApiResponses(
if ((ric != null && this.rics.get(ric) == null)) {
return new ResponseEntity<>("RIC not found", HttpStatus.NOT_FOUND);
}
- synchronized (policies) {
- String filteredPolicies = policiesToJson(filter(type, ric, service));
- return new ResponseEntity<>(filteredPolicies, HttpStatus.OK);
- }
+
+ String filteredPolicies = policiesToJson(filter(type, ric, service));
+ return new ResponseEntity<>(filteredPolicies, HttpStatus.OK);
}
@GetMapping("/policy_ids")
if ((ric != null && this.rics.get(ric) == null)) {
return new ResponseEntity<>("RIC not found", HttpStatus.NOT_FOUND);
}
- synchronized (policies) {
- String policyIdsJson = toPolicyIdsJson(filter(type, ric, service));
- return new ResponseEntity<>(policyIdsJson, HttpStatus.OK);
- }
+
+ String policyIdsJson = toPolicyIdsJson(filter(type, ric, service));
+ return new ResponseEntity<>(policyIdsJson, HttpStatus.OK);
}
@GetMapping("/policy_status")
}
private Collection<Policy> filter(String type, String ric, String service) {
- synchronized (policies) {
- if (type != null) {
- return filter(policies.getForType(type), null, ric, service);
- } else if (service != null) {
- return filter(policies.getForService(service), type, ric, null);
- } else if (ric != null) {
- return filter(policies.getForRic(ric), type, null, service);
- } else {
- return policies.getAll();
- }
+ if (type != null) {
+ return filter(policies.getForType(type), null, ric, service);
+ } else if (service != null) {
+ return filter(policies.getForService(service), type, ric, null);
+ } else if (ric != null) {
+ return filter(policies.getForRic(ric), type, null, service);
+ } else {
+ return policies.getAll();
}
}
}
List<RicInfo> result = new ArrayList<>();
- synchronized (rics) {
- for (Ric ric : rics.getRics()) {
- if (supportingPolicyType == null || ric.isSupportingType(supportingPolicyType)) {
- result.add(new RicInfo(ric.name(), ric.getManagedElementIds(), ric.getSupportedPolicyTypeNames()));
- }
+ for (Ric ric : rics.getRics()) {
+ if (supportingPolicyType == null || ric.isSupportingType(supportingPolicyType)) {
+ result.add(new RicInfo(ric.name(), ric.getManagedElementIds(), ric.getSupportedPolicyTypeNames()));
}
}
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.List;
import org.oransc.policyagent.exceptions.ServiceException;
import org.oransc.policyagent.repository.Policies;
}
Collection<ServiceStatus> servicesStatus = new ArrayList<>();
- synchronized (this.services) {
- for (Service s : this.services.getAll()) {
- if (name == null || name.equals(s.getName())) {
- servicesStatus.add(toServiceStatus(s));
- }
+ for (Service s : this.services.getAll()) {
+ if (name == null || name.equals(s.getName())) {
+ servicesStatus.add(toServiceStatus(s));
}
}
}
private Service removeService(String name) throws ServiceException {
- synchronized (this.services) {
- Service service = this.services.getService(name);
- this.services.remove(service.getName());
- return service;
- }
+ Service service = this.services.getService(name); // Just to verify that it exists
+ this.services.remove(service.getName());
+ return service;
}
private void removePolicies(Service service) {
- synchronized (this.policies) {
- List<Policy> policyList = new ArrayList<>(this.policies.getForService(service.getName()));
- for (Policy policy : policyList) {
- this.policies.remove(policy);
- }
+ Collection<Policy> policyList = this.policies.getForService(service.getName());
+ for (Policy policy : policyList) {
+ this.policies.remove(policy);
}
}
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import java.util.Vector;
import org.oransc.policyagent.exceptions.ServiceException;
if (map == null) {
return Collections.emptyList();
}
- return Collections.unmodifiableCollection(map.values());
+ return new Vector<>(map.values());
}
public synchronized boolean containsPolicy(String id) {
}
public synchronized Collection<Policy> getAll() {
- return Collections.unmodifiableCollection(policiesId.values());
+ return new Vector<>(policiesId.values());
}
public synchronized Collection<Policy> getForService(String service) {
package org.oransc.policyagent.repository;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Vector;
import org.oransc.policyagent.exceptions.ServiceException;
}
public synchronized Collection<PolicyType> getAll() {
- return Collections.unmodifiableCollection(types.values());
+ return new Vector<>(types.values());
}
public synchronized int size() {
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
+import java.util.Vector;
import org.oransc.policyagent.exceptions.ServiceException;
}
public synchronized Iterable<Ric> getRics() {
- return registeredRics.values();
+ return new Vector<>(registeredRics.values());
}
public synchronized Ric getRic(String name) throws ServiceException {
package org.oransc.policyagent.repository;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Vector;
import org.oransc.policyagent.exceptions.ServiceException;
import org.slf4j.Logger;
}
public synchronized Iterable<Service> getAll() {
- return Collections.unmodifiableCollection(registeredServices.values());
+ return new Vector<>(registeredServices.values());
}
public synchronized void remove(String name) {
}
private Flux<RicData> createTask() {
- synchronized (this.rics) {
- return Flux.fromIterable(rics.getRics()) //
- .flatMap(this::createRicData) //
- .flatMap(this::checkOneRic) //
- .onErrorResume(throwable -> Mono.empty());
- }
+ return Flux.fromIterable(rics.getRics()) //
+ .flatMap(this::createRicData) //
+ .flatMap(this::checkOneRic) //
+ .onErrorResume(throwable -> Mono.empty());
+
}
private Mono<RicData> checkOneRic(RicData ricData) {
import static org.oransc.policyagent.repository.Ric.RicState;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Vector;
-
import org.oransc.policyagent.clients.A1Client;
import org.oransc.policyagent.clients.A1ClientFactory;
import org.oransc.policyagent.clients.AsyncRestClient;
import org.oransc.policyagent.repository.ImmutablePolicyType;
-import org.oransc.policyagent.repository.Lock;
import org.oransc.policyagent.repository.Lock.LockType;
import org.oransc.policyagent.repository.Policies;
import org.oransc.policyagent.repository.Policy;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.core.publisher.SignalType;
/**
* Synchronizes the content of a RIC with the content in the repository. This
this.services = services;
}
- @SuppressWarnings("squid:S2445") // Blocks should be synchronized on "private final" fields
public void run(Ric ric) {
logger.debug("Handling ric: {}", ric.getConfig().name());
- synchronized (ric) {
- if (ric.getState() == RicState.SYNCHRONIZING) {
- logger.debug("Ric: {} is already being synchronized", ric.getConfig().name());
- return;
- }
- ric.setState(RicState.SYNCHRONIZING);
+ if (ric.getState() == RicState.SYNCHRONIZING) {
+ logger.debug("Ric: {} is already being synchronized", ric.getConfig().name());
+ return;
}
- ric.getLock().lock(LockType.EXCLUSIVE) // Make sure no NBI updates are running
- .flatMap(Lock::unlock) //
+ ric.getLock().lock(LockType.EXCLUSIVE) //
+ .flatMap(notUsed -> setRicState(ric)) //
.flatMap(lock -> this.a1ClientFactory.createA1Client(ric)) //
- .flatMapMany(client -> startSynchronization(ric, client)) //
+ .flatMapMany(client -> runSynchronization(ric, client)) //
+ .onErrorResume(throwable -> deleteAllPolicyInstances(ric, throwable))
.subscribe(new BaseSubscriber<Object>() {
@Override
protected void hookOnError(Throwable throwable) {
- startDeleteAllPolicyInstances(ric, throwable);
+ logger.warn("Synchronization failure for ric: {}, reason: {}", ric.name(), throwable.getMessage());
+ ric.setState(RicState.UNDEFINED);
}
@Override
protected void hookOnComplete() {
onSynchronizationComplete(ric);
}
+
+ @Override
+ protected void hookFinally(SignalType type) {
+ ric.getLock().unlockBlocking();
+ }
});
}
- private Flux<Object> startSynchronization(Ric ric, A1Client a1Client) {
+ @SuppressWarnings("squid:S2445") // Blocks should be synchronized on "private final" fields
+ private Mono<Ric> setRicState(Ric ric) {
+ synchronized (ric) {
+ if (ric.getState() == RicState.SYNCHRONIZING) {
+ logger.debug("Ric: {} is already being synchronized", ric.getConfig().name());
+ return Mono.empty();
+ }
+ ric.setState(RicState.SYNCHRONIZING);
+ return Mono.just(ric);
+ }
+ }
+
+ private Flux<Object> runSynchronization(Ric ric, A1Client a1Client) {
Flux<PolicyType> synchronizedTypes = synchronizePolicyTypes(ric, a1Client);
Flux<?> policiesDeletedInRic = a1Client.deleteAllPolicies();
Flux<Policy> policiesRecreatedInRic = recreateAllPoliciesInRic(ric, a1Client);
}
private void onSynchronizationComplete(Ric ric) {
- logger.info("Synchronization completed for: {}", ric.name());
+ logger.debug("Synchronization completed for: {}", ric.name());
ric.setState(RicState.IDLE);
notifyAllServices("Synchronization completed for:" + ric.name());
}
private void notifyAllServices(String body) {
- synchronized (services) {
- for (Service service : services.getAll()) {
- String url = service.getCallbackUrl();
- if (service.getCallbackUrl().length() > 0) {
- createNotificationClient(url) //
- .put("", body) //
- .subscribe( //
- notUsed -> logger.debug("Service {} notified", service.getName()), throwable -> logger
- .warn("Service notification failed for service: {}", service.getName(), throwable),
- () -> logger.debug("All services notified"));
- }
+ for (Service service : services.getAll()) {
+ String url = service.getCallbackUrl();
+ if (service.getCallbackUrl().length() > 0) {
+ createNotificationClient(url) //
+ .put("", body) //
+ .subscribe( //
+ notUsed -> logger.debug("Service {} notified", service.getName()), throwable -> logger
+ .warn("Service notification failed for service: {}", service.getName(), throwable),
+ () -> logger.debug("All services notified"));
}
}
}
- private void startDeleteAllPolicyInstances(Ric ric, Throwable t) {
- logger.warn("Synchronization failed for ric: {}, reason: {}", ric.name(), t.getMessage());
- // If synchronization fails, try to remove all instances
+ private Flux<Object> deleteAllPolicyInstances(Ric ric, Throwable t) {
+ logger.warn("Recreation of policies failed for ric: {}, reason: {}", ric.name(), t.getMessage());
deleteAllPoliciesInRepository(ric);
Flux<PolicyType> synchronizedTypes = this.a1ClientFactory.createA1Client(ric) //
.flatMapMany(A1Client::deleteAllPolicies) //
.doOnComplete(() -> deleteAllPoliciesInRepository(ric));
- Flux.concat(synchronizedTypes, deletePoliciesInRic) //
- .subscribe(x -> logger.debug("Brute recovery of failed synchronization: {}", x), //
- throwable -> onDeleteAllPolicyInstancesError(ric, throwable), //
- () -> onSynchronizationComplete(ric));
- }
-
- private void onDeleteAllPolicyInstancesError(Ric ric, Throwable t) {
- logger.warn("Synchronization failure recovery failed for ric: {}, reason: {}", ric.name(), t.getMessage());
- ric.setState(RicState.UNDEFINED);
+ return Flux.concat(synchronizedTypes, deletePoliciesInRic);
}
AsyncRestClient createNotificationClient(final String url) {
}
private void deleteAllPoliciesInRepository(Ric ric) {
- synchronized (policies) {
- List<Policy> ricPolicies = new ArrayList<>(policies.getForRic(ric.name()));
- for (Policy policy : ricPolicies) {
- this.policies.remove(policy);
- }
+ for (Policy policy : policies.getForRic(ric.name())) {
+ this.policies.remove(policy);
}
}
}
private Flux<Policy> recreateAllPoliciesInRic(Ric ric, A1Client a1Client) {
- synchronized (policies) {
- return Flux.fromIterable(new Vector<>(policies.getForRic(ric.name()))) //
- .flatMap(policy -> putPolicy(policy, ric, a1Client));
- }
+ return Flux.fromIterable(policies.getForRic(ric.name())) //
+ .flatMap(policy -> putPolicy(policy, ric, a1Client));
}
}
}
private Flux<Policy> getAllPoliciesForService(Service service) {
- synchronized (policies) {
- return Flux.fromIterable(policies.getForService(service.getName()));
- }
+ return Flux.fromIterable(policies.getForService(service.getName()));
}
private Mono<Policy> deletePolicyInRic(Policy policy) {
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
-import org.springframework.http.HttpEntity;
-import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
-import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.test.context.junit.jupiter.SpringExtension;
-import org.springframework.web.client.RestTemplate;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Mono;
policies.clear();
policyTypes.clear();
services.clear();
+ a1ClientFactory.reset();
}
@AfterEach
@Test
public void testGetPolicies() throws Exception {
- reset();
addPolicy("id1", "type1", "service1");
String url = "/policies";
return "{\n \"servingCellNrcgi\": \"1\"\n }";
}
- private static class ConcurrencyTestRunnable implements Runnable {
- private final RestTemplate restTemplate = new RestTemplate();
- private final String baseUrl;
- static AtomicInteger nextCount = new AtomicInteger(0);
- private final int count;
- private final RicSupervision supervision;
-
- ConcurrencyTestRunnable(String baseUrl, RicSupervision supervision) {
- this.baseUrl = baseUrl;
- this.count = nextCount.incrementAndGet();
- this.supervision = supervision;
- }
-
- @Override
- public void run() {
- for (int i = 0; i < 100; ++i) {
- if (i % 10 == 0) {
- this.supervision.checkAllRics();
- }
- String name = "policy:" + count + ":" + i;
- putPolicy(name);
- deletePolicy(name);
- }
- }
-
- private void putPolicy(String name) {
- String putUrl = baseUrl + "/policy?type=type1&instance=" + name + "&ric=ric1&service=service1";
- restTemplate.put(putUrl, createJsonHttpEntity("{}"));
- }
-
- private void deletePolicy(String name) {
- String deleteUrl = baseUrl + "/policy?instance=" + name;
- restTemplate.delete(deleteUrl);
- }
- }
-
@Test
public void testConcurrency() throws Exception {
final Instant startTime = Instant.now();
List<Thread> threads = new ArrayList<>();
- addRic("ric1");
- addPolicyType("type1", "ric1");
+ a1ClientFactory.setResponseDelay(Duration.ofMillis(1));
+ addRic("ric");
+ addPolicyType("type1", "ric");
+ addPolicyType("type2", "ric");
for (int i = 0; i < 100; ++i) {
- Thread t = new Thread(new ConcurrencyTestRunnable(baseUrl(), this.supervision), "TestThread_" + i);
+ Thread t =
+ new Thread(new ConcurrencyTestRunnable(baseUrl(), supervision, a1ClientFactory, rics, policyTypes),
+ "TestThread_" + i);
t.start();
threads.add(t);
}
return ric;
}
- private static HttpEntity<String> createJsonHttpEntity(String content) {
- HttpHeaders headers = new HttpHeaders();
- headers.setContentType(MediaType.APPLICATION_JSON);
- return new HttpEntity<String>(content, headers);
- }
-
private static <T> List<T> parseList(String jsonString, Class<T> clazz) {
List<T> result = new ArrayList<>();
JsonArray jsonArr = JsonParser.parseString(jsonString).getAsJsonArray();
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2020 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.policyagent;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.oransc.policyagent.repository.ImmutablePolicy;
+import org.oransc.policyagent.repository.Policy;
+import org.oransc.policyagent.repository.PolicyType;
+import org.oransc.policyagent.repository.PolicyTypes;
+import org.oransc.policyagent.repository.Ric;
+import org.oransc.policyagent.repository.Rics;
+import org.oransc.policyagent.tasks.RicSupervision;
+import org.oransc.policyagent.utils.MockA1Client;
+import org.oransc.policyagent.utils.MockA1ClientFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.MediaType;
+import org.springframework.web.client.RestTemplate;
+
+/**
+ * Invoke operations over the NBI and start synchronizations in a separate
+ * thread. For test of robustness using concurrent clients.
+ */
+class ConcurrencyTestRunnable implements Runnable {
+ private static final Logger logger = LoggerFactory.getLogger(ConcurrencyTestRunnable.class);
+ private final RestTemplate restTemplate = new RestTemplate();
+ private final String baseUrl;
+ static AtomicInteger nextCount = new AtomicInteger(0);
+ private final int count;
+ private final RicSupervision supervision;
+ private final MockA1ClientFactory a1ClientFactory;
+ private final Rics rics;
+ private final PolicyTypes types;
+
+ ConcurrencyTestRunnable(String baseUrl, RicSupervision supervision, MockA1ClientFactory a1ClientFactory, Rics rics,
+ PolicyTypes types) {
+ this.baseUrl = baseUrl;
+ this.count = nextCount.incrementAndGet();
+ this.supervision = supervision;
+ this.a1ClientFactory = a1ClientFactory;
+ this.rics = rics;
+ this.types = types;
+ }
+
+ @Override
+ public void run() {
+ try {
+ for (int i = 0; i < 100; ++i) {
+ if (i % 10 == 0) {
+ createInconsistency();
+ this.supervision.checkAllRics();
+ }
+ String name = "policy:" + count + ":" + i;
+ putPolicy(name);
+ putPolicy(name + "-");
+ listPolicies();
+ listTypes();
+ deletePolicy(name);
+ deletePolicy(name + "-");
+ }
+ } catch (Exception e) {
+ logger.error("Concurrency exception " + e.toString());
+ }
+ }
+
+ private Policy createPolicyObject(String id) {
+ Ric ric = this.rics.get("ric");
+ PolicyType type = this.types.get("type1");
+ return ImmutablePolicy.builder() //
+ .id(id) //
+ .json("{}") //
+ .type(type) //
+ .ric(ric) //
+ .ownerServiceName("") //
+ .lastModified("") //
+ .build();
+ }
+
+ private void createInconsistency() {
+ MockA1Client client = a1ClientFactory.getOrCreateA1Client("ric");
+ Policy policy = createPolicyObject("junk");
+ client.putPolicy(policy).block();
+
+ }
+
+ private void listPolicies() {
+ String uri = baseUrl + "/policies";
+ restTemplate.getForObject(uri, String.class);
+ }
+
+ private void listTypes() {
+ String uri = baseUrl + "/policy_types";
+ restTemplate.getForObject(uri, String.class);
+ }
+
+ private void putPolicy(String name) {
+ String putUrl = baseUrl + "/policy?type=type1&instance=" + name + "&ric=ric&service=service1";
+ restTemplate.put(putUrl, createJsonHttpEntity("{}"));
+ }
+
+ private void deletePolicy(String name) {
+ String deleteUrl = baseUrl + "/policy?instance=" + name;
+ restTemplate.delete(deleteUrl);
+ }
+
+ private static HttpEntity<String> createJsonHttpEntity(String content) {
+ HttpHeaders headers = new HttpHeaders();
+ headers.setContentType(MediaType.APPLICATION_JSON);
+ return new HttpEntity<String>(content, headers);
+ }
+
+}
private void keepServerAlive() throws InterruptedException {
logger.info("Keeping server alive!");
-
synchronized (this) {
this.wait();
}
-
}
private static String title(String jsonSchema) {
synchronizerUnderTest.run(RIC_1);
verifyCorrectLogMessage(0, logAppender,
- "Synchronization failed for ric: " + RIC_1_NAME + ", reason: " + originalErrorMessage);
- verifyCorrectLogMessage(1, logAppender,
- "Synchronization failure recovery failed for ric: " + RIC_1_NAME + ", reason: " + originalErrorMessage);
+ "Recreation of policies failed for ric: " + RIC_1_NAME + ", reason: " + originalErrorMessage);
verify(a1ClientMock, times(2)).deleteAllPolicies();
verifyNoMoreInteractions(a1ClientMock);
@Override
public Mono<List<String>> getPolicyTypeIdentities() {
- synchronized (this.policyTypes) {
- List<String> result = new Vector<>();
- for (PolicyType p : this.policyTypes.getAll()) {
- result.add(p.name());
- }
- return mono(result);
+ List<String> result = new Vector<>();
+ for (PolicyType p : this.policyTypes.getAll()) {
+ result.add(p.name());
}
+ return mono(result);
}
@Override
public Mono<List<String>> getPolicyIdentities() {
- synchronized (this.policies) {
- Vector<String> result = new Vector<>();
- for (Policy policy : policies.getAll()) {
- result.add(policy.id());
- }
-
- return mono(result);
+ Vector<String> result = new Vector<>();
+ for (Policy policy : policies.getAll()) {
+ result.add(policy.id());
}
+
+ return mono(result);
}
@Override
this.asynchDelay = delay;
}
+ public void reset() {
+ this.asynchDelay = Duration.ofSeconds(0);
+ clients.clear();
+ }
+
}