From: Henrik Andersson Date: Wed, 4 Mar 2020 07:32:30 +0000 (+0000) Subject: Merge "Fixed concurrency problems" X-Git-Tag: 2.0.0~148 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=7c297ddb425a52dae965adc6a83629a14421ea05;hp=d5aaf1cc8be867a6b298f1e79069f7445ddbfb57;p=nonrtric.git Merge "Fixed concurrency problems" --- diff --git a/policy-agent/src/main/java/org/oransc/policyagent/clients/A1Client.java b/policy-agent/src/main/java/org/oransc/policyagent/clients/A1Client.java index ff9c8ffb..ace486b9 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/clients/A1Client.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/clients/A1Client.java @@ -21,6 +21,7 @@ package org.oransc.policyagent.clients; import java.util.List; + import org.oransc.policyagent.repository.Policy; import reactor.core.publisher.Flux; diff --git a/policy-agent/src/main/java/org/oransc/policyagent/clients/AsyncRestClient.java b/policy-agent/src/main/java/org/oransc/policyagent/clients/AsyncRestClient.java index 0a7c3a18..0d27bd6c 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/clients/AsyncRestClient.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/clients/AsyncRestClient.java @@ -32,6 +32,7 @@ import reactor.core.publisher.Mono; public class AsyncRestClient { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private final WebClient client; + private final String baseUrl; public class AsyncRestClientException extends Exception { @@ -44,9 +45,11 @@ public class AsyncRestClient { public AsyncRestClient(String baseUrl) { this.client = WebClient.create(baseUrl); + this.baseUrl = baseUrl; } public Mono post(String uri, String body) { + logger.debug("POST uri = '{}{}''", baseUrl, uri); return client.post() // .uri(uri) // .contentType(MediaType.APPLICATION_JSON) // @@ -58,6 +61,7 @@ public class AsyncRestClient { } public Mono postWithAuthHeader(String uri, String body, String username, String password) { + logger.debug("POST (auth) uri = '{}{}''", baseUrl, uri); return client.post() // .uri(uri) // .headers(headers -> headers.setBasicAuth(username, password)) // @@ -70,7 +74,7 @@ public class AsyncRestClient { } public Mono put(String uri, String body) { - logger.debug("PUT uri = '{}''", uri); + logger.debug("PUT uri = '{}{}''", baseUrl, uri); return client.put() // .uri(uri) // .contentType(MediaType.APPLICATION_JSON) // @@ -82,7 +86,7 @@ public class AsyncRestClient { } public Mono get(String uri) { - logger.debug("GET uri = '{}''", uri); + logger.debug("GET uri = '{}{}''", baseUrl, uri); return client.get() // .uri(uri) // .retrieve() // @@ -92,7 +96,7 @@ public class AsyncRestClient { } public Mono delete(String uri) { - logger.debug("DELETE uri = '{}''", uri); + logger.debug("DELETE uri = '{}{}''", baseUrl, uri); return client.delete() // .uri(uri) // .retrieve() // diff --git a/policy-agent/src/main/java/org/oransc/policyagent/clients/JsonHelper.java b/policy-agent/src/main/java/org/oransc/policyagent/clients/JsonHelper.java index 2e7963b5..68ffb10b 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/clients/JsonHelper.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/clients/JsonHelper.java @@ -23,11 +23,14 @@ package org.oransc.policyagent.clients; import com.google.gson.FieldNamingPolicy; import com.google.gson.Gson; import com.google.gson.GsonBuilder; + import java.util.ArrayList; import java.util.List; + import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; diff --git a/policy-agent/src/main/java/org/oransc/policyagent/clients/SdncOnapA1Client.java b/policy-agent/src/main/java/org/oransc/policyagent/clients/SdncOnapA1Client.java index 9b3bb71d..b9d7f352 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/clients/SdncOnapA1Client.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/clients/SdncOnapA1Client.java @@ -23,10 +23,12 @@ package org.oransc.policyagent.clients; import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.List; + import org.oransc.policyagent.configuration.RicConfig; import org.oransc.policyagent.repository.Policy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -96,8 +98,8 @@ public class SdncOnapA1Client implements A1Client { String inputJsonString = JsonHelper.createInputJsonString(inputParams); logger.debug("POST putPolicy inputJsonString = {}", inputJsonString); - return restClient.postWithAuthHeader(URL_PREFIX + "createPolicyInstance", inputJsonString, - a1ControllerUsername, a1ControllerPassword); + return restClient.postWithAuthHeader(URL_PREFIX + "createPolicyInstance", inputJsonString, a1ControllerUsername, + a1ControllerPassword); } @Override diff --git a/policy-agent/src/main/java/org/oransc/policyagent/clients/SdncOscA1Client.java b/policy-agent/src/main/java/org/oransc/policyagent/clients/SdncOscA1Client.java index 9bd0f6c8..ac52d6ae 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/clients/SdncOscA1Client.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/clients/SdncOscA1Client.java @@ -22,10 +22,12 @@ package org.oransc.policyagent.clients; import java.lang.invoke.MethodHandles; import java.util.List; + import org.oransc.policyagent.configuration.RicConfig; import org.oransc.policyagent.repository.Policy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; diff --git a/policy-agent/src/main/java/org/oransc/policyagent/clients/StdA1Client.java b/policy-agent/src/main/java/org/oransc/policyagent/clients/StdA1Client.java index 1715d9db..73adecee 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/clients/StdA1Client.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/clients/StdA1Client.java @@ -21,9 +21,11 @@ package org.oransc.policyagent.clients; import java.util.List; + import org.oransc.policyagent.configuration.RicConfig; import org.oransc.policyagent.repository.Policy; import org.springframework.web.util.UriComponentsBuilder; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; diff --git a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java index 1dd850be..deacb446 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java @@ -25,9 +25,12 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Properties; + import javax.validation.constraints.NotEmpty; import javax.validation.constraints.NotNull; + import lombok.Getter; + import org.oransc.policyagent.exceptions.ServiceException; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; diff --git a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java index 578ff124..c69c39d4 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java @@ -23,6 +23,7 @@ package org.oransc.policyagent.configuration; import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; + import java.net.MalformedURLException; import java.net.URL; import java.util.ArrayList; @@ -31,8 +32,11 @@ import java.util.List; import java.util.Map.Entry; import java.util.Properties; import java.util.Set; + import javax.validation.constraints.NotNull; + import lombok.Getter; + import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; import org.oransc.policyagent.exceptions.ServiceException; import org.springframework.http.MediaType; diff --git a/policy-agent/src/main/java/org/oransc/policyagent/configuration/RicConfig.java b/policy-agent/src/main/java/org/oransc/policyagent/configuration/RicConfig.java index 69ba198e..ab54b527 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/configuration/RicConfig.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/configuration/RicConfig.java @@ -21,6 +21,7 @@ package org.oransc.policyagent.configuration; import com.google.common.collect.ImmutableList; + import org.immutables.value.Value; @Value.Immutable diff --git a/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java b/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java index ea27524e..d6d65fc7 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java @@ -22,6 +22,7 @@ package org.oransc.policyagent.controllers; import com.google.gson.Gson; import com.google.gson.GsonBuilder; + import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiResponse; @@ -30,6 +31,7 @@ import io.swagger.annotations.ApiResponses; import java.util.ArrayList; import java.util.Collection; import java.util.List; + import org.oransc.policyagent.clients.A1ClientFactory; import org.oransc.policyagent.configuration.ApplicationConfig; import org.oransc.policyagent.exceptions.ServiceException; @@ -150,14 +152,20 @@ public class PolicyController { @DeleteMapping("/policy") @ApiOperation(value = "Delete a policy", response = Object.class) @ApiResponses(value = {@ApiResponse(code = 204, message = "Policy deleted", response = Object.class)}) - public Mono> deletePolicy( // + public Mono> deletePolicy( // @RequestParam(name = "instance", required = true) String id) { Policy policy = policies.get(id); if (policy != null && policy.ric().getState() == Ric.RicState.IDLE) { - policies.remove(policy); + Ric ric = policy.ric(); return a1ClientFactory.createA1Client(policy.ric()) // + .doOnNext(notUsed -> ric.getLock().lockBlocking()) // + .doOnNext(notUsed -> policies.remove(policy)) // .flatMap(client -> client.deletePolicy(policy)) // + .doOnNext(notUsed -> ric.getLock().unlock()) // + .doOnError(notUsed -> ric.getLock().unlock()) // .flatMap(notUsed -> Mono.just(new ResponseEntity<>(HttpStatus.NO_CONTENT))); + } else if (policy != null) { + return Mono.just(new ResponseEntity<>("Busy, recovering", HttpStatus.LOCKED)); } else { return Mono.just(new ResponseEntity<>(HttpStatus.NOT_FOUND)); } @@ -166,7 +174,7 @@ public class PolicyController { @PutMapping(path = "/policy") @ApiOperation(value = "Put a policy", response = String.class) @ApiResponses(value = {@ApiResponse(code = 200, message = "Policy created or updated")}) - public Mono> putPolicy( // + public Mono> putPolicy( // @RequestParam(name = "type", required = true) String typeName, // @RequestParam(name = "instance", required = true) String instanceId, // @RequestParam(name = "ric", required = true) String ricName, // @@ -187,13 +195,22 @@ public class PolicyController { .lastModified(getTimeStampUtc()) // .build(); - return validateModifiedPolicy(policy) // - .flatMap(x -> a1ClientFactory.createA1Client(ric)) // + final boolean isCreate = this.policies.get(policy.id()) == null; + + return Mono.just(policy) // + .doOnNext(notUsed -> ric.getLock().lockBlocking()) // + .flatMap(p -> validateModifiedPolicy(policy)) // + .flatMap(notUsed -> a1ClientFactory.createA1Client(ric)) // .flatMap(client -> client.putPolicy(policy)) // .doOnNext(notUsed -> policies.put(policy)) // - .flatMap(notUsed -> Mono.just(new ResponseEntity<>(HttpStatus.OK))); + .doOnNext(notUsed -> ric.getLock().unlock()) // + .doOnError(t -> ric.getLock().unlock()) // + .flatMap(notUsed -> Mono.just(new ResponseEntity<>(isCreate ? HttpStatus.CREATED : HttpStatus.OK))) // + .onErrorResume(t -> Mono.just(new ResponseEntity<>(t.getMessage(), HttpStatus.METHOD_NOT_ALLOWED))); } - return Mono.just(new ResponseEntity<>(HttpStatus.NOT_FOUND)); + + return ric == null && type == null ? Mono.just(new ResponseEntity<>(HttpStatus.NOT_FOUND)) + : Mono.just(new ResponseEntity<>(HttpStatus.CONFLICT)); // Recovering } private Mono validateModifiedPolicy(Policy policy) { @@ -201,7 +218,9 @@ public class PolicyController { Policy current = this.policies.get(policy.id()); if (current != null) { if (!current.ric().name().equals(policy.ric().name())) { - return Mono.error(new Exception("Policy cannot change RIC or service")); + return Mono.error(new Exception("Policy cannot change RIC, policyId: " + current.id() + // + ", RIC name: " + current.ric().name() + // + ", new name: " + policy.ric().name())); } } return Mono.just("OK"); diff --git a/policy-agent/src/main/java/org/oransc/policyagent/controllers/RicRepositoryController.java b/policy-agent/src/main/java/org/oransc/policyagent/controllers/RicRepositoryController.java index b95c1967..f6fc8500 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/controllers/RicRepositoryController.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/controllers/RicRepositoryController.java @@ -27,9 +27,11 @@ import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; + import java.util.ArrayList; import java.util.List; import java.util.Optional; + import org.oransc.policyagent.configuration.ApplicationConfig; import org.oransc.policyagent.repository.Ric; import org.oransc.policyagent.repository.Rics; diff --git a/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceController.java b/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceController.java index 3f775a58..464511f4 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceController.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceController.java @@ -31,6 +31,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.List; + import org.oransc.policyagent.exceptions.ServiceException; import org.oransc.policyagent.repository.Policies; import org.oransc.policyagent.repository.Policy; @@ -68,7 +69,7 @@ public class ServiceController { @ApiResponses( value = {@ApiResponse(code = 200, message = "OK", response = ServiceStatus.class, responseContainer = "List")}) public ResponseEntity getServices(// - @RequestParam(name = "serviceName", required = false) String name) { + @RequestParam(name = "name", required = false) String name) { Collection servicesStatus = new ArrayList<>(); synchronized (this.services) { @@ -104,7 +105,7 @@ public class ServiceController { @ApiResponses(value = {@ApiResponse(code = 200, message = "OK")}) @DeleteMapping("/services") public ResponseEntity deleteService(// - @RequestParam(name = "serviceName", required = true) String serviceName) { + @RequestParam(name = "name", required = true) String serviceName) { try { Service service = removeService(serviceName); // Remove the policies from the repo and let the consistency monitoring @@ -122,7 +123,7 @@ public class ServiceController { @ApiResponse(code = 404, message = "The service is not found, needs re-registration")}) @PostMapping("/services/keepalive") public ResponseEntity keepAliveService(// - @RequestParam(name = "serviceName", required = true) String serviceName) { + @RequestParam(name = "name", required = true) String serviceName) { try { services.getService(serviceName).ping(); return new ResponseEntity<>("OK", HttpStatus.OK); diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Lock.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Lock.java new file mode 100644 index 00000000..68ea5a7b --- /dev/null +++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/Lock.java @@ -0,0 +1,140 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2019 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oransc.policyagent.repository; + +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoSink; + +/** + * A resource lock. The caller thread will be blocked until the lock is granted. + * Exclusive means that the caller takes exclusive ownership of the resurce. Non + * exclusive lock means that several users can lock the resource (for shared + * usage). + */ +public class Lock { + private static final Logger logger = LoggerFactory.getLogger(Lock.class); + + private boolean isExclusive = false; + private int cnt = 0; + + public static enum LockType { + EXCLUSIVE, SHARED + } + + public synchronized void lockBlocking(LockType locktype) { + while (!tryLock(locktype)) { + this.waitForUnlock(); + } + } + + public synchronized void lockBlocking() { + lockBlocking(LockType.SHARED); + } + + public synchronized Mono lock(LockType lockType) { + if (tryLock(lockType)) { + return Mono.just(this); + } else { + return Mono.create(monoSink -> addToQueue(monoSink, lockType)); + } + } + + public synchronized void unlock() { + if (disable()) { + return; + } + if (cnt <= 0) { + cnt = -1; // Might as well stop, to make it easier to find the problem + throw new RuntimeException("Number of unlocks must match the number of locks"); + } + this.cnt--; + if (cnt == 0) { + isExclusive = false; + } + this.processQueuedEntries(); + this.notifyAll(); + } + + private void processQueuedEntries() { + for (Iterator i = queue.iterator(); i.hasNext();) { + QueueEntry e = i.next(); + if (tryLock(e.lockType)) { + i.remove(); + e.callback.success(this); + } + } + } + + static class QueueEntry { + final MonoSink callback; + final LockType lockType; + + QueueEntry(MonoSink callback, LockType lockType) { + this.callback = callback; + this.lockType = lockType; + } + } + + private final List queue = new LinkedList<>(); + + private synchronized void addToQueue(MonoSink callback, LockType lockType) { + queue.add(new QueueEntry(callback, lockType)); + } + + private void waitForUnlock() { + try { + this.wait(); + } catch (InterruptedException e) { + logger.warn("waitForUnlock interrupted", e); + } + } + + private boolean disable() { + return true; + } + + private boolean tryLock(LockType lockType) { + if (disable()) { + return true; + } + if (this.isExclusive) { + return false; + } + if (lockType == LockType.EXCLUSIVE && cnt > 0) { + return false; + } + cnt++; + this.isExclusive = lockType == LockType.EXCLUSIVE; + return true; + } + + public synchronized int getLockCounter() { + return this.cnt; + } + +} diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Policies.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Policies.java index c910dd59..54f876ac 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/repository/Policies.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/Policies.java @@ -25,6 +25,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Set; + import org.oransc.policyagent.exceptions.ServiceException; public class Policies { diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Ric.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Ric.java index 6eece5ec..4291d6ef 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/repository/Ric.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/Ric.java @@ -26,8 +26,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Vector; + import lombok.Getter; import lombok.Setter; + import org.oransc.policyagent.clients.A1Client.A1ProtocolType; import org.oransc.policyagent.configuration.RicConfig; @@ -35,6 +37,7 @@ import org.oransc.policyagent.configuration.RicConfig; * Represents the dynamic information about a NearRealtime-RIC. */ public class Ric { + private final RicConfig ricConfig; private final List managedElementIds; @@ -44,6 +47,8 @@ public class Ric { @Setter private A1ProtocolType protocolVersion = A1ProtocolType.UNKNOWN; + @Getter + private final Lock lock = new Lock(); /** * Creates the Ric. Initial state is {@link RicState.NOT_INITIATED}. @@ -52,7 +57,8 @@ public class Ric { */ public Ric(RicConfig ricConfig) { this.ricConfig = ricConfig; - this.managedElementIds = new ArrayList<>(ricConfig.managedElementIds()); + this.managedElementIds = new ArrayList<>(ricConfig.managedElementIds()); // TODO, this is config why is it + // copied here? } public String name() { diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java index ce318dd7..f75db217 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java @@ -24,6 +24,7 @@ import java.util.Collection; import org.oransc.policyagent.clients.A1Client; import org.oransc.policyagent.clients.A1ClientFactory; +import org.oransc.policyagent.repository.Lock.LockType; import org.oransc.policyagent.repository.Policies; import org.oransc.policyagent.repository.PolicyTypes; import org.oransc.policyagent.repository.Ric; @@ -41,7 +42,8 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** - * Regularly checks the existing rics towards the local repository to keep it consistent. + * Regularly checks the existing rics towards the local repository to keep it + * consistent. */ @Component @EnableScheduling @@ -78,8 +80,10 @@ public class RepositorySupervision { return Flux.fromIterable(rics.getRics()) // .flatMap(this::createRicData) // .flatMap(this::checkRicState) // + .doOnNext(ricData -> ricData.ric.getLock().lockBlocking(LockType.EXCLUSIVE)) // .flatMap(this::checkRicPolicies) // - .flatMap(this::checkRicPolicyTypes); + .doOnNext(ricData -> ricData.ric.getLock().unlock()) // + .flatMap(this::checkRicPolicyTypes); // } } @@ -111,22 +115,28 @@ public class RepositorySupervision { private Mono checkRicPolicies(RicData ric) { return ric.a1Client.getPolicyIdentities() // - .onErrorResume(t -> Mono.empty()) // + .onErrorResume(t -> { + ric.ric.getLock().unlock(); + return Mono.empty(); + }) // .flatMap(ricP -> validateInstances(ricP, ric)); } private Mono validateInstances(Collection ricPolicies, RicData ric) { synchronized (this.policies) { if (ricPolicies.size() != policies.getForRic(ric.ric.name()).size()) { + ric.ric.getLock().unlock(); return startSynchronization(ric); } - } - for (String policyId : ricPolicies) { - if (!policies.containsPolicy(policyId)) { - return startSynchronization(ric); + + for (String policyId : ricPolicies) { + if (!policies.containsPolicy(policyId)) { + ric.ric.getLock().unlock(); + return startSynchronization(ric); + } } + return Mono.just(ric); } - return Mono.just(ric); } private Mono checkRicPolicyTypes(RicData ric) { @@ -165,4 +175,4 @@ public class RepositorySupervision { RicSynchronizationTask createSynchronizationTask() { return new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services); } -} \ No newline at end of file +} diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java index bcfda484..d7599911 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java @@ -22,13 +22,13 @@ package org.oransc.policyagent.tasks; import static org.oransc.policyagent.repository.Ric.RicState; -import java.util.Collection; import java.util.Vector; import org.oransc.policyagent.clients.A1Client; import org.oransc.policyagent.clients.A1ClientFactory; import org.oransc.policyagent.clients.AsyncRestClient; import org.oransc.policyagent.repository.ImmutablePolicyType; +import org.oransc.policyagent.repository.Lock.LockType; import org.oransc.policyagent.repository.Policies; import org.oransc.policyagent.repository.Policy; import org.oransc.policyagent.repository.PolicyType; @@ -43,12 +43,16 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** - * Synchronizes the content of a RIC with the content in the repository. - * This means: - * - load all policy types - * - send all policy instances to the RIC - * --- if that fails remove all policy instances - * - Notify subscribing services + * Synchronizes the content of a RIC with the content in the repository. This + * means: + *

+ * load all policy types + *

+ * send all policy instances to the RIC + *

+ * if that fails remove all policy instances + *

+ * Notify subscribing services */ public class RicSynchronizationTask { @@ -78,6 +82,8 @@ public class RicSynchronizationTask { } ric.setState(RicState.SYNCHRONIZING); } + ric.getLock().lockBlocking(LockType.EXCLUSIVE); // Make sure no NBI updates are running + ric.getLock().unlock(); this.a1ClientFactory.createA1Client(ric)// .flatMapMany(client -> startSynchronization(ric, client)) // .subscribe(x -> logger.debug("Synchronize: {}", x), // @@ -87,13 +93,9 @@ public class RicSynchronizationTask { private Flux startSynchronization(Ric ric, A1Client a1Client) { Flux recoverTypes = synchronizePolicyTypes(ric, a1Client); - Collection policiesForRic = policies.getForRic(ric.name()); - Flux policiesDeletedInRic = Flux.empty(); - Flux policiesRecreatedInRic = Flux.empty(); - if (!policiesForRic.isEmpty()) { - policiesDeletedInRic = a1Client.deleteAllPolicies(); - policiesRecreatedInRic = recreateAllPoliciesInRic(ric, a1Client); - } + Flux policiesDeletedInRic = a1Client.deleteAllPolicies(); + Flux policiesRecreatedInRic = recreateAllPoliciesInRic(ric, a1Client); + return Flux.concat(recoverTypes, policiesDeletedInRic, policiesRecreatedInRic); } @@ -123,17 +125,17 @@ public class RicSynchronizationTask { @SuppressWarnings("squid:S2629") private void onSynchronizationError(Ric ric, Throwable t) { logger.warn("Synchronization failed for ric: {}, reason: {}", ric.name(), t.getMessage()); + // If recovery fails, try to remove all instances deleteAllPoliciesInRepository(ric); - Flux typesRecoveredForRic = this.a1ClientFactory.createA1Client(ric) // + Flux recoverTypes = this.a1ClientFactory.createA1Client(ric) // .flatMapMany(a1Client -> synchronizePolicyTypes(ric, a1Client)); + Flux deletePoliciesInRic = this.a1ClientFactory.createA1Client(ric) // + .flatMapMany(a1Client -> a1Client.deleteAllPolicies()) // + .doOnComplete(() -> deleteAllPoliciesInRepository(ric)); - // If recovery fails, try to remove all instances - Flux policiesDeletedInRic = this.a1ClientFactory.createA1Client(ric) // - .flatMapMany(A1Client::deleteAllPolicies); - - Flux.merge(typesRecoveredForRic, policiesDeletedInRic) // - .subscribe(x -> logger.debug("Brute recover: {}", x), // + Flux.concat(recoverTypes, deletePoliciesInRic) // + .subscribe(x -> logger.debug("Brute recover: " + x), // throwable -> onRecoveryError(ric, throwable), // () -> onSynchronizationComplete(ric)); } @@ -149,8 +151,8 @@ public class RicSynchronizationTask { } private Flux synchronizePolicyTypes(Ric ric, A1Client a1Client) { - ric.clearSupportedPolicyTypes(); return a1Client.getPolicyTypeIdentities() // + .doOnNext(x -> ric.clearSupportedPolicyTypes()) // .flatMapMany(Flux::fromIterable) // .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId)) // .flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client)) // diff --git a/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java b/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java index 9d45f058..4ea6ff9e 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java @@ -28,10 +28,17 @@ import com.google.gson.GsonBuilder; import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonParser; + import java.io.IOException; +import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.Vector; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.oransc.policyagent.configuration.ApplicationConfig; @@ -43,6 +50,7 @@ import org.oransc.policyagent.controllers.ServiceStatus; import org.oransc.policyagent.exceptions.ServiceException; import org.oransc.policyagent.repository.ImmutablePolicy; import org.oransc.policyagent.repository.ImmutablePolicyType; +import org.oransc.policyagent.repository.Lock.LockType; import org.oransc.policyagent.repository.Policies; import org.oransc.policyagent.repository.Policy; import org.oransc.policyagent.repository.PolicyType; @@ -63,6 +71,7 @@ import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatus.Series; import org.springframework.http.MediaType; @@ -97,8 +106,8 @@ public class ApplicationTest { Services services; private static Gson gson = new GsonBuilder() // - .serializeNulls() // - .create(); // + .serializeNulls() // + .create(); // public static class MockApplicationConfig extends ApplicationConfig { @Override @@ -124,20 +133,10 @@ public class ApplicationTest { return new MockA1ClientFactory(this.policyTypes); } - @Bean - public Policies getPolicies() { - return new Policies(); - } - @Bean public PolicyTypes getPolicyTypes() { return this.policyTypes; } - - @Bean - public Rics getRics() { - return new Rics(); - } } @LocalServerPort @@ -150,7 +149,7 @@ public class ApplicationTest { @Override public boolean hasError(ClientHttpResponse httpResponse) throws IOException { return (httpResponse.getStatusCode().series() == Series.CLIENT_ERROR - || httpResponse.getStatusCode().series() == Series.SERVER_ERROR); + || httpResponse.getStatusCode().series() == Series.SERVER_ERROR); } @Override @@ -159,9 +158,30 @@ public class ApplicationTest { } } + private void setRestErrorhandler() { + restTemplate.setErrorHandler(new RestTemplateResponseErrorHandler()); + } + + @BeforeEach + public void reset() { + rics.clear(); + policies.clear(); + policyTypes.clear(); + services.clear(); + } + + @AfterEach + public void verifyNoRicLocks() { + for (Ric ric : this.rics.getRics()) { + ric.getLock().lockBlocking(LockType.EXCLUSIVE); + ric.getLock().unlock(); + assertThat(ric.getLock().getLockCounter()).isEqualTo(0); + assertThat(ric.getState()).isEqualTo(Ric.RicState.IDLE); + } + } + @Test public void testGetRics() throws Exception { - reset(); addRic("kista_1"); String url = baseUrl() + "/rics"; String rsp = this.restTemplate.getForObject(url, String.class); @@ -175,7 +195,7 @@ public class ApplicationTest { @Test public void testRecovery() throws Exception { - reset(); + addRic("ric").setState(Ric.RicState.UNDEFINED); String ricName = "ric"; Policy policy2 = addPolicy("policyId2", "typeName", "service", ricName); @@ -196,7 +216,6 @@ public class ApplicationTest { @Test public void testGetRicForManagedElement_thenReturnCorrectRic() throws Exception { - reset(); addRic("notCorrectRic1"); addRic("notCorrectRic2"); addRic("notCorrectRic3"); @@ -216,24 +235,15 @@ public class ApplicationTest { } @Test - public void testGetRicForManagedElementThatDoesNotExist_thenReturnEmpty() throws Exception { - reset(); - addRic("notCorrectRic1"); - addRic("notCorrectRic2"); - addRic("notCorrectRic3"); - addRic("notCorrectRic4"); - addRic("notCorrectRic5"); - addRic("notCorrectRic6"); - + public void testGetRicForManagedElementThatDoesNotExist() throws Exception { + this.setRestErrorhandler(); String url = baseUrl() + "/ric?managedElementId=kista_1"; - String rsp = this.restTemplate.getForObject(url, String.class); - - assertThat(rsp).isNull(); + ResponseEntity entity = this.restTemplate.getForEntity(url, String.class); + assertThat(entity.getStatusCode().equals(HttpStatus.NOT_FOUND)); } @Test public void testPutPolicy() throws Exception { - reset(); String serviceName = "service1"; String ricName = "ric1"; String policyTypeName = "type1"; @@ -243,7 +253,7 @@ public class ApplicationTest { addPolicyType(policyTypeName, ricName); String url = baseUrl() + "/policy?type=" + policyTypeName + "&instance=" + policyInstanceId + "&ric=" + ricName - + "&service=" + serviceName; + + "&service=" + serviceName; final String json = jsonString(); this.rics.getRic(ricName).setState(Ric.RicState.IDLE); @@ -265,13 +275,15 @@ public class ApplicationTest { public void testRefuseToUpdatePolicy() throws Exception { // Test that only the json can be changed for a already created policy // In this case service is attempted to be changed - reset(); - this.addRic("ric1").setState(Ric.RicState.IDLE); - this.addRic("ricXXX").setState(Ric.RicState.IDLE); + this.addRic("ric1"); + this.addRic("ricXXX"); this.addPolicy("instance1", "type1", "service1", "ric1"); + this.setRestErrorhandler(); String urlWrongRic = baseUrl() + "/policy?type=type1&instance=instance1&ric=ricXXX&service=service1"; - this.restTemplate.put(urlWrongRic, createJsonHttpEntity(jsonString())); + ResponseEntity entity = this.putForEntity(urlWrongRic, jsonString()); + assertThat(entity.getStatusCode().equals(HttpStatus.METHOD_NOT_ALLOWED)); + Policy policy = policies.getPolicy("instance1"); assertThat(policy.ric().name()).isEqualTo("ric1"); // Not changed } @@ -293,10 +305,8 @@ public class ApplicationTest { @Test public void testDeletePolicy() throws Exception { - reset(); String url = baseUrl() + "/policy?instance=id"; - Policy policy = addPolicy("id", "typeName", "service1", "ric1"); - policy.ric().setState(Ric.RicState.IDLE); + addPolicy("id", "typeName", "service1", "ric1"); assertThat(policies.size()).isEqualTo(1); this.restTemplate.delete(url); @@ -306,7 +316,6 @@ public class ApplicationTest { @Test public void testGetPolicySchemas() throws Exception { - reset(); addPolicyType("type1", "ric1"); addPolicyType("type2", "ric2"); @@ -328,7 +337,6 @@ public class ApplicationTest { @Test public void testGetPolicySchema() throws Exception { - reset(); addPolicyType("type1", "ric1"); addPolicyType("type2", "ric2"); @@ -341,7 +349,6 @@ public class ApplicationTest { @Test public void testGetPolicyTypes() throws Exception { - reset(); addPolicyType("type1", "ric1"); addPolicyType("type2", "ric2"); @@ -394,7 +401,6 @@ public class ApplicationTest { @Test public void testPutAndGetService() throws Exception { - reset(); // PUT putService("name"); @@ -414,27 +420,26 @@ public class ApplicationTest { System.out.println(rsp); // Keep alive - url = baseUrl() + "/services/keepalive?serviceName=name"; - rsp = this.restTemplate.postForObject(url, null, String.class); - assertThat(rsp.contains("OK")).isTrue(); + url = baseUrl() + "/services/keepalive?name=name"; + ResponseEntity entity = this.restTemplate.postForEntity(url, null, String.class); + assertThat(entity.getStatusCode().equals(HttpStatus.OK)); // DELETE assertThat(services.size()).isEqualTo(1); - url = baseUrl() + "/services?serviceName=name"; + url = baseUrl() + "/services?name=name"; this.restTemplate.delete(url); assertThat(services.size()).isEqualTo(0); // Keep alive, no registerred service - url = baseUrl() + "/services/keepalive?serviceName=nameXXX"; - ResponseEntity entity = this.restTemplate.postForEntity(url, null, String.class); + url = baseUrl() + "/services/keepalive?name=name"; + setRestErrorhandler(); + entity = this.restTemplate.postForEntity(url, null, String.class); assertThat(entity.getStatusCode()).isEqualTo(HttpStatus.NOT_FOUND); } @Test public void testGetPolicyStatus() throws Exception { - reset(); - Policy policy = addPolicy("id", "typeName", "service1", "ric1"); - policy.ric().setState(Ric.RicState.IDLE); + addPolicy("id", "typeName", "service1", "ric1"); assertThat(policies.size()).isEqualTo(1); String url = baseUrl() + "/policy_status?instance=id"; @@ -442,40 +447,14 @@ public class ApplicationTest { assertThat(rsp.equals("OK")).isTrue(); } - private PolicyType addPolicyType(String policyTypeName, String ricName) { - PolicyType type = ImmutablePolicyType.builder() // - .name(policyTypeName) // - .schema("{\"title\":\"" + policyTypeName + "\"}") // - .build(); - - policyTypes.put(type); - addRic(ricName).addSupportedPolicyType(type); - return type; - } - - private Ric addRic(String ricName) { - if (rics.get(ricName) != null) { - return rics.get(ricName); - } - Vector mes = new Vector<>(); - RicConfig conf = ImmutableRicConfig.builder() // - .name(ricName) // - .baseUrl(ricName) // - .managedElementIds(mes) // - .build(); - Ric ric = new Ric(conf); - this.rics.put(ric); - return ric; - } - private Policy addPolicy(String id, String typeName, String service, String ric) throws ServiceException { addRic(ric); Policy p = ImmutablePolicy.builder().id(id) // - .json(jsonString()) // - .ownerServiceName(service) // - .ric(rics.getRic(ric)) // - .type(addPolicyType(typeName, ric)) // - .lastModified("lastModified").build(); + .json(jsonString()) // + .ownerServiceName(service) // + .ric(rics.getRic(ric)) // + .type(addPolicyType(typeName, ric)) // + .lastModified("lastModified").build(); policies.put(p); return p; } @@ -501,29 +480,105 @@ public class ApplicationTest { return "http://localhost:" + port; } - private void reset() { - rics.clear(); - policies.clear(); - policyTypes.clear(); - services.clear(); - assertThat(policies.size()).isEqualTo(0); - restTemplate.setErrorHandler(new RestTemplateResponseErrorHandler()); - } - private String jsonString() { return "{\n \"servingCellNrcgi\": \"1\"\n }"; } + private static class ConcurrencyTestRunnable implements Runnable { + private final RestTemplate restTemplate = new RestTemplate(); + private final String baseUrl; + static AtomicInteger nextCount = new AtomicInteger(0); + private final int count; + private final RepositorySupervision supervision; + + ConcurrencyTestRunnable(String baseUrl, RepositorySupervision supervision) { + this.baseUrl = baseUrl; + this.count = nextCount.incrementAndGet(); + this.supervision = supervision; + } + + public void run() { + for (int i = 0; i < 100; ++i) { + if (i % 10 == 0) { + this.supervision.checkAllRics(); + } + String name = "policy:" + count + ":" + i; + putPolicy(name); + deletePolicy(name); + } + } + + private void putPolicy(String name) { + String putUrl = baseUrl + "/policy?type=type1&instance=" + name + "&ric=ric1&service=service1"; + this.restTemplate.put(putUrl, createJsonHttpEntity("{}")); + } + + private void deletePolicy(String name) { + String deleteUrl = baseUrl + "/policy?instance=" + name; + this.restTemplate.delete(deleteUrl); + } + } + + @Test + public void testConcurrency() throws Exception { + final Instant startTime = Instant.now(); + List threads = new ArrayList<>(); + addRic("ric1"); + addPolicyType("type1", "ric1"); + + for (int i = 0; i < 100; ++i) { + Thread t = new Thread(new ConcurrencyTestRunnable(baseUrl(), this.supervision), "TestThread_" + i); + t.start(); + threads.add(t); + } + for (Thread t : threads) { + t.join(); + } + assertThat(policies.size()).isEqualTo(0); + System.out.println("Concurrency test took " + Duration.between(startTime, Instant.now())); + } + private MockA1Client getA1Client(String ricName) throws ServiceException { return a1ClientFactory.getOrCreateA1Client(ricName); } - private HttpEntity createJsonHttpEntity(String content) { + private PolicyType addPolicyType(String policyTypeName, String ricName) { + PolicyType type = ImmutablePolicyType.builder() // + .name(policyTypeName) // + .schema("{\"title\":\"" + policyTypeName + "\"}") // + .build(); + + policyTypes.put(type); + addRic(ricName).addSupportedPolicyType(type); + return type; + } + + private Ric addRic(String ricName) { + if (rics.get(ricName) != null) { + return rics.get(ricName); + } + Vector mes = new Vector<>(); + RicConfig conf = ImmutableRicConfig.builder() // + .name(ricName) // + .baseUrl(ricName) // + .managedElementIds(mes) // + .build(); + Ric ric = new Ric(conf); + ric.setState(Ric.RicState.IDLE); + this.rics.put(ric); + return ric; + } + + private static HttpEntity createJsonHttpEntity(String content) { HttpHeaders headers = new HttpHeaders(); headers.setContentType(MediaType.APPLICATION_JSON); return new HttpEntity(content, headers); } + private ResponseEntity putForEntity(String url, String jsonBody) { + return restTemplate.exchange(url, HttpMethod.PUT, createJsonHttpEntity(jsonBody), String.class); + } + private static List parseList(String jsonString, Class clazz) { List result = new ArrayList<>(); JsonArray jsonArr = JsonParser.parseString(jsonString).getAsJsonArray(); @@ -542,5 +597,4 @@ public class ApplicationTest { } return result; } - } diff --git a/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java b/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java index 4f4a9be2..1ea677ca 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java @@ -22,10 +22,12 @@ package org.oransc.policyagent; import com.google.gson.JsonObject; import com.google.gson.JsonParser; + import java.io.File; import java.io.IOException; import java.net.URL; import java.nio.file.Files; + import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.oransc.policyagent.configuration.ApplicationConfig; diff --git a/policy-agent/src/test/java/org/oransc/policyagent/clients/StdA1ClientTest.java b/policy-agent/src/test/java/org/oransc/policyagent/clients/StdA1ClientTest.java index f4e68210..d5dd3b73 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/clients/StdA1ClientTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/clients/StdA1ClientTest.java @@ -33,6 +33,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.oransc.policyagent.repository.Policy; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; diff --git a/policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigParserTest.java b/policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigParserTest.java index 5136a703..02c84db8 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigParserTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigParserTest.java @@ -121,7 +121,8 @@ public class ApplicationConfigParserTest { private JsonObject fake_info_object; public String toString() { - return String.format("[dmaap_publisher=%s, fake_info_object=%s]", dmaap_publisher.toString(), fake_info_object.toString()); + return String.format("[dmaap_publisher=%s, fake_info_object=%s]", dmaap_publisher.toString(), + fake_info_object.toString()); } } @@ -147,7 +148,8 @@ public class ApplicationConfigParserTest { private JsonObject fake_info_object; public String toString() { - return String.format("[dmaap_subscriber=%s, fake_info_object=%s]", dmaap_subscriber.toString(), fake_info_object.toString()); + return String.format("[dmaap_subscriber=%s, fake_info_object=%s]", dmaap_subscriber.toString(), + fake_info_object.toString()); } } @@ -227,8 +229,8 @@ public class ApplicationConfigParserTest { "Wrong error message when the streams publishes' URL has incorrect syntax"); } - public JsonObject getDmaapInfo(JsonObject jsonRootObject, String streamsPublishesOrSubscribes, String dmaapPublisherOrSubscriber) - throws Exception { + public JsonObject getDmaapInfo(JsonObject jsonRootObject, String streamsPublishesOrSubscribes, + String dmaapPublisherOrSubscriber) throws Exception { return jsonRootObject.getAsJsonObject("config").getAsJsonObject(streamsPublishesOrSubscribes) .getAsJsonObject(dmaapPublisherOrSubscriber).getAsJsonObject("dmaap_info"); } diff --git a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RefreshConfigTaskTest.java b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RefreshConfigTaskTest.java index 859708a5..7b54e068 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RefreshConfigTaskTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RefreshConfigTaskTest.java @@ -30,12 +30,14 @@ import static org.mockito.Mockito.verify; import ch.qos.logback.classic.Level; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.read.ListAppender; + import com.google.common.base.Charsets; import com.google.common.io.Resources; import com.google.gson.JsonIOException; import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.google.gson.JsonSyntaxException; + import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; @@ -45,6 +47,7 @@ import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Properties; import java.util.Vector; + import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -58,6 +61,7 @@ import org.oransc.policyagent.configuration.ApplicationConfigParser; import org.oransc.policyagent.configuration.ImmutableRicConfig; import org.oransc.policyagent.configuration.RicConfig; import org.oransc.policyagent.utils.LoggingUtils; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -65,7 +69,6 @@ import reactor.test.StepVerifier; @ExtendWith(MockitoExtension.class) public class RefreshConfigTaskTest { - private RefreshConfigTask refreshTaskUnderTest; @Spy diff --git a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java index fd3dbc01..ea09bb66 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java @@ -32,6 +32,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Vector; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -309,4 +310,4 @@ public class RepositorySupervisionTest { when(a1ClientMock.getPolicyTypeIdentities()).thenReturn(Mono.error((Exception) returnValue)); } } -} \ No newline at end of file +} diff --git a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSynchronizationTaskTest.java b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSynchronizationTaskTest.java index b05349fc..eae00ea1 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSynchronizationTaskTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSynchronizationTaskTest.java @@ -35,9 +35,11 @@ import static org.mockito.Mockito.when; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.read.ListAppender; + import java.time.Duration; import java.util.Arrays; import java.util.Collections; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -58,6 +60,7 @@ import org.oransc.policyagent.repository.Ric.RicState; import org.oransc.policyagent.repository.Service; import org.oransc.policyagent.repository.Services; import org.oransc.policyagent.utils.LoggingUtils; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -150,7 +153,7 @@ public class RicSynchronizationTaskTest { synchronizerUnderTest.run(RIC_1); - verify(a1ClientMock).getPolicyTypeIdentities(); + verify(a1ClientMock, times(1)).getPolicyTypeIdentities(); verifyNoMoreInteractions(a1ClientMock); verify(synchronizerUnderTest).run(RIC_1); @@ -303,6 +306,7 @@ public class RicSynchronizationTaskTest { private void setUpCreationOfA1Client() { when(a1ClientFactoryMock.createA1Client(any(Ric.class))).thenReturn(Mono.just(a1ClientMock)); + doReturn(Flux.empty()).when(a1ClientMock).deleteAllPolicies(); } private AsyncRestClient setUpCreationOfAsyncRestClient(RicSynchronizationTask synchronizerUnderTest) { diff --git a/policy-agent/src/test/java/org/oransc/policyagent/tasks/ServiceSupervisionTest.java b/policy-agent/src/test/java/org/oransc/policyagent/tasks/ServiceSupervisionTest.java index 381c2d19..a9845285 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/tasks/ServiceSupervisionTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/tasks/ServiceSupervisionTest.java @@ -31,8 +31,10 @@ import static org.mockito.Mockito.when; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.read.ListAppender; + import java.time.Duration; import java.util.Collections; + import org.awaitility.Durations; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; diff --git a/policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java b/policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java index ae9385f0..eeecc1c9 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java @@ -33,6 +33,7 @@ import static org.mockito.Mockito.when; import static org.oransc.policyagent.repository.Ric.RicState.IDLE; import com.google.common.collect.ImmutableList; + import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -93,6 +94,7 @@ public class StartupServiceTest { Mono> policyTypes2 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME, POLICY_TYPE_2_NAME)); doReturn(policyTypes1, policyTypes2).when(a1ClientMock).getPolicyTypeIdentities(); doReturn(Mono.just("Schema")).when(a1ClientMock).getPolicyTypeSchema(anyString()); + doReturn(Flux.empty()).when(a1ClientMock).deleteAllPolicies(); Rics rics = new Rics(); PolicyTypes policyTypes = new PolicyTypes(); @@ -107,16 +109,18 @@ public class StartupServiceTest { getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C), ApplicationConfig.RicConfigUpdate.ADDED); - await().untilAsserted(() -> assertThat(policyTypes.size()).isEqualTo(2)); + Ric firstRic = rics.get(FIRST_RIC_NAME); + Ric secondRic = rics.get(SECOND_RIC_NAME); + await().untilAsserted(() -> assertThat(firstRic.getState()).isEqualTo(IDLE)); + await().untilAsserted(() -> assertThat(secondRic.getState()).isEqualTo(IDLE)); assertTrue(policyTypes.contains(POLICY_TYPE_1_NAME), POLICY_TYPE_1_NAME + " not added to PolicyTypes."); assertTrue(policyTypes.contains(POLICY_TYPE_2_NAME), POLICY_TYPE_2_NAME + " not added to PolicyTypes."); assertEquals(2, rics.size(), "Correct number of Rics not added to Rics"); - Ric firstRic = rics.get(FIRST_RIC_NAME); assertNotNull(firstRic, "Ric " + FIRST_RIC_NAME + " not added to repository"); assertEquals(FIRST_RIC_NAME, firstRic.name(), FIRST_RIC_NAME + " not added to Rics"); - assertEquals(IDLE, firstRic.getState(), "Not correct state for ric " + FIRST_RIC_NAME); + assertEquals(1, firstRic.getSupportedPolicyTypes().size(), "Not correct no of types supported for ric " + FIRST_RIC_NAME); assertTrue(firstRic.isSupportingType(POLICY_TYPE_1_NAME), @@ -125,10 +129,8 @@ public class StartupServiceTest { "Not correct no of managed nodes for ric " + FIRST_RIC_NAME); assertTrue(firstRic.isManaging(MANAGED_NODE_A), MANAGED_NODE_A + " not managed by ric " + FIRST_RIC_NAME); - Ric secondRic = rics.get(SECOND_RIC_NAME); assertNotNull(secondRic, "Ric " + SECOND_RIC_NAME + " not added to repository"); assertEquals(SECOND_RIC_NAME, secondRic.name(), SECOND_RIC_NAME + " not added to Rics"); - assertEquals(IDLE, secondRic.getState(), "Not correct state for " + SECOND_RIC_NAME); assertEquals(2, secondRic.getSupportedPolicyTypes().size(), "Not correct no of types supported for ric " + SECOND_RIC_NAME); assertTrue(secondRic.isSupportingType(POLICY_TYPE_1_NAME),