Added fixes of concurrecy problems, when consistency check is done, policies cannot be created/deleted
Added a unit test that tests concurrent execution
Improved trace
PUT policy, return HTTP status CREATE for new policy (OK for updated)
Improved the recovery handling and the consistency monitoring
Change-Id: I2504fce35dc6e3371c7d4cf389528ce75eda442d
Issue-ID: NONRTRIC-107
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
package org.oransc.policyagent.clients;
import java.util.List;
+
import org.oransc.policyagent.repository.Policy;
import reactor.core.publisher.Flux;
public class AsyncRestClient {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final WebClient client;
+ private final String baseUrl;
public class AsyncRestClientException extends Exception {
public AsyncRestClient(String baseUrl) {
this.client = WebClient.create(baseUrl);
+ this.baseUrl = baseUrl;
}
public Mono<String> post(String uri, String body) {
+ logger.debug("POST uri = '{}{}''", baseUrl, uri);
return client.post() //
.uri(uri) //
.contentType(MediaType.APPLICATION_JSON) //
}
public Mono<String> postWithAuthHeader(String uri, String body, String username, String password) {
+ logger.debug("POST (auth) uri = '{}{}''", baseUrl, uri);
return client.post() //
.uri(uri) //
.headers(headers -> headers.setBasicAuth(username, password)) //
}
public Mono<String> put(String uri, String body) {
- logger.debug("PUT uri = '{}''", uri);
+ logger.debug("PUT uri = '{}{}''", baseUrl, uri);
return client.put() //
.uri(uri) //
.contentType(MediaType.APPLICATION_JSON) //
}
public Mono<String> get(String uri) {
- logger.debug("GET uri = '{}''", uri);
+ logger.debug("GET uri = '{}{}''", baseUrl, uri);
return client.get() //
.uri(uri) //
.retrieve() //
}
public Mono<String> delete(String uri) {
- logger.debug("DELETE uri = '{}''", uri);
+ logger.debug("DELETE uri = '{}{}''", baseUrl, uri);
return client.delete() //
.uri(uri) //
.retrieve() //
import com.google.gson.FieldNamingPolicy;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
+
import java.util.ArrayList;
import java.util.List;
+
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.lang.invoke.MethodHandles;
import java.util.List;
+
import org.json.JSONObject;
import org.oransc.policyagent.configuration.RicConfig;
import org.oransc.policyagent.repository.Policy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.util.UriComponentsBuilder;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
+
import org.oransc.policyagent.configuration.RicConfig;
import org.oransc.policyagent.repository.Policy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
String inputJsonString = JsonHelper.createInputJsonString(inputParams);
logger.debug("POST putPolicy inputJsonString = {}", inputJsonString);
- return restClient.postWithAuthHeader(URL_PREFIX + "createPolicyInstance", inputJsonString,
- a1ControllerUsername, a1ControllerPassword);
+ return restClient.postWithAuthHeader(URL_PREFIX + "createPolicyInstance", inputJsonString, a1ControllerUsername,
+ a1ControllerPassword);
}
@Override
import java.lang.invoke.MethodHandles;
import java.util.List;
+
import org.oransc.policyagent.configuration.RicConfig;
import org.oransc.policyagent.repository.Policy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
package org.oransc.policyagent.clients;
import java.util.List;
+
import org.oransc.policyagent.configuration.RicConfig;
import org.oransc.policyagent.repository.Policy;
import org.springframework.web.util.UriComponentsBuilder;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
+
import lombok.Getter;
+
import org.oransc.policyagent.exceptions.ServiceException;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
+
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
+
import javax.validation.constraints.NotNull;
+
import lombok.Getter;
+
import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
import org.oransc.policyagent.exceptions.ServiceException;
import org.springframework.http.MediaType;
package org.oransc.policyagent.configuration;
import com.google.common.collect.ImmutableList;
+
import org.immutables.value.Value;
@Value.Immutable
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
+
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+
import org.oransc.policyagent.clients.A1ClientFactory;
import org.oransc.policyagent.configuration.ApplicationConfig;
import org.oransc.policyagent.exceptions.ServiceException;
@DeleteMapping("/policy")
@ApiOperation(value = "Delete a policy", response = Object.class)
@ApiResponses(value = {@ApiResponse(code = 204, message = "Policy deleted", response = Object.class)})
- public Mono<ResponseEntity<Void>> deletePolicy( //
+ public Mono<ResponseEntity<Object>> deletePolicy( //
@RequestParam(name = "instance", required = true) String id) {
Policy policy = policies.get(id);
if (policy != null && policy.ric().getState() == Ric.RicState.IDLE) {
- policies.remove(policy);
+ Ric ric = policy.ric();
return a1ClientFactory.createA1Client(policy.ric()) //
+ .doOnNext(notUsed -> ric.getLock().lockBlocking()) //
+ .doOnNext(notUsed -> policies.remove(policy)) //
.flatMap(client -> client.deletePolicy(policy)) //
+ .doOnNext(notUsed -> ric.getLock().unlock()) //
+ .doOnError(notUsed -> ric.getLock().unlock()) //
.flatMap(notUsed -> Mono.just(new ResponseEntity<>(HttpStatus.NO_CONTENT)));
+ } else if (policy != null) {
+ return Mono.just(new ResponseEntity<>("Busy, recovering", HttpStatus.LOCKED));
} else {
return Mono.just(new ResponseEntity<>(HttpStatus.NOT_FOUND));
}
@PutMapping(path = "/policy")
@ApiOperation(value = "Put a policy", response = String.class)
@ApiResponses(value = {@ApiResponse(code = 200, message = "Policy created or updated")})
- public Mono<ResponseEntity<String>> putPolicy( //
+ public Mono<ResponseEntity<Object>> putPolicy( //
@RequestParam(name = "type", required = true) String typeName, //
@RequestParam(name = "instance", required = true) String instanceId, //
@RequestParam(name = "ric", required = true) String ricName, //
.lastModified(getTimeStampUtc()) //
.build();
- return validateModifiedPolicy(policy) //
- .flatMap(x -> a1ClientFactory.createA1Client(ric)) //
+ final boolean isCreate = this.policies.get(policy.id()) == null;
+
+ return Mono.just(policy) //
+ .doOnNext(notUsed -> ric.getLock().lockBlocking()) //
+ .flatMap(p -> validateModifiedPolicy(policy)) //
+ .flatMap(notUsed -> a1ClientFactory.createA1Client(ric)) //
.flatMap(client -> client.putPolicy(policy)) //
.doOnNext(notUsed -> policies.put(policy)) //
- .flatMap(notUsed -> Mono.just(new ResponseEntity<>(HttpStatus.OK)));
+ .doOnNext(notUsed -> ric.getLock().unlock()) //
+ .doOnError(t -> ric.getLock().unlock()) //
+ .flatMap(notUsed -> Mono.just(new ResponseEntity<>(isCreate ? HttpStatus.CREATED : HttpStatus.OK))) //
+ .onErrorResume(t -> Mono.just(new ResponseEntity<>(t.getMessage(), HttpStatus.METHOD_NOT_ALLOWED)));
}
- return Mono.just(new ResponseEntity<>(HttpStatus.NOT_FOUND));
+
+ return ric == null && type == null ? Mono.just(new ResponseEntity<>(HttpStatus.NOT_FOUND))
+ : Mono.just(new ResponseEntity<>(HttpStatus.CONFLICT)); // Recovering
}
private Mono<Object> validateModifiedPolicy(Policy policy) {
Policy current = this.policies.get(policy.id());
if (current != null) {
if (!current.ric().name().equals(policy.ric().name())) {
- return Mono.error(new Exception("Policy cannot change RIC or service"));
+ return Mono.error(new Exception("Policy cannot change RIC, policyId: " + current.id() + //
+ ", RIC name: " + current.ric().name() + //
+ ", new name: " + policy.ric().name()));
}
}
return Mono.just("OK");
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
+
import org.oransc.policyagent.configuration.ApplicationConfig;
import org.oransc.policyagent.repository.Ric;
import org.oransc.policyagent.repository.Rics;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+
import org.oransc.policyagent.exceptions.ServiceException;
import org.oransc.policyagent.repository.Policies;
import org.oransc.policyagent.repository.Policy;
@ApiResponses(
value = {@ApiResponse(code = 200, message = "OK", response = ServiceStatus.class, responseContainer = "List")})
public ResponseEntity<String> getServices(//
- @RequestParam(name = "serviceName", required = false) String name) {
+ @RequestParam(name = "name", required = false) String name) {
Collection<ServiceStatus> servicesStatus = new ArrayList<>();
synchronized (this.services) {
@ApiResponses(value = {@ApiResponse(code = 200, message = "OK")})
@DeleteMapping("/services")
public ResponseEntity<String> deleteService(//
- @RequestParam(name = "serviceName", required = true) String serviceName) {
+ @RequestParam(name = "name", required = true) String serviceName) {
try {
Service service = removeService(serviceName);
// Remove the policies from the repo and let the consistency monitoring
@ApiResponse(code = 404, message = "The service is not found, needs re-registration")})
@PostMapping("/services/keepalive")
public ResponseEntity<String> keepAliveService(//
- @RequestParam(name = "serviceName", required = true) String serviceName) {
+ @RequestParam(name = "name", required = true) String serviceName) {
try {
services.getService(serviceName).ping();
return new ResponseEntity<>("OK", HttpStatus.OK);
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
+
import java.io.IOException;
+
import org.onap.dmaap.mr.client.MRBatchingPublisher;
import org.oransc.policyagent.clients.AsyncRestClient;
import org.oransc.policyagent.dmaap.DmaapRequestMessage.Operation;
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2019 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.policyagent.repository;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoSink;
+
+/**
+ * A resource lock. The caller thread will be blocked until the lock is granted.
+ * Exclusive means that the caller takes exclusive ownership of the resurce. Non
+ * exclusive lock means that several users can lock the resource (for shared
+ * usage).
+ */
+public class Lock {
+ private static final Logger logger = LoggerFactory.getLogger(Lock.class);
+
+ private boolean isExclusive = false;
+ private int cnt = 0;
+
+ public static enum LockType {
+ EXCLUSIVE, SHARED
+ }
+
+ public synchronized void lockBlocking(LockType locktype) {
+ while (!tryLock(locktype)) {
+ this.waitForUnlock();
+ }
+ }
+
+ public synchronized void lockBlocking() {
+ lockBlocking(LockType.SHARED);
+ }
+
+ public synchronized Mono<Lock> lock(LockType lockType) {
+ if (tryLock(lockType)) {
+ return Mono.just(this);
+ } else {
+ return Mono.create(monoSink -> addToQueue(monoSink, lockType));
+ }
+ }
+
+ public synchronized void unlock() {
+ if (disable()) {
+ return;
+ }
+ if (cnt <= 0) {
+ cnt = -1; // Might as well stop, to make it easier to find the problem
+ throw new RuntimeException("Number of unlocks must match the number of locks");
+ }
+ this.cnt--;
+ if (cnt == 0) {
+ isExclusive = false;
+ }
+ this.processQueuedEntries();
+ this.notifyAll();
+ }
+
+ private void processQueuedEntries() {
+ for (Iterator<QueueEntry> i = queue.iterator(); i.hasNext();) {
+ QueueEntry e = i.next();
+ if (tryLock(e.lockType)) {
+ i.remove();
+ e.callback.success(this);
+ }
+ }
+ }
+
+ static class QueueEntry {
+ final MonoSink<Lock> callback;
+ final LockType lockType;
+
+ QueueEntry(MonoSink<Lock> callback, LockType lockType) {
+ this.callback = callback;
+ this.lockType = lockType;
+ }
+ }
+
+ private final List<QueueEntry> queue = new LinkedList<>();
+
+ private synchronized void addToQueue(MonoSink<Lock> callback, LockType lockType) {
+ queue.add(new QueueEntry(callback, lockType));
+ }
+
+ private void waitForUnlock() {
+ try {
+ this.wait();
+ } catch (InterruptedException e) {
+ logger.warn("waitForUnlock interrupted", e);
+ }
+ }
+
+ private boolean disable() {
+ return true;
+ }
+
+ private boolean tryLock(LockType lockType) {
+ if (disable()) {
+ return true;
+ }
+ if (this.isExclusive) {
+ return false;
+ }
+ if (lockType == LockType.EXCLUSIVE && cnt > 0) {
+ return false;
+ }
+ cnt++;
+ this.isExclusive = lockType == LockType.EXCLUSIVE;
+ return true;
+ }
+
+ public synchronized int getLockCounter() {
+ return this.cnt;
+ }
+
+}
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+
import org.oransc.policyagent.exceptions.ServiceException;
public class Policies {
import java.util.List;
import java.util.Map;
import java.util.Vector;
+
import lombok.Getter;
import lombok.Setter;
+
import org.oransc.policyagent.clients.A1Client.A1ProtocolType;
import org.oransc.policyagent.configuration.RicConfig;
* Represents the dynamic information about a NearRealtime-RIC.
*/
public class Ric {
+
private final RicConfig ricConfig;
private final List<String> managedElementIds;
@Setter
private A1ProtocolType protocolVersion = A1ProtocolType.UNKNOWN;
+ @Getter
+ private final Lock lock = new Lock();
/**
* Creates the Ric. Initial state is {@link RicState.NOT_INITIATED}.
*/
public Ric(RicConfig ricConfig) {
this.ricConfig = ricConfig;
- this.managedElementIds = new ArrayList<>(ricConfig.managedElementIds());
+ this.managedElementIds = new ArrayList<>(ricConfig.managedElementIds()); // TODO, this is config why is it
+ // copied here?
}
public String name() {
import org.oransc.policyagent.clients.A1Client;
import org.oransc.policyagent.clients.A1ClientFactory;
+import org.oransc.policyagent.repository.Lock.LockType;
import org.oransc.policyagent.repository.Policies;
import org.oransc.policyagent.repository.PolicyTypes;
import org.oransc.policyagent.repository.Ric;
import reactor.core.publisher.Mono;
/**
- * Regularly checks the existing rics towards the local repository to keep it consistent.
+ * Regularly checks the existing rics towards the local repository to keep it
+ * consistent.
*/
@Component
@EnableScheduling
return Flux.fromIterable(rics.getRics()) //
.flatMap(this::createRicData) //
.flatMap(this::checkRicState) //
+ .doOnNext(ricData -> ricData.ric.getLock().lockBlocking(LockType.EXCLUSIVE)) //
.flatMap(this::checkRicPolicies) //
- .flatMap(this::checkRicPolicyTypes);
+ .doOnNext(ricData -> ricData.ric.getLock().unlock()) //
+ .flatMap(this::checkRicPolicyTypes); //
}
}
private Mono<RicData> checkRicPolicies(RicData ric) {
return ric.a1Client.getPolicyIdentities() //
- .onErrorResume(t -> Mono.empty()) //
+ .onErrorResume(t -> {
+ ric.ric.getLock().unlock();
+ return Mono.empty();
+ }) //
.flatMap(ricP -> validateInstances(ricP, ric));
}
private Mono<RicData> validateInstances(Collection<String> ricPolicies, RicData ric) {
synchronized (this.policies) {
if (ricPolicies.size() != policies.getForRic(ric.ric.name()).size()) {
+ ric.ric.getLock().unlock();
return startSynchronization(ric);
}
- }
- for (String policyId : ricPolicies) {
- if (!policies.containsPolicy(policyId)) {
- return startSynchronization(ric);
+
+ for (String policyId : ricPolicies) {
+ if (!policies.containsPolicy(policyId)) {
+ ric.ric.getLock().unlock();
+ return startSynchronization(ric);
+ }
}
+ return Mono.just(ric);
}
- return Mono.just(ric);
}
private Mono<RicData> checkRicPolicyTypes(RicData ric) {
RicSynchronizationTask createSynchronizationTask() {
return new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services);
}
-}
\ No newline at end of file
+}
import static org.oransc.policyagent.repository.Ric.RicState;
-import java.util.Collection;
import java.util.Vector;
import org.oransc.policyagent.clients.A1Client;
import org.oransc.policyagent.clients.A1ClientFactory;
import org.oransc.policyagent.clients.AsyncRestClient;
import org.oransc.policyagent.repository.ImmutablePolicyType;
+import org.oransc.policyagent.repository.Lock.LockType;
import org.oransc.policyagent.repository.Policies;
import org.oransc.policyagent.repository.Policy;
import org.oransc.policyagent.repository.PolicyType;
import reactor.core.publisher.Mono;
/**
- * Synchronizes the content of a RIC with the content in the repository.
- * This means:
- * - load all policy types
- * - send all policy instances to the RIC
- * --- if that fails remove all policy instances
- * - Notify subscribing services
+ * Synchronizes the content of a RIC with the content in the repository. This
+ * means:
+ * <p>
+ * load all policy types
+ * <p>
+ * send all policy instances to the RIC
+ * <p>
+ * if that fails remove all policy instances
+ * <p>
+ * Notify subscribing services
*/
public class RicSynchronizationTask {
}
ric.setState(RicState.SYNCHRONIZING);
}
+ ric.getLock().lockBlocking(LockType.EXCLUSIVE); // Make sure no NBI updates are running
+ ric.getLock().unlock();
this.a1ClientFactory.createA1Client(ric)//
.flatMapMany(client -> startSynchronization(ric, client)) //
.subscribe(x -> logger.debug("Synchronize: {}", x), //
private Flux<Object> startSynchronization(Ric ric, A1Client a1Client) {
Flux<PolicyType> recoverTypes = synchronizePolicyTypes(ric, a1Client);
- Collection<Policy> policiesForRic = policies.getForRic(ric.name());
- Flux<?> policiesDeletedInRic = Flux.empty();
- Flux<?> policiesRecreatedInRic = Flux.empty();
- if (!policiesForRic.isEmpty()) {
- policiesDeletedInRic = a1Client.deleteAllPolicies();
- policiesRecreatedInRic = recreateAllPoliciesInRic(ric, a1Client);
- }
+ Flux<?> policiesDeletedInRic = a1Client.deleteAllPolicies();
+ Flux<?> policiesRecreatedInRic = recreateAllPoliciesInRic(ric, a1Client);
+
return Flux.concat(recoverTypes, policiesDeletedInRic, policiesRecreatedInRic);
}
@SuppressWarnings("squid:S2629")
private void onSynchronizationError(Ric ric, Throwable t) {
logger.warn("Synchronization failed for ric: {}, reason: {}", ric.name(), t.getMessage());
+ // If recovery fails, try to remove all instances
deleteAllPoliciesInRepository(ric);
- Flux<PolicyType> typesRecoveredForRic = this.a1ClientFactory.createA1Client(ric) //
+ Flux<PolicyType> recoverTypes = this.a1ClientFactory.createA1Client(ric) //
.flatMapMany(a1Client -> synchronizePolicyTypes(ric, a1Client));
+ Flux<?> deletePoliciesInRic = this.a1ClientFactory.createA1Client(ric) //
+ .flatMapMany(a1Client -> a1Client.deleteAllPolicies()) //
+ .doOnComplete(() -> deleteAllPoliciesInRepository(ric));
- // If recovery fails, try to remove all instances
- Flux<?> policiesDeletedInRic = this.a1ClientFactory.createA1Client(ric) //
- .flatMapMany(A1Client::deleteAllPolicies);
-
- Flux.merge(typesRecoveredForRic, policiesDeletedInRic) //
- .subscribe(x -> logger.debug("Brute recover: {}", x), //
+ Flux.concat(recoverTypes, deletePoliciesInRic) //
+ .subscribe(x -> logger.debug("Brute recover: " + x), //
throwable -> onRecoveryError(ric, throwable), //
() -> onSynchronizationComplete(ric));
}
}
private Flux<PolicyType> synchronizePolicyTypes(Ric ric, A1Client a1Client) {
- ric.clearSupportedPolicyTypes();
return a1Client.getPolicyTypeIdentities() //
+ .doOnNext(x -> ric.clearSupportedPolicyTypes()) //
.flatMapMany(Flux::fromIterable) //
.doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId)) //
.flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client)) //
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
+
import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.oransc.policyagent.configuration.ApplicationConfig;
import org.oransc.policyagent.exceptions.ServiceException;
import org.oransc.policyagent.repository.ImmutablePolicy;
import org.oransc.policyagent.repository.ImmutablePolicyType;
+import org.oransc.policyagent.repository.Lock.LockType;
import org.oransc.policyagent.repository.Policies;
import org.oransc.policyagent.repository.Policy;
import org.oransc.policyagent.repository.PolicyType;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.HttpStatus.Series;
import org.springframework.http.MediaType;
Services services;
private static Gson gson = new GsonBuilder() //
- .serializeNulls() //
- .create(); //
+ .serializeNulls() //
+ .create(); //
public static class MockApplicationConfig extends ApplicationConfig {
@Override
return new MockA1ClientFactory(this.policyTypes);
}
- @Bean
- public Policies getPolicies() {
- return new Policies();
- }
-
@Bean
public PolicyTypes getPolicyTypes() {
return this.policyTypes;
}
-
- @Bean
- public Rics getRics() {
- return new Rics();
- }
}
@LocalServerPort
@Override
public boolean hasError(ClientHttpResponse httpResponse) throws IOException {
return (httpResponse.getStatusCode().series() == Series.CLIENT_ERROR
- || httpResponse.getStatusCode().series() == Series.SERVER_ERROR);
+ || httpResponse.getStatusCode().series() == Series.SERVER_ERROR);
}
@Override
}
}
+ private void setRestErrorhandler() {
+ restTemplate.setErrorHandler(new RestTemplateResponseErrorHandler());
+ }
+
+ @BeforeEach
+ public void reset() {
+ rics.clear();
+ policies.clear();
+ policyTypes.clear();
+ services.clear();
+ }
+
+ @AfterEach
+ public void verifyNoRicLocks() {
+ for (Ric ric : this.rics.getRics()) {
+ ric.getLock().lockBlocking(LockType.EXCLUSIVE);
+ ric.getLock().unlock();
+ assertThat(ric.getLock().getLockCounter()).isEqualTo(0);
+ assertThat(ric.getState()).isEqualTo(Ric.RicState.IDLE);
+ }
+ }
+
@Test
public void testGetRics() throws Exception {
- reset();
addRic("kista_1");
String url = baseUrl() + "/rics";
String rsp = this.restTemplate.getForObject(url, String.class);
@Test
public void testRecovery() throws Exception {
- reset();
+ addRic("ric").setState(Ric.RicState.UNDEFINED);
String ricName = "ric";
Policy policy2 = addPolicy("policyId2", "typeName", "service", ricName);
@Test
public void testGetRicForManagedElement_thenReturnCorrectRic() throws Exception {
- reset();
addRic("notCorrectRic1");
addRic("notCorrectRic2");
addRic("notCorrectRic3");
}
@Test
- public void testGetRicForManagedElementThatDoesNotExist_thenReturnEmpty() throws Exception {
- reset();
- addRic("notCorrectRic1");
- addRic("notCorrectRic2");
- addRic("notCorrectRic3");
- addRic("notCorrectRic4");
- addRic("notCorrectRic5");
- addRic("notCorrectRic6");
-
+ public void testGetRicForManagedElementThatDoesNotExist() throws Exception {
+ this.setRestErrorhandler();
String url = baseUrl() + "/ric?managedElementId=kista_1";
- String rsp = this.restTemplate.getForObject(url, String.class);
-
- assertThat(rsp).isNull();
+ ResponseEntity<String> entity = this.restTemplate.getForEntity(url, String.class);
+ assertThat(entity.getStatusCode().equals(HttpStatus.NOT_FOUND));
}
@Test
public void testPutPolicy() throws Exception {
- reset();
String serviceName = "service1";
String ricName = "ric1";
String policyTypeName = "type1";
addPolicyType(policyTypeName, ricName);
String url = baseUrl() + "/policy?type=" + policyTypeName + "&instance=" + policyInstanceId + "&ric=" + ricName
- + "&service=" + serviceName;
+ + "&service=" + serviceName;
final String json = jsonString();
this.rics.getRic(ricName).setState(Ric.RicState.IDLE);
public void testRefuseToUpdatePolicy() throws Exception {
// Test that only the json can be changed for a already created policy
// In this case service is attempted to be changed
- reset();
- this.addRic("ric1").setState(Ric.RicState.IDLE);
- this.addRic("ricXXX").setState(Ric.RicState.IDLE);
+ this.addRic("ric1");
+ this.addRic("ricXXX");
this.addPolicy("instance1", "type1", "service1", "ric1");
+ this.setRestErrorhandler();
String urlWrongRic = baseUrl() + "/policy?type=type1&instance=instance1&ric=ricXXX&service=service1";
- this.restTemplate.put(urlWrongRic, createJsonHttpEntity(jsonString()));
+ ResponseEntity<String> entity = this.putForEntity(urlWrongRic, jsonString());
+ assertThat(entity.getStatusCode().equals(HttpStatus.METHOD_NOT_ALLOWED));
+
Policy policy = policies.getPolicy("instance1");
assertThat(policy.ric().name()).isEqualTo("ric1"); // Not changed
}
@Test
public void testDeletePolicy() throws Exception {
- reset();
String url = baseUrl() + "/policy?instance=id";
- Policy policy = addPolicy("id", "typeName", "service1", "ric1");
- policy.ric().setState(Ric.RicState.IDLE);
+ addPolicy("id", "typeName", "service1", "ric1");
assertThat(policies.size()).isEqualTo(1);
this.restTemplate.delete(url);
@Test
public void testGetPolicySchemas() throws Exception {
- reset();
addPolicyType("type1", "ric1");
addPolicyType("type2", "ric2");
@Test
public void testGetPolicySchema() throws Exception {
- reset();
addPolicyType("type1", "ric1");
addPolicyType("type2", "ric2");
@Test
public void testGetPolicyTypes() throws Exception {
- reset();
addPolicyType("type1", "ric1");
addPolicyType("type2", "ric2");
@Test
public void testPutAndGetService() throws Exception {
- reset();
// PUT
putService("name");
System.out.println(rsp);
// Keep alive
- url = baseUrl() + "/services/keepalive?serviceName=name";
- rsp = this.restTemplate.postForObject(url, null, String.class);
- assertThat(rsp.contains("OK")).isTrue();
+ url = baseUrl() + "/services/keepalive?name=name";
+ ResponseEntity<String> entity = this.restTemplate.postForEntity(url, null, String.class);
+ assertThat(entity.getStatusCode().equals(HttpStatus.OK));
// DELETE
assertThat(services.size()).isEqualTo(1);
- url = baseUrl() + "/services?serviceName=name";
+ url = baseUrl() + "/services?name=name";
this.restTemplate.delete(url);
assertThat(services.size()).isEqualTo(0);
// Keep alive, no registerred service
- url = baseUrl() + "/services/keepalive?serviceName=nameXXX";
- ResponseEntity<String> entity = this.restTemplate.postForEntity(url, null, String.class);
+ url = baseUrl() + "/services/keepalive?name=name";
+ setRestErrorhandler();
+ entity = this.restTemplate.postForEntity(url, null, String.class);
assertThat(entity.getStatusCode()).isEqualTo(HttpStatus.NOT_FOUND);
}
@Test
public void testGetPolicyStatus() throws Exception {
- reset();
- Policy policy = addPolicy("id", "typeName", "service1", "ric1");
- policy.ric().setState(Ric.RicState.IDLE);
+ addPolicy("id", "typeName", "service1", "ric1");
assertThat(policies.size()).isEqualTo(1);
String url = baseUrl() + "/policy_status?instance=id";
assertThat(rsp.equals("OK")).isTrue();
}
- private PolicyType addPolicyType(String policyTypeName, String ricName) {
- PolicyType type = ImmutablePolicyType.builder() //
- .name(policyTypeName) //
- .schema("{\"title\":\"" + policyTypeName + "\"}") //
- .build();
-
- policyTypes.put(type);
- addRic(ricName).addSupportedPolicyType(type);
- return type;
- }
-
- private Ric addRic(String ricName) {
- if (rics.get(ricName) != null) {
- return rics.get(ricName);
- }
- Vector<String> mes = new Vector<>();
- RicConfig conf = ImmutableRicConfig.builder() //
- .name(ricName) //
- .baseUrl(ricName) //
- .managedElementIds(mes) //
- .build();
- Ric ric = new Ric(conf);
- this.rics.put(ric);
- return ric;
- }
-
private Policy addPolicy(String id, String typeName, String service, String ric) throws ServiceException {
addRic(ric);
Policy p = ImmutablePolicy.builder().id(id) //
- .json(jsonString()) //
- .ownerServiceName(service) //
- .ric(rics.getRic(ric)) //
- .type(addPolicyType(typeName, ric)) //
- .lastModified("lastModified").build();
+ .json(jsonString()) //
+ .ownerServiceName(service) //
+ .ric(rics.getRic(ric)) //
+ .type(addPolicyType(typeName, ric)) //
+ .lastModified("lastModified").build();
policies.put(p);
return p;
}
return "http://localhost:" + port;
}
- private void reset() {
- rics.clear();
- policies.clear();
- policyTypes.clear();
- services.clear();
- assertThat(policies.size()).isEqualTo(0);
- restTemplate.setErrorHandler(new RestTemplateResponseErrorHandler());
- }
-
private String jsonString() {
return "{\n \"servingCellNrcgi\": \"1\"\n }";
}
+ private static class ConcurrencyTestRunnable implements Runnable {
+ private final RestTemplate restTemplate = new RestTemplate();
+ private final String baseUrl;
+ static AtomicInteger nextCount = new AtomicInteger(0);
+ private final int count;
+ private final RepositorySupervision supervision;
+
+ ConcurrencyTestRunnable(String baseUrl, RepositorySupervision supervision) {
+ this.baseUrl = baseUrl;
+ this.count = nextCount.incrementAndGet();
+ this.supervision = supervision;
+ }
+
+ public void run() {
+ for (int i = 0; i < 100; ++i) {
+ if (i % 10 == 0) {
+ this.supervision.checkAllRics();
+ }
+ String name = "policy:" + count + ":" + i;
+ putPolicy(name);
+ deletePolicy(name);
+ }
+ }
+
+ private void putPolicy(String name) {
+ String putUrl = baseUrl + "/policy?type=type1&instance=" + name + "&ric=ric1&service=service1";
+ this.restTemplate.put(putUrl, createJsonHttpEntity("{}"));
+ }
+
+ private void deletePolicy(String name) {
+ String deleteUrl = baseUrl + "/policy?instance=" + name;
+ this.restTemplate.delete(deleteUrl);
+ }
+ }
+
+ @Test
+ public void testConcurrency() throws Exception {
+ final Instant startTime = Instant.now();
+ List<Thread> threads = new ArrayList<>();
+ addRic("ric1");
+ addPolicyType("type1", "ric1");
+
+ for (int i = 0; i < 100; ++i) {
+ Thread t = new Thread(new ConcurrencyTestRunnable(baseUrl(), this.supervision), "TestThread_" + i);
+ t.start();
+ threads.add(t);
+ }
+ for (Thread t : threads) {
+ t.join();
+ }
+ assertThat(policies.size()).isEqualTo(0);
+ System.out.println("Concurrency test took " + Duration.between(startTime, Instant.now()));
+ }
+
private MockA1Client getA1Client(String ricName) throws ServiceException {
return a1ClientFactory.getOrCreateA1Client(ricName);
}
- private HttpEntity<String> createJsonHttpEntity(String content) {
+ private PolicyType addPolicyType(String policyTypeName, String ricName) {
+ PolicyType type = ImmutablePolicyType.builder() //
+ .name(policyTypeName) //
+ .schema("{\"title\":\"" + policyTypeName + "\"}") //
+ .build();
+
+ policyTypes.put(type);
+ addRic(ricName).addSupportedPolicyType(type);
+ return type;
+ }
+
+ private Ric addRic(String ricName) {
+ if (rics.get(ricName) != null) {
+ return rics.get(ricName);
+ }
+ Vector<String> mes = new Vector<>();
+ RicConfig conf = ImmutableRicConfig.builder() //
+ .name(ricName) //
+ .baseUrl(ricName) //
+ .managedElementIds(mes) //
+ .build();
+ Ric ric = new Ric(conf);
+ ric.setState(Ric.RicState.IDLE);
+ this.rics.put(ric);
+ return ric;
+ }
+
+ private static HttpEntity<String> createJsonHttpEntity(String content) {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
return new HttpEntity<String>(content, headers);
}
+ private ResponseEntity<String> putForEntity(String url, String jsonBody) {
+ return restTemplate.exchange(url, HttpMethod.PUT, createJsonHttpEntity(jsonBody), String.class);
+ }
+
private static <T> List<T> parseList(String jsonString, Class<T> clazz) {
List<T> result = new ArrayList<>();
JsonArray jsonArr = JsonParser.parseString(jsonString).getAsJsonArray();
}
return result;
}
-
}
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
+
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
+
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.oransc.policyagent.configuration.ApplicationConfig;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.oransc.policyagent.repository.Policy;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
private JsonObject fake_info_object;
public String toString() {
- return String.format("[dmaap_publisher=%s, fake_info_object=%s]", dmaap_publisher.toString(), fake_info_object.toString());
+ return String.format("[dmaap_publisher=%s, fake_info_object=%s]", dmaap_publisher.toString(),
+ fake_info_object.toString());
}
}
private JsonObject fake_info_object;
public String toString() {
- return String.format("[dmaap_subscriber=%s, fake_info_object=%s]", dmaap_subscriber.toString(), fake_info_object.toString());
+ return String.format("[dmaap_subscriber=%s, fake_info_object=%s]", dmaap_subscriber.toString(),
+ fake_info_object.toString());
}
}
"Wrong error message when the streams publishes' URL has incorrect syntax");
}
- public JsonObject getDmaapInfo(JsonObject jsonRootObject, String streamsPublishesOrSubscribes, String dmaapPublisherOrSubscriber)
- throws Exception {
+ public JsonObject getDmaapInfo(JsonObject jsonRootObject, String streamsPublishesOrSubscribes,
+ String dmaapPublisherOrSubscriber) throws Exception {
return jsonRootObject.getAsJsonObject("config").getAsJsonObject(streamsPublishesOrSubscribes)
.getAsJsonObject(dmaapPublisherOrSubscriber).getAsJsonObject("dmaap_info");
}
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
+
import com.google.common.base.Charsets;
import com.google.common.io.Resources;
import com.google.gson.JsonIOException;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
+
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Properties;
import java.util.Vector;
+
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.oransc.policyagent.configuration.ImmutableRicConfig;
import org.oransc.policyagent.configuration.RicConfig;
import org.oransc.policyagent.utils.LoggingUtils;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@ExtendWith(MockitoExtension.class)
public class RefreshConfigTaskTest {
-
private RefreshConfigTask refreshTaskUnderTest;
@Spy
import java.util.Collections;
import java.util.List;
import java.util.Vector;
+
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
when(a1ClientMock.getPolicyTypeIdentities()).thenReturn(Mono.error((Exception) returnValue));
}
}
-}
\ No newline at end of file
+}
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
+
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
+
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.oransc.policyagent.repository.Service;
import org.oransc.policyagent.repository.Services;
import org.oransc.policyagent.utils.LoggingUtils;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
synchronizerUnderTest.run(RIC_1);
- verify(a1ClientMock).getPolicyTypeIdentities();
+ verify(a1ClientMock, times(1)).getPolicyTypeIdentities();
verifyNoMoreInteractions(a1ClientMock);
verify(synchronizerUnderTest).run(RIC_1);
private void setUpCreationOfA1Client() {
when(a1ClientFactoryMock.createA1Client(any(Ric.class))).thenReturn(Mono.just(a1ClientMock));
+ doReturn(Flux.empty()).when(a1ClientMock).deleteAllPolicies();
}
private AsyncRestClient setUpCreationOfAsyncRestClient(RicSynchronizationTask synchronizerUnderTest) {
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
+
import java.time.Duration;
import java.util.Collections;
+
import org.awaitility.Durations;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import static org.oransc.policyagent.repository.Ric.RicState.IDLE;
import com.google.common.collect.ImmutableList;
+
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Mono<List<String>> policyTypes2 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME, POLICY_TYPE_2_NAME));
doReturn(policyTypes1, policyTypes2).when(a1ClientMock).getPolicyTypeIdentities();
doReturn(Mono.just("Schema")).when(a1ClientMock).getPolicyTypeSchema(anyString());
+ doReturn(Flux.empty()).when(a1ClientMock).deleteAllPolicies();
Rics rics = new Rics();
PolicyTypes policyTypes = new PolicyTypes();
getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C),
ApplicationConfig.RicConfigUpdate.ADDED);
- await().untilAsserted(() -> assertThat(policyTypes.size()).isEqualTo(2));
+ Ric firstRic = rics.get(FIRST_RIC_NAME);
+ Ric secondRic = rics.get(SECOND_RIC_NAME);
+ await().untilAsserted(() -> assertThat(firstRic.getState()).isEqualTo(IDLE));
+ await().untilAsserted(() -> assertThat(secondRic.getState()).isEqualTo(IDLE));
assertTrue(policyTypes.contains(POLICY_TYPE_1_NAME), POLICY_TYPE_1_NAME + " not added to PolicyTypes.");
assertTrue(policyTypes.contains(POLICY_TYPE_2_NAME), POLICY_TYPE_2_NAME + " not added to PolicyTypes.");
assertEquals(2, rics.size(), "Correct number of Rics not added to Rics");
- Ric firstRic = rics.get(FIRST_RIC_NAME);
assertNotNull(firstRic, "Ric " + FIRST_RIC_NAME + " not added to repository");
assertEquals(FIRST_RIC_NAME, firstRic.name(), FIRST_RIC_NAME + " not added to Rics");
- assertEquals(IDLE, firstRic.getState(), "Not correct state for ric " + FIRST_RIC_NAME);
+
assertEquals(1, firstRic.getSupportedPolicyTypes().size(),
"Not correct no of types supported for ric " + FIRST_RIC_NAME);
assertTrue(firstRic.isSupportingType(POLICY_TYPE_1_NAME),
"Not correct no of managed nodes for ric " + FIRST_RIC_NAME);
assertTrue(firstRic.isManaging(MANAGED_NODE_A), MANAGED_NODE_A + " not managed by ric " + FIRST_RIC_NAME);
- Ric secondRic = rics.get(SECOND_RIC_NAME);
assertNotNull(secondRic, "Ric " + SECOND_RIC_NAME + " not added to repository");
assertEquals(SECOND_RIC_NAME, secondRic.name(), SECOND_RIC_NAME + " not added to Rics");
- assertEquals(IDLE, secondRic.getState(), "Not correct state for " + SECOND_RIC_NAME);
assertEquals(2, secondRic.getSupportedPolicyTypes().size(),
"Not correct no of types supported for ric " + SECOND_RIC_NAME);
assertTrue(secondRic.isSupportingType(POLICY_TYPE_1_NAME),