.. This work is licensed under a Creative Commons Attribution 4.0 International License.
.. http://creativecommons.org/licenses/by/4.0
-
+.. Copyright (C) 2020 Nordix
========
API-Docs
An ONAP SDNC Controller for the A1 interface.
+See :ref:`sdnc-a1-controller-api` for how to use the API.
+
See the README.md file in the nonrtric/sdnc-a1-controller repo for info about how to use it.
Complementary tools
There are two additional tools that can be used together with the Non-RT RIC, namely the Control Panel and the Near-RT RIC simulator.
The Non-RT RIC Control Panel provides a user interface that allows the user to interact with the Non-RT RIC.
-It can be downloaded using: git clone "https://gerrit.o-ran-sc.org/r/portal/nonrtric-controlpanel"
+Documentation for the Control Panel can be found here:
+:doc:`Non-RT RIC Control Panel <nonrtric-controlpanel:index>`.
+It can be downloaded from here: ::
+
+ git clone "https://gerrit.o-ran-sc.org/r/portal/nonrtric-controlpanel".
+
+The Near-RT RIC simulator simulates an A1 protocol termination endpoint. Documentation for the simulator can be found
+here: :doc:`A1 Interface Simulator <sim-a1-interface:index>`. It can be downloaded from here: ::
-The Near-RT RIC simulator does what its name suggests, and can be downloaded here: git clone "https://gerrit.o-ran-sc.org/r/sim/a1-interface"
+ git clone "https://gerrit.o-ran-sc.org/r/sim/a1-interface"
from docs_conf.conf import *
+
+#branch configuration
+
+branch = 'latest'
+
linkcheck_ignore = [
'http://localhost.*',
'http://127.0.0.1.*',
'https://gerrit.o-ran-sc.org.*'
]
+
+#intershpinx mapping with other projects
+intersphinx_mapping = {}
+
+intersphinx_mapping['nonrtric-controlpanel'] = ('https://docs.o-ran-sc.org/projects/o-ran-sc-portal-nonrtric-controlpanel/en/%s' % branch, None)
+intersphinx_mapping['sim-a1-interface'] = ('https://docs.o-ran-sc.org/projects/o-ran-sc-sim-a1-interface/en/%s' % branch, None)
.. This work is licensed under a Creative Commons Attribution 4.0 International License.
.. SPDX-License-Identifier: CC-BY-4.0
+.. Copyright (C) 2020 Nordix
Developer Guide
===============
.. This work is licensed under a Creative Commons Attribution 4.0 International License.
.. SPDX-License-Identifier: CC-BY-4.0
-
+.. Copyright (C) 2020 Nordix
Non-RT RIC
==========
./api-docs.rst
./policy-agent-api.rst
+ ./sdnc-a1-controller-api.rst
./developer-guide.rst
./installation-guide.rst
./overview.rst
.. This work is licensed under a Creative Commons Attribution 4.0 International License.
.. http://creativecommons.org/licenses/by/4.0
-
-
+.. Copyright (C) 2020 Nordix
Installation Guide
==================
.. This work is licensed under a Creative Commons Attribution 4.0 International License.
.. SPDX-License-Identifier: CC-BY-4.0
-
+.. Copyright (C) 2020 Nordix
Requirements for the Non-RT RIC project
==========================================
.. This work is licensed under a Creative Commons Attribution 4.0 International License.
.. http://creativecommons.org/licenses/by/4.0
+.. Copyright (C) 2020 Nordix
.. |nbsp| unicode:: 0xA0
:trim:
.. This work is licensed under a Creative Commons Attribution 4.0 International License.
.. http://creativecommons.org/licenses/by/4.0
+.. Copyright (C) 2020 Nordix
=============
Release-Notes
--- /dev/null
+.. This work is licensed under a Creative Commons Attribution 4.0 International License.
+.. http://creativecommons.org/licenses/by/4.0
+.. Copyright (C) 2020 Nordix
+
+.. _sdnc-a1-controller-api:
+
+.. |nbsp| unicode:: 0xA0
+ :trim:
+
+.. |nbh| unicode:: 0x2011
+ :trim:
+
+##################
+SDNC A1 Controller
+##################
+
+The A1 of a Near |nbh| RT |nbsp| RIC can be used through the SDNC A1 Controller.
+
+Any version of the A1 API can be used. A call to the SDNC A1 Controller always contains the actual URL to the
+Near |nbh| RT |nbsp| RIC, so here any of the supported API versions can be used. The controller just calls the provided
+URL with the provided data.
+
+Get Policy Type
+~~~~~~~~~~~~~~~
+
+POST
+++++
+
+ Gets a policy type.
+
+ **URL path:**
+ /restconf/operations/A1-ADAPTER-API:getA1PolicyType
+
+ **Parameters:**
+
+ None.
+
+ **Body:** (*Required*)
+
+ A JSON. ::
+
+ {
+ "input": {
+ "near-rt-ric-url": "<url-to-near-rt-ric-to-get-type>"
+ }
+ }
+
+ **Responses:**
+
+ 200:
+ A JSON where the body tag contains the JSON object of the policy type. ::
+
+ {
+ "output": {
+ "http-status": "integer",
+ "body": "{
+ <policy-type>
+ }"
+ }
+ }
+
+ **Examples:**
+
+ Call: ::
+
+ curl -X POST "http://localhost:8282/restconf/operations/A1-ADAPTER-API:getA1PolicyType"
+ -H "Content-Type: application/json" -d "{
+ \"input\": {
+ \"near-rt-ric-url\": \"http://nearRtRic-sim1:8085/a1-p/policytypes/11\"
+ }
+ }"
+
+ Result:
+ 200 ::
+
+ {
+ "output": {
+ "http-status": 200,
+ "body": "{
+ "$schema": "http://json-schema.org/draft-07/schema#",
+ "title": "Example_QoETarget_1.0.0",
+ "description": "Example QoE Target policy type",
+ "type": "object",
+ "properties": {
+ "scope": {
+ "type": "object",
+ "properties": {
+ "ueId": {
+ "type": "string"
+ },
+ "sliceId": {
+ "type": "string"
+ },
+ "qosId": {
+ "type": "string"
+ },
+ "cellId": {
+ "type": "string"
+ }
+ },
+ "additionalProperties": false,
+ "required": [
+ "ueId",
+ "sliceId"
+ ]
+ },
+ "statement": {
+ "type": "object",
+ "properties": {
+ "qoeScore": {
+ "type": "number"
+ },
+ "initialBuffering": {
+ "type": "number"
+ },
+ "reBuffFreq": {
+ "type": "number"
+ },
+ "stallRatio": {
+ "type": "number"
+ }
+ },
+ "minProperties": 1,
+ "additionalProperties": false
+ }
+ }
+ }
+ }
+ }"
+
+Put Policy
+~~~~~~~~~~
+
+POST
+++++
+
+ Creates or updates a policy instance.
+
+ **URL path:**
+ /restconf/operations/A1-ADAPTER-API:putA1Policy
+
+ **Parameters:**
+
+ None.
+
+ **Body:** (*Required*)
+
+ A JSON where the body tag contains the JSON object of the policy. ::
+
+ {
+ "input": {
+ "near-rt-ric-url": "<url-to-near-rt-ric-to-put-policy>",
+ "body": "object"
+ }
+ }
+
+ **Responses:**
+
+ 200:
+ A JSON with the response. ::
+
+ {
+ "output": {
+ "http-status": "integer"
+ }
+ }
+
+ **Examples:**
+
+ Call: ::
+
+ curl -X POST "http://localhost:8282/restconf/operations/A1-ADAPTER-API:getA1PolicyType"
+ -H "Content-Type: application/json" -d "{
+ \"input\": {
+ \"near-rt-ric-url\": \"http://nearRtRic-sim1:8085/a1-p/policytypes/11/policies/3d2157af-6a8f-4a7c-810f-38c2f824bf12\",
+ \"body\": \"{
+ \"blocking_rate\":20,
+ \"enforce\":true,
+ \"trigger_threshold\":10,
+ \"window_length\":10
+ }\"
+ }
+ }"
+
+ Result:
+ 200 ::
+
+ {
+ "output": {
+ "http-status": 200
+ }
+ }
+
+Get Policy
+~~~~~~~~~~
+
+POST
+++++
+
+ Gets a policy instance.
+
+ **URL path:**
+ /restconf/operations/A1-ADAPTER-API:getA1Policy
+
+ **Parameters:**
+
+ None.
+
+ **Body:** (*Required*)
+
+ A JSON. ::
+
+ {
+ "input": {
+ "near-rt-ric-url": "<url-to-near-rt-ric-to-get-policy>"
+ }
+ }
+
+ **Responses:**
+
+ 200:
+ A JSON where the body tag contains the JSON object of the policy. ::
+
+ {
+ "output": {
+ "http-status": "integer",
+ "body": "{
+ <policy>
+ }"
+ }
+ }
+
+ **Examples:**
+
+ Call: ::
+
+ curl -X POST "http://localhost:8282/restconf/operations/A1-ADAPTER-API:getA1Policy"
+ -H "Content-Type: application/json" -d "{
+ \"input\": {
+ \"near-rt-ric-url\": \"http://nearRtRic-sim1:8085/a1-p/policytypes/11/policies/3d2157af-6a8f-4a7c-810f-38c2f824bf12\"
+ }
+ }"
+
+ Result:
+ 200 ::
+
+ {
+ "output": {
+ "http-status": 200,
+ "body": "{
+ \"blocking_rate\": 20,
+ \"enforce\": true,
+ \"trigger_threshold\": 10,
+ \"window_length\": 10
+ }"
+ }
+ }
+
+Delete Policy
+~~~~~~~~~~~~~
+
+POST
+++++
+
+ Deletes a policy instance.
+
+ **URL path:**
+ /restconf/operations/A1-ADAPTER-API:deleteA1Policy
+
+ **Parameters:**
+
+ None.
+
+ **Body:** (*Required*)
+
+ A JSON. ::
+
+ {
+ "input": {
+ "near-rt-ric-url": "<url-to-near-rt-ric-to-delete-policy>"
+ }
+ }
+
+ **Responses:**
+
+ 200:
+ A JSON with the response. ::
+
+ {
+ "output": {
+ "http-status": "integer"
+ }
+ }
+
+ **Examples:**
+
+ Call: ::
+
+ curl -X POST "http://localhost:8282/restconf/operations/A1-ADAPTER-API:deleteA1Policy"
+ -H "Content-Type: application/json" -d "{
+ \"input\": {
+ \"near-rt-ric-url\": \"http://nearRtRic-sim1:8085/a1-p/policytypes/11/policies/3d2157af-6a8f-4a7c-810f-38c2f824bf12\"
+ }
+ }"
+
+ Result:
+ 200 ::
+
+ {
+ "output": {
+ "http-status": 202
+ }
+ }
+
+Get Policy Status
+~~~~~~~~~~~~~~~~~
+
+POST
+++++
+
+ Get the status of a policy instance.
+
+ **URL path:**
+ /restconf/operations/A1-ADAPTER-API:getA1PolicyStatus
+
+ **Parameters:**
+
+ None.
+
+ **Body:** (*Required*)
+
+ A JSON. ::
+
+ {
+ "input": {
+ "near-rt-ric-url": "<url-to-near-rt-ric-to-get-policy-status>"
+ }
+ }
+
+ **Responses:**
+
+ 200:
+ A JSON where the body tag contains the JSON object with the policy status according to the API version used. ::
+
+ {
+ "output": {
+ "http-status": "integer",
+ "body": "{
+ <policy-status-object>
+ }"
+ }
+ }
+
+ **Examples:**
+
+ Call: ::
+
+ curl -X POST "http://localhost:8282/restconf/operations/A1-ADAPTER-API:getA1PolicyStatus"
+ -H "Content-Type: application/json" -d "{
+ \"input\": {
+ \"near-rt-ric-url\": \"http://nearRtRic-sim1:8085/a1-p/policytypes/11/policies/3d2157af-6a8f-4a7c-810f-38c2f824bf12/status\"
+ }
+ }"
+
+ Result:
+ 200 ::
+
+ {
+ "output": {
+ "http-status": 200,
+ "body": "{
+ "instance_status": "IN EFFECT",
+ "has_been_deleted": "true",
+ "created_at": "Wed, 01 Apr 2020 07:45:45 GMT"
+ }"
+ }
+ }
import org.springframework.lang.Nullable;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClient.RequestHeadersSpec;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
private Mono<ResponseEntity<String>> retrieve(RequestHeadersSpec<?> request) {
return request.retrieve() //
- .toEntity(String.class);
+ .toEntity(String.class) //
+ .doOnError(this::onHttpError);
}
- Mono<String> toBody(ResponseEntity<String> entity) {
+ private void onHttpError(Throwable t) {
+ if (t instanceof WebClientResponseException) {
+ WebClientResponseException exception = (WebClientResponseException) t;
+ logger.debug("HTTP error status = '{}', body '{}'", exception.getStatusCode(),
+ exception.getResponseBodyAsString());
+ } else {
+ logger.debug("HTTP error: {}", t.getMessage());
+ }
+ }
+
+ private Mono<String> toBody(ResponseEntity<String> entity) {
if (entity.getBody() == null) {
return Mono.just("");
} else {
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
+import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* Common json functionality used by the SDNC clients
*/
+@SuppressWarnings("java:S1192") // Same text in several traces
class SdncJsonHelper {
private static Gson gson = new GsonBuilder() //
.setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_DASHES) //
.create();
+ private static final String OUTPUT = "output";
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private SdncJsonHelper() {
}
}
return Flux.fromIterable(arrayList);
} catch (JSONException ex) { // invalid json
+ logger.debug("Invalid json {}", ex.getMessage());
return Flux.error(ex);
}
}
return gson.toJson(jsonObj);
}
- public static Mono<String> getValueFromResponse(String response, String key) {
+ public static <T> String createOutputJsonString(T params) {
+ JsonElement paramsJson = gson.toJsonTree(params);
+ JsonObject jsonObj = new JsonObject();
+ jsonObj.add(OUTPUT, paramsJson);
+ return gson.toJson(jsonObj);
+ }
+
+ public static Mono<JSONObject> getOutput(String response) {
try {
JSONObject outputJson = new JSONObject(response);
- JSONObject responseParams = outputJson.getJSONObject("output");
- if (!responseParams.has(key)) {
- return Mono.just("");
- }
- String value = responseParams.get(key).toString();
- return Mono.just(value);
+ JSONObject responseParams = outputJson.getJSONObject(OUTPUT);
+ return Mono.just(responseParams);
} catch (JSONException ex) { // invalid json
+ logger.debug("Invalid json {}", ex.getMessage());
return Mono.error(ex);
}
}
+ public static Mono<String> getValueFromResponse(String response, String key) {
+ return getOutput(response) //
+ .flatMap(responseParams -> {
+ if (!responseParams.has(key)) {
+ return Mono.just("");
+ }
+ String value = responseParams.get(key).toString();
+ return Mono.just(value);
+ });
+ }
+
public static Mono<String> extractPolicySchema(String inputString) {
try {
JSONObject jsonObject = new JSONObject(inputString);
String schemaString = schemaObject.toString();
return Mono.just(schemaString);
} catch (JSONException ex) { // invalid json
+ logger.debug("Invalid json {}", ex.getMessage());
return Mono.error(ex);
}
}
import java.util.Optional;
import org.immutables.value.Value;
+import org.json.JSONObject;
import org.oransc.policyagent.configuration.ControllerConfig;
import org.oransc.policyagent.configuration.RicConfig;
import org.oransc.policyagent.repository.Policy;
@Value.Immutable
@org.immutables.gson.Gson.TypeAdapters
- public interface AdapterResponse {
- public String body();
+ public interface AdapterOutput {
+ public Optional<String> body();
public int httpStatus();
}
.create(); //
private static final String GET_POLICY_RPC = "getA1Policy";
- private static final String UNHANDELED_PROTOCOL = "Bug, unhandeled protocoltype: ";
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final ControllerConfig controllerConfig;
private final AsyncRestClient restClient;
return post(GET_POLICY_RPC, ricUrl, Optional.empty()) //
.flatMapMany(SdncJsonHelper::parseJsonArrayOfString) //
.collectList();
+ } else {
+ return Mono.error(createIllegalProtocolException());
}
- throw new NullPointerException(UNHANDELED_PROTOCOL + this.protocolType);
+
+ }
+
+ private Exception createIllegalProtocolException() {
+ return new NullPointerException("Bug, unhandeled protocoltype: " + this.protocolType);
}
@Override
OscA1Client.UriBuilder uri = new OscA1Client.UriBuilder(ricConfig);
final String ricUrl = uri.createGetSchemaUri(policyTypeId);
return post(GET_POLICY_RPC, ricUrl, Optional.empty());
+ } else {
+ return Mono.error(createIllegalProtocolException());
}
- throw new NullPointerException(UNHANDELED_PROTOCOL + this.protocolType);
}
@Override
public Mono<String> putPolicy(Policy policy) {
- final String ricUrl = getUriBuilder().createPutPolicyUri(policy.type().name(), policy.id());
- return post("putA1Policy", ricUrl, Optional.of(policy.json()));
+ return getUriBuilder() //
+ .flatMap(builder -> {
+ String ricUrl = builder.createPutPolicyUri(policy.type().name(), policy.id());
+ return post("putA1Policy", ricUrl, Optional.of(policy.json()));
+ });
}
@Override
.flatMapMany(Flux::fromIterable)
.flatMap(type -> post(GET_POLICY_RPC, uriBuilder.createGetPolicyIdsUri(type), Optional.empty())) //
.flatMap(SdncJsonHelper::parseJsonArrayOfString);
+ } else {
+ return Flux.error(createIllegalProtocolException());
}
- throw new NullPointerException(UNHANDELED_PROTOCOL + this.protocolType);
}
@Override
@Override
public Mono<String> getPolicyStatus(Policy policy) {
- final String ricUrl = getUriBuilder().createGetPolicyStatusUri(policy.type().name(), policy.id());
- return post("getA1PolicyStatus", ricUrl, Optional.empty());
+ return getUriBuilder() //
+ .flatMap(builder -> {
+ String ricUrl = builder.createGetPolicyStatusUri(policy.type().name(), policy.id());
+ return post("getA1PolicyStatus", ricUrl, Optional.empty());
+ });
}
- private A1UriBuilder getUriBuilder() {
+ private Mono<A1UriBuilder> getUriBuilder() {
if (protocolType == A1ProtocolType.SDNC_OSC_STD_V1_1) {
- return new StdA1ClientVersion1.UriBuilder(ricConfig);
+ return Mono.just(new StdA1ClientVersion1.UriBuilder(ricConfig));
} else if (this.protocolType == A1ProtocolType.SDNC_OSC_OSC_V1) {
- return new OscA1Client.UriBuilder(ricConfig);
+ return Mono.just(new OscA1Client.UriBuilder(ricConfig));
+ } else {
+ return Mono.error(createIllegalProtocolException());
}
- throw new NullPointerException(UNHANDELED_PROTOCOL + this.protocolType);
}
private Mono<A1ProtocolType> tryOscProtocolVersion() {
.flatMapMany(Flux::fromIterable)
.flatMap(type -> post(GET_POLICY_RPC, uri.createGetPolicyIdsUri(type), Optional.empty())) //
.flatMap(SdncJsonHelper::parseJsonArrayOfString);
+ } else {
+ return Flux.error(createIllegalProtocolException());
}
- throw new NullPointerException(UNHANDELED_PROTOCOL + this.protocolType);
}
private Mono<String> deletePolicyById(String type, String policyId) {
- final String ricUrl = getUriBuilder().createDeleteUri(type, policyId);
- return post("deleteA1Policy", ricUrl, Optional.empty());
+ return getUriBuilder() //
+ .flatMap(builder -> {
+ String ricUrl = builder.createDeleteUri(type, policyId);
+ return post("deleteA1Policy", ricUrl, Optional.empty());
+ });
}
private Mono<String> post(String rpcName, String ricUrl, Optional<String> body) {
.body(body) //
.build();
final String inputJsonString = SdncJsonHelper.createInputJsonString(inputParams);
+ logger.debug("POST inputJsonString = {}", inputJsonString);
return restClient
.postWithAuthHeader(controllerUrl(rpcName), inputJsonString, this.controllerConfig.userName(),
.flatMap(this::extractResponseBody);
}
- private Mono<String> extractResponseBody(String response) {
- AdapterResponse output = gson.fromJson(response, ImmutableAdapterResponse.class);
- String body = output.body();
+ private Mono<String> extractResponse(JSONObject responseOutput) {
+ AdapterOutput output = gson.fromJson(responseOutput.toString(), ImmutableAdapterOutput.class);
+ Optional<String> optionalBody = output.body();
+ String body = optionalBody.isPresent() ? optionalBody.get() : "";
if (HttpStatus.valueOf(output.httpStatus()).is2xxSuccessful()) {
return Mono.just(body);
+ } else {
+ logger.debug("Error response: {} {}", output.httpStatus(), body);
+ byte[] responseBodyBytes = body.getBytes(StandardCharsets.UTF_8);
+ WebClientResponseException e = new WebClientResponseException(output.httpStatus(), "statusText", null,
+ responseBodyBytes, StandardCharsets.UTF_8, null);
+
+ return Mono.error(e);
}
- byte[] responseBodyBytes = body.getBytes(StandardCharsets.UTF_8);
- WebClientResponseException e = new WebClientResponseException(output.httpStatus(), "statusText", null,
- responseBodyBytes, StandardCharsets.UTF_8, null);
+ }
- return Mono.error(e);
+ private Mono<String> extractResponseBody(String responseStr) {
+ return SdncJsonHelper.getOutput(responseStr) //
+ .flatMap(this::extractResponse);
}
private String controllerUrl(String rpcName) {
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
+import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.oransc.policyagent.repository.Rics;
import org.oransc.policyagent.repository.Service;
import org.oransc.policyagent.repository.Services;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
@Autowired
private Services services;
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static Gson gson = new GsonBuilder() //
.serializeNulls() //
.create(); //
policyInfo.service = p.ownerServiceName();
policyInfo.lastModified = p.lastModified();
if (!policyInfo.validate()) {
- throw new NullPointerException("BUG, all fields must be set");
+ logger.error("BUG, all fields must be set");
}
v.add(policyInfo);
}
synchronized (this) {
if (lockCounter <= 0) {
lockCounter = -1; // Might as well stop, to make it easier to find the problem
- throw new NullPointerException("Number of unlocks must match the number of locks");
+ logger.error("Number of unlocks must match the number of locks");
}
this.lockCounter--;
if (lockCounter == 0) {
@Override
public String toString() {
- return "Lock cnt: " + this.lockCounter + " exclusive: " + this.isExclusive;
+ return "Lock cnt: " + this.lockCounter + " exclusive: " + this.isExclusive + " queued: "
+ + this.lockRequestQueue.size();
}
/** returns the current number of granted locks */
/**
* The agent is synchronizing the view of the Ric.
*/
- SYNCHRONIZING
+ SYNCHRONIZING,
+
+ /**
+ * A consistency check between the agent and the Ric is done
+ */
+ CONSISTENCY_CHECK
}
}
package org.oransc.policyagent.repository;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
registeredRics.put(ric.name(), ric);
}
- public synchronized Iterable<Ric> getRics() {
+ public synchronized Collection<Ric> getRics() {
return new Vector<>(registeredRics.values());
}
.filter(notUsed -> !this.isConsulUsed) //
.flatMap(notUsed -> loadConfigurationFromFile()) //
.onErrorResume(this::ignoreError) //
- .doOnNext(json -> logger.debug("loadFromFile")) //
+ .doOnNext(json -> logger.debug("loadFromFile succeeded")) //
.doOnTerminate(() -> logger.error("loadFromFile Terminate"));
Flux<JsonObject> loadFromConsul = getEnvironment(systemEnvironment) //
.flatMap(this::createCbsClient) //
.flatMapMany(this::periodicConfigurationUpdates) //
.onErrorResume(this::ignoreError) //
- .doOnNext(json -> logger.debug("loadFromConsul")) //
+ .doOnNext(json -> logger.debug("loadFromConsul succeeded")) //
.doOnNext(json -> this.isConsulUsed = true) //
- .doOnTerminate(() -> logger.error("loadFromConsul Terminate"));
+ .doOnTerminate(() -> logger.error("loadFromConsul Terminated"));
return Flux.merge(loadFromFile, loadFromConsul) //
.flatMap(this::parseConfiguration) //
import org.oransc.policyagent.clients.A1Client;
import org.oransc.policyagent.clients.A1ClientFactory;
+import org.oransc.policyagent.exceptions.ServiceException;
import org.oransc.policyagent.repository.Lock.LockType;
import org.oransc.policyagent.repository.Policies;
import org.oransc.policyagent.repository.PolicyTypes;
private final A1ClientFactory a1ClientFactory;
private final Services services;
+ private static class SynchStartedException extends ServiceException {
+ private static final long serialVersionUID = 1L;
+
+ public SynchStartedException(String message) {
+ super(message);
+ }
+ }
+
@Autowired
public RicSupervision(Rics rics, Policies policies, A1ClientFactory a1ClientFactory, PolicyTypes policyTypes,
Services services) {
@Scheduled(fixedRate = 1000 * 60)
public void checkAllRics() {
logger.debug("Checking Rics starting");
- createTask().subscribe( //
- ric -> logger.debug("Ric: {} checked", ric.ric.name()), //
- null, //
- () -> logger.debug("Checking Rics completed") //
- );
+ createTask().subscribe(null, null, () -> logger.debug("Checking all RICs completed"));
}
private Flux<RicData> createTask() {
return Flux.fromIterable(rics.getRics()) //
.flatMap(this::createRicData) //
- .flatMap(this::checkOneRic) //
- .onErrorResume(throwable -> Mono.empty());
+ .flatMap(this::checkOneRic);
}
private Mono<RicData> checkOneRic(RicData ricData) {
return checkRicState(ricData) //
.flatMap(x -> ricData.ric.getLock().lock(LockType.EXCLUSIVE)) //
+ .flatMap(notUsed -> setRicState(ricData)) //
.flatMap(x -> checkRicPolicies(ricData)) //
- .flatMap(x -> ricData.ric.getLock().unlock()) //
- .doOnError(throwable -> ricData.ric.getLock().unlockBlocking()) //
- .flatMap(x -> checkRicPolicyTypes(ricData)); //
+ .flatMap(x -> checkRicPolicyTypes(ricData)) //
+ .doOnNext(x -> onRicCheckedOk(ricData)) //
+ .doOnError(t -> onRicCheckedError(t, ricData)) //
+ .onErrorResume(throwable -> Mono.empty());
+ }
+
+ private void onRicCheckedError(Throwable t, RicData ricData) {
+ logger.debug("Ric: {} check stopped, exception: {}", ricData.ric.name(), t.getMessage());
+ if (t instanceof SynchStartedException) {
+ // this is just a temporary state,
+ ricData.ric.setState(RicState.AVAILABLE);
+ } else {
+ ricData.ric.setState(RicState.UNAVAILABLE);
+ }
+ ricData.ric.getLock().unlockBlocking();
+ }
+
+ private void onRicCheckedOk(RicData ricData) {
+ logger.debug("Ric: {} checked OK", ricData.ric.name());
+ ricData.ric.setState(RicState.AVAILABLE);
+ ricData.ric.getLock().unlockBlocking();
+ }
+
+ @SuppressWarnings("squid:S2445") // Blocks should be synchronized on "private final" fields
+ private Mono<RicData> setRicState(RicData ric) {
+ synchronized (ric) {
+ if (ric.ric.getState() == RicState.CONSISTENCY_CHECK) {
+ logger.debug("Ric: {} is already being checked", ric.ric.getConfig().name());
+ return Mono.empty();
+ }
+ ric.ric.setState(RicState.CONSISTENCY_CHECK);
+ return Mono.just(ric);
+ }
}
private static class RicData {
this.a1Client = a1Client;
}
+ A1Client getClient() {
+ return a1Client;
+ }
+
final Ric ric;
- final A1Client a1Client;
+ private final A1Client a1Client;
}
private Mono<RicData> createRicData(Ric ric) {
if (ric.ric.getState() == RicState.UNAVAILABLE) {
return startSynchronization(ric) //
.onErrorResume(t -> Mono.empty());
- } else if (ric.ric.getState() == RicState.SYNCHRONIZING) {
+ } else if (ric.ric.getState() == RicState.SYNCHRONIZING || ric.ric.getState() == RicState.CONSISTENCY_CHECK) {
return Mono.empty();
} else {
return Mono.just(ric);
}
private Mono<RicData> checkRicPolicies(RicData ric) {
- return ric.a1Client.getPolicyIdentities() //
+ return ric.getClient().getPolicyIdentities() //
.flatMap(ricP -> validateInstances(ricP, ric));
}
}
private Mono<RicData> checkRicPolicyTypes(RicData ric) {
- return ric.a1Client.getPolicyTypeIdentities() //
+
+ return ric.getClient().getPolicyTypeIdentities() //
.flatMap(ricTypes -> validateTypes(ricTypes, ric));
}
private Mono<RicData> startSynchronization(RicData ric) {
RicSynchronizationTask synchronizationTask = createSynchronizationTask();
synchronizationTask.run(ric.ric);
- return Mono.error(new Exception("Syncronization started"));
+ return Mono.error(new SynchStartedException("Syncronization started"));
}
RicSynchronizationTask createSynchronizationTask() {
rsp = restClient().get(url).block();
assertThat(rsp).contains("ric2");
assertThat(rsp).doesNotContain("ric1");
+ assertThat(rsp).contains("AVAILABLE");
+
+ // All RICs
+ rsp = restClient().get("/rics").block();
+ assertThat(rsp).contains("ric2");
+ assertThat(rsp).contains("ric1");
// Non existing policy type
url = "/rics?policyType=XXXX";
@Test
public void testSynchronization() throws Exception {
- addRic("ric").setState(Ric.RicState.UNAVAILABLE);
- String ricName = "ric";
- Policy policy2 = addPolicy("policyId2", "typeName", "service", ricName);
-
- getA1Client(ricName).putPolicy(policy2); // put it in the RIC
+ // Two polictypes will be put in the NearRT RICs
+ PolicyTypes nearRtRicPolicyTypes = new PolicyTypes();
+ nearRtRicPolicyTypes.put(createPolicyType("typeName"));
+ nearRtRicPolicyTypes.put(createPolicyType("typeName2"));
+ this.a1ClientFactory.setPolicyTypes(nearRtRicPolicyTypes);
+
+ // One type and one instance added to the agent storage
+ final String ric1Name = "ric1";
+ Ric ric1 = addRic(ric1Name);
+ Policy policy2 = addPolicy("policyId2", "typeName", "service", ric1Name);
+ Ric ric2 = addRic("ric2");
+
+ getA1Client(ric1Name).putPolicy(policy2); // put it in the RIC
policies.remove(policy2); // Remove it from the repo -> should be deleted in the RIC
String policyId = "policyId";
- Policy policy = addPolicy(policyId, "typeName", "service", ricName); // This should be created in the RIC
+ Policy policy = addPolicy(policyId, "typeName", "service", ric1Name); // This should be created in the RIC
supervision.checkAllRics(); // The created policy should be put in the RIC
- await().untilAsserted(() -> RicState.SYNCHRONIZING.equals(rics.getRic(ricName).getState()));
- await().untilAsserted(() -> RicState.AVAILABLE.equals(rics.getRic(ricName).getState()));
- Policies ricPolicies = getA1Client(ricName).getPolicies();
+ // Wait until synch is completed
+ await().untilAsserted(() -> RicState.SYNCHRONIZING.equals(rics.getRic(ric1Name).getState()));
+ await().untilAsserted(() -> RicState.AVAILABLE.equals(rics.getRic(ric1Name).getState()));
+ await().untilAsserted(() -> RicState.AVAILABLE.equals(rics.getRic("ric2").getState()));
+
+ Policies ricPolicies = getA1Client(ric1Name).getPolicies();
assertThat(ricPolicies.size()).isEqualTo(1);
Policy ricPolicy = ricPolicies.get(policyId);
assertThat(ricPolicy.json()).isEqualTo(policy.json());
+
+ // Both types should be in the agent storage after the synch
+ assertThat(ric1.getSupportedPolicyTypes().size()).isEqualTo(2);
+ assertThat(ric2.getSupportedPolicyTypes().size()).isEqualTo(2);
}
@Test
return a1ClientFactory.getOrCreateA1Client(ricName);
}
- private PolicyType addPolicyType(String policyTypeName, String ricName) {
- PolicyType type = ImmutablePolicyType.builder() //
+ private PolicyType createPolicyType(String policyTypeName) {
+ return ImmutablePolicyType.builder() //
.name(policyTypeName) //
.schema("{\"title\":\"" + policyTypeName + "\"}") //
.build();
+ }
+ private PolicyType addPolicyType(String policyTypeName, String ricName) {
+ PolicyType type = createPolicyType(policyTypeName);
policyTypes.put(type);
addRic(ricName).addSupportedPolicyType(type);
return type;
import java.util.Arrays;
import java.util.List;
+import java.util.Optional;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.stubbing.OngoingStubbing;
import org.oransc.policyagent.clients.A1Client.A1ProtocolType;
+import org.oransc.policyagent.clients.SdncOscA1Client.AdapterOutput;
import org.oransc.policyagent.clients.SdncOscA1Client.AdapterRequest;
-import org.oransc.policyagent.clients.SdncOscA1Client.AdapterResponse;
import org.oransc.policyagent.configuration.ControllerConfig;
import org.oransc.policyagent.configuration.ImmutableControllerConfig;
import org.oransc.policyagent.repository.Policy;
assertEquals(POLICY_TYPE_1_ID, policyTypeIds.get(0), "");
String expUrl = RIC_1_URL + "/a1-p/policytypes";
- AdapterRequest expectedParams = ImmutableAdapterRequest.builder() //
+ ImmutableAdapterRequest expectedParams = ImmutableAdapterRequest.builder() //
.nearRtRicUrl(expUrl) //
.build();
String expInput = SdncJsonHelper.createInputJsonString(expectedParams);
return SdncOscA1Client.gson;
}
- private String createResponse(Object obj) {
- AdapterResponse output = ImmutableAdapterResponse.builder() //
- .body(gson().toJson(obj)) //
+ private String createResponse(Object body) {
+ AdapterOutput output = ImmutableAdapterOutput.builder() //
+ .body(gson().toJson(body)) //
.httpStatus(200) //
.build();
-
- return gson().toJson(output);
+ return SdncJsonHelper.createOutputJsonString(output);
}
@Test
List<String> returned = clientUnderTest.getPolicyIdentities().block();
assertEquals(2, returned.size(), "");
- AdapterRequest expectedParams = ImmutableAdapterRequest.builder() //
+ ImmutableAdapterRequest expectedParams = ImmutableAdapterRequest.builder() //
.nearRtRicUrl(policiesUrl()) //
.build();
String expInput = SdncJsonHelper.createInputJsonString(expectedParams);
@Test
public void testPutPolicyRejected() {
final String policyJson = "{}";
- AdapterResponse adapterResponse = ImmutableAdapterResponse.builder() //
+ AdapterOutput adapterOutput = ImmutableAdapterOutput.builder() //
.body("NOK") //
.httpStatus(400) // ERROR
.build();
- String resp = gson().toJson(adapterResponse);
+ String resp = SdncJsonHelper.createOutputJsonString(adapterOutput);
whenAsyncPostThenReturn(Mono.just(resp));
Mono<String> returnedMono = clientUnderTest
.putPolicy(A1ClientHelper.createPolicy(RIC_1_URL, POLICY_1_ID, policyJson, POLICY_TYPE_1_ID));
+ StepVerifier.create(returnedMono) //
+ .expectSubscription() //
+ .expectErrorMatches(t -> t instanceof WebClientResponseException) //
+ .verify();
final String expUrl = policiesUrl() + "/" + POLICY_1_ID;
AdapterRequest expRequestParams = ImmutableAdapterRequest.builder() //
whenPostReturnOkResponse();
A1ProtocolType returnedVersion = clientUnderTest.getProtocolVersion().block();
assertEquals(A1ProtocolType.SDNC_OSC_STD_V1_1, returnedVersion, "");
+
+ whenPostReturnOkResponseNoBody();
+ returnedVersion = clientUnderTest.getProtocolVersion().block();
+ assertEquals(A1ProtocolType.SDNC_OSC_STD_V1_1, returnedVersion, "");
}
private void whenPostReturnOkResponse() {
- AdapterResponse adapterResponse = ImmutableAdapterResponse.builder() //
+ AdapterOutput adapterOutput = ImmutableAdapterOutput.builder() //
.body("OK") //
.httpStatus(200) //
.build();
- String resp = gson().toJson(adapterResponse);
+ String resp = SdncJsonHelper.createOutputJsonString(adapterOutput);
+ whenAsyncPostThenReturn(Mono.just(resp));
+ }
+
+ private void whenPostReturnOkResponseNoBody() {
+ AdapterOutput adapterOutput = ImmutableAdapterOutput.builder() //
+ .httpStatus(200) //
+ .body(Optional.empty()) //
+ .build();
+
+ String resp = SdncJsonHelper.createOutputJsonString(adapterOutput);
whenAsyncPostThenReturn(Mono.just(resp));
}
@BeforeEach
public void init() {
- doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class));
types.clear();
policies.clear();
rics.clear();
@Test
public void whenRicIdleAndNoChangedPoliciesOrPolicyTypes_thenNoSynchronization() {
+ doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class));
RIC_1.setState(RicState.AVAILABLE);
RIC_1.addSupportedPolicyType(POLICY_TYPE_1);
rics.put(RIC_1);
@Test
public void whenRicUndefined_thenSynchronization() {
+ doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class));
RIC_1.setState(RicState.UNAVAILABLE);
rics.put(RIC_1);
@Test
public void whenRicSynchronizing_thenNoSynchronization() {
+ doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class));
RIC_1.setState(RicState.SYNCHRONIZING);
rics.put(RIC_1);
@Test
public void whenRicIdleAndErrorGettingPolicyIdentities_thenNoSynchronization() {
+ doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class));
RIC_1.setState(RicState.AVAILABLE);
RIC_1.addSupportedPolicyType(POLICY_TYPE_1);
rics.put(RIC_1);
verify(supervisorUnderTest).checkAllRics();
verifyNoMoreInteractions(supervisorUnderTest);
+ assertThat(RIC_1.getState()).isEqualTo(RicState.UNAVAILABLE);
}
@Test
public void whenRicIdleAndNotSameAmountOfPolicies_thenSynchronization() {
+ doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class));
RIC_1.setState(RicState.AVAILABLE);
rics.put(RIC_1);
@Test
public void whenRicIdleAndSameAmountOfPoliciesButNotSamePolicies_thenSynchronization() {
+ doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class));
RIC_1.setState(RicState.AVAILABLE);
rics.put(RIC_1);
@Test
public void whenRicIdleAndErrorGettingPolicyTypes_thenNoSynchronization() {
+ doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class));
RIC_1.setState(RicState.AVAILABLE);
RIC_1.addSupportedPolicyType(POLICY_TYPE_1);
rics.put(RIC_1);
@Test
public void whenRicIdleAndNotSameAmountOfPolicyTypes_thenSynchronization() {
+ doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class));
RIC_1.setState(RicState.AVAILABLE);
RIC_1.addSupportedPolicyType(POLICY_TYPE_1);
rics.put(RIC_1);
@Test
public void whenRicIdleAndSameAmountOfPolicyTypesButNotSameTypes_thenSynchronization() {
+ doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class));
PolicyType policyType2 = ImmutablePolicyType.builder() //
.name("policyType2") //
.schema("") //
public class MockA1ClientFactory extends A1ClientFactory {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final Map<String, MockA1Client> clients = new HashMap<>();
- private final PolicyTypes policyTypes;
+ private PolicyTypes policyTypes;
private Duration asynchDelay = Duration.ofSeconds(0);
public MockA1ClientFactory(PolicyTypes policyTypes) {
return clients.get(ricName);
}
+ public void setPolicyTypes(PolicyTypes policyTypes) {
+ this.policyTypes = policyTypes;
+ }
+
/**
* Simulate network latency. The REST responses will be generated by separate
* threads
clients.clear();
}
+ public PolicyTypes getPolicyTypes() {
+ return this.policyTypes;
+ }
+
}
+# ==================================================================================
+# Copyright (c) 2020 Nordix
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==================================================================================
+
# documentation only
[tox]
minversion = 2.0