From: Henrik Andersson Date: Mon, 4 May 2020 08:52:08 +0000 (+0000) Subject: Merge "Minor changes" X-Git-Tag: 2.0.0~60 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=5f38e95cbd87e9c67cd8a8ddb810b629a81cd306;hp=ef3dfbede0a7430e8f7272a599e64d8cea71d017;p=nonrtric.git Merge "Minor changes" --- diff --git a/policy-agent/src/main/java/org/oransc/policyagent/clients/AsyncRestClient.java b/policy-agent/src/main/java/org/oransc/policyagent/clients/AsyncRestClient.java index ef1acfc1..26d51528 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/clients/AsyncRestClient.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/clients/AsyncRestClient.java @@ -28,6 +28,7 @@ import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.handler.timeout.WriteTimeoutHandler; import java.lang.invoke.MethodHandles; +import java.util.concurrent.atomic.AtomicInteger; import javax.net.ssl.SSLException; @@ -52,13 +53,16 @@ public class AsyncRestClient { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private WebClient webClient = null; private final String baseUrl; + private static final AtomicInteger sequenceNumber = new AtomicInteger(); public AsyncRestClient(String baseUrl) { this.baseUrl = baseUrl; } public Mono> postForEntity(String uri, @Nullable String body) { - logger.debug("POST uri = '{}{}''", baseUrl, uri); + Object traceTag = createTraceTag(); + logger.debug("{} POST uri = '{}{}''", traceTag, baseUrl, uri); + logger.trace("{} POST body: {}", traceTag, body); Mono bodyProducer = body != null ? Mono.just(body) : Mono.empty(); return getWebClient() // .flatMap(client -> { @@ -66,7 +70,7 @@ public class AsyncRestClient { .uri(uri) // .contentType(MediaType.APPLICATION_JSON) // .body(bodyProducer, String.class); - return retrieve(request); + return retrieve(traceTag, request); }); } @@ -76,7 +80,9 @@ public class AsyncRestClient { } public Mono postWithAuthHeader(String uri, String body, String username, String password) { - logger.debug("POST (auth) uri = '{}{}''", baseUrl, uri); + Object traceTag = createTraceTag(); + logger.debug("{} POST (auth) uri = '{}{}''", traceTag, baseUrl, uri); + logger.trace("{} POST body: {}", traceTag, body); return getWebClient() // .flatMap(client -> { RequestHeadersSpec request = client.post() // @@ -84,30 +90,34 @@ public class AsyncRestClient { .headers(headers -> headers.setBasicAuth(username, password)) // .contentType(MediaType.APPLICATION_JSON) // .bodyValue(body); - return retrieve(request) // + return retrieve(traceTag, request) // .flatMap(this::toBody); }); } public Mono> putForEntity(String uri, String body) { - logger.debug("PUT uri = '{}{}''", baseUrl, uri); + Object traceTag = createTraceTag(); + logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri); + logger.trace("{} PUT body: {}", traceTag, body); return getWebClient() // .flatMap(client -> { RequestHeadersSpec request = client.put() // .uri(uri) // .contentType(MediaType.APPLICATION_JSON) // .bodyValue(body); - return retrieve(request); + return retrieve(traceTag, request); }); } public Mono> putForEntity(String uri) { - logger.debug("PUT uri = '{}{}''", baseUrl, uri); + Object traceTag = createTraceTag(); + logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri); + logger.trace("{} PUT body: ", traceTag); return getWebClient() // .flatMap(client -> { RequestHeadersSpec request = client.put() // .uri(uri); - return retrieve(request); + return retrieve(traceTag, request); }); } @@ -117,11 +127,12 @@ public class AsyncRestClient { } public Mono> getForEntity(String uri) { - logger.debug("GET uri = '{}{}''", baseUrl, uri); + Object traceTag = createTraceTag(); + logger.debug("{} GET uri = '{}{}''", traceTag, baseUrl, uri); return getWebClient() // .flatMap(client -> { RequestHeadersSpec request = client.get().uri(uri); - return retrieve(request); + return retrieve(traceTag, request); }); } @@ -131,11 +142,12 @@ public class AsyncRestClient { } public Mono> deleteForEntity(String uri) { - logger.debug("DELETE uri = '{}{}''", baseUrl, uri); + Object traceTag = createTraceTag(); + logger.debug("{} DELETE uri = '{}{}''", traceTag, baseUrl, uri); return getWebClient() // .flatMap(client -> { RequestHeadersSpec request = client.delete().uri(uri); - return retrieve(request); + return retrieve(traceTag, request); }); } @@ -144,19 +156,24 @@ public class AsyncRestClient { .flatMap(this::toBody); } - private Mono> retrieve(RequestHeadersSpec request) { + private Mono> retrieve(Object traceTag, RequestHeadersSpec request) { return request.retrieve() // .toEntity(String.class) // - .doOnError(this::onHttpError); + .doOnNext(entity -> logger.trace("{} Received: {}", traceTag, entity.getBody())) + .doOnError(throwable -> onHttpError(traceTag, throwable)); } - private void onHttpError(Throwable t) { + private static Object createTraceTag() { + return sequenceNumber.incrementAndGet(); + } + + private void onHttpError(Object traceTag, Throwable t) { if (t instanceof WebClientResponseException) { WebClientResponseException exception = (WebClientResponseException) t; - logger.debug("HTTP error status = '{}', body '{}'", exception.getStatusCode(), + logger.debug("{} HTTP error status = '{}', body '{}'", traceTag, exception.getStatusCode(), exception.getResponseBodyAsString()); } else { - logger.debug("HTTP error: {}", t.getMessage()); + logger.debug("{} HTTP error: {}", traceTag, t.getMessage()); } } @@ -179,7 +196,7 @@ public class AsyncRestClient { .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10_000) // .secure(c -> c.sslContext(sslContext)) // .doOnConnected(connection -> { - connection.addHandler(new ReadTimeoutHandler(10)); + connection.addHandler(new ReadTimeoutHandler(30)); connection.addHandler(new WriteTimeoutHandler(30)); }); HttpClient httpClient = HttpClient.from(tcpClient); diff --git a/policy-agent/src/main/java/org/oransc/policyagent/clients/OscA1Client.java b/policy-agent/src/main/java/org/oransc/policyagent/clients/OscA1Client.java index 5a0bdc96..90fbd101 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/clients/OscA1Client.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/clients/OscA1Client.java @@ -23,6 +23,7 @@ package org.oransc.policyagent.clients; import java.lang.invoke.MethodHandles; import java.util.List; +import org.json.JSONObject; import org.oransc.policyagent.configuration.RicConfig; import org.oransc.policyagent.repository.Policy; import org.slf4j.Logger; @@ -125,6 +126,19 @@ public class OscA1Client implements A1Client { uri = new UriBuilder(ricConfig); } + public static Mono extractCreateSchema(String policyTypeResponse, String policyTypeId) { + try { + JSONObject obj = new JSONObject(policyTypeResponse); + JSONObject schemaObj = obj.getJSONObject("create_schema"); + schemaObj.put(TITLE, policyTypeId); + return Mono.just(schemaObj.toString()); + } catch (Exception e) { + String exceptionString = e.toString(); + logger.error("Unexpected response for policy type: {}, exception: {}", policyTypeResponse, exceptionString); + return Mono.error(e); + } + } + @Override public Mono> getPolicyTypeIdentities() { return getPolicyTypeIds() // @@ -142,7 +156,7 @@ public class OscA1Client implements A1Client { public Mono getPolicyTypeSchema(String policyTypeId) { String schemaUri = uri.createGetSchemaUri(policyTypeId); return restClient.get(schemaUri) // - .flatMap(response -> SdncJsonHelper.getCreateSchema(response, policyTypeId)); + .flatMap(response -> extractCreateSchema(response, policyTypeId)); } @Override diff --git a/policy-agent/src/main/java/org/oransc/policyagent/clients/SdncJsonHelper.java b/policy-agent/src/main/java/org/oransc/policyagent/clients/SdncJsonHelper.java index d65caf13..4d3cbf88 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/clients/SdncJsonHelper.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/clients/SdncJsonHelper.java @@ -69,19 +69,6 @@ class SdncJsonHelper { } } - public static Mono getCreateSchema(String policyTypeResponse, String policyTypeId) { - try { - JSONObject obj = new JSONObject(policyTypeResponse); - JSONObject schemaObj = obj.getJSONObject("create_schema"); - schemaObj.put("title", policyTypeId); - return Mono.just(schemaObj.toString()); - } catch (Exception e) { - String exceptionString = e.toString(); - logger.error("Unexpected response for policy type: {}, exception: {}", policyTypeResponse, exceptionString); - return Mono.error(e); - } - } - public static String createInputJsonString(T params) { JsonElement paramsJson = gson.toJsonTree(params); JsonObject jsonObj = new JsonObject(); diff --git a/policy-agent/src/main/java/org/oransc/policyagent/clients/SdncOscA1Client.java b/policy-agent/src/main/java/org/oransc/policyagent/clients/SdncOscA1Client.java index d9536a55..fcb3236a 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/clients/SdncOscA1Client.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/clients/SdncOscA1Client.java @@ -133,7 +133,7 @@ public class SdncOscA1Client implements A1Client { OscA1Client.UriBuilder uri = new OscA1Client.UriBuilder(ricConfig); final String ricUrl = uri.createGetSchemaUri(policyTypeId); return post(GET_POLICY_RPC, ricUrl, Optional.empty()) // - .flatMap(response -> SdncJsonHelper.getCreateSchema(response, policyTypeId)); + .flatMap(response -> OscA1Client.extractCreateSchema(response, policyTypeId)); } else { return Mono.error(createIllegalProtocolException()); } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java index dd235db5..05bcb0f2 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java @@ -121,7 +121,6 @@ public class RefreshConfigTask { Flux createRefreshTask() { Flux loadFromFile = Flux.interval(Duration.ZERO, FILE_CONFIG_REFRESH_INTERVAL) // - .filter(notUsed -> configFileExists()) // .filter(notUsed -> !this.isConsulUsed) // .flatMap(notUsed -> loadConfigurationFromFile()) // .onErrorResume(this::ignoreErrorFlux) // @@ -132,6 +131,7 @@ public class RefreshConfigTask { .flatMap(i -> getEnvironment(systemEnvironment)) // .flatMap(this::createCbsClient) // .flatMap(this::getFromCbs) // + .onErrorResume(this::ignoreErrorMono) // .doOnNext(json -> logger.debug("loadFromConsul succeeded")) // .doOnNext(json -> this.isConsulUsed = true) // .doOnTerminate(() -> logger.error("loadFromConsul Terminated")); @@ -156,8 +156,12 @@ public class RefreshConfigTask { private Mono getFromCbs(CbsClient cbsClient) { final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create()); - return cbsClient.get(getConfigRequest) // - .onErrorResume(this::ignoreErrorMono); + try { + return cbsClient.get(getConfigRequest) // + .onErrorResume(this::ignoreErrorMono); + } catch (Exception e) { + return ignoreErrorMono(e); + } } private Flux ignoreErrorFlux(Throwable throwable) { @@ -176,7 +180,7 @@ public class RefreshConfigTask { try { ApplicationConfigParser parser = new ApplicationConfigParser(); return Mono.just(parser.parse(jsonObject)); - } catch (ServiceException e) { + } catch (Exception e) { String str = e.toString(); logger.error("Could not parse configuration {}", str); return Mono.empty(); @@ -187,8 +191,7 @@ public class RefreshConfigTask { return this.appConfig.setConfiguration(config); } - boolean configFileExists() { - String filepath = appConfig.getLocalConfigurationFilePath(); + boolean fileExists(String filepath) { return (filepath != null && (new File(filepath).exists())); } @@ -230,6 +233,10 @@ public class RefreshConfigTask { */ Flux loadConfigurationFromFile() { String filepath = appConfig.getLocalConfigurationFilePath(); + if (!fileExists(filepath)) { + return Flux.empty(); + } + GsonBuilder gsonBuilder = new GsonBuilder(); ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); @@ -239,7 +246,7 @@ public class RefreshConfigTask { appParser.parse(rootObject); logger.debug("Local configuration file loaded: {}", filepath); return Flux.just(rootObject); - } catch (Exception e) { + } catch (IOException | ServiceException e) { logger.error("Local configuration file not loaded: {}, {}", filepath, e.getMessage()); return Flux.empty(); } diff --git a/policy-agent/src/test/java/org/oransc/policyagent/clients/SdncOscA1ClientTest.java b/policy-agent/src/test/java/org/oransc/policyagent/clients/SdncOscA1ClientTest.java index fd504250..2cdfe352 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/clients/SdncOscA1ClientTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/clients/SdncOscA1ClientTest.java @@ -27,7 +27,12 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.google.gson.Gson; +import com.google.gson.JsonElement; +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.nio.file.Files; import java.util.Arrays; import java.util.List; import java.util.Optional; @@ -112,7 +117,30 @@ public class SdncOscA1ClientTest { String expInput = SdncJsonHelper.createInputJsonString(expectedParams); verify(asyncRestClientMock).postWithAuthHeader(GET_A1_POLICY_URL, expInput, CONTROLLER_USERNAME, CONTROLLER_PASSWORD); + } + + private String loadFile(String fileName) throws IOException { + ClassLoader loader = Thread.currentThread().getContextClassLoader(); + URL url = loader.getResource(fileName); + File file = new File(url.getFile()); + return new String(Files.readAllBytes(file.toPath())); + } + + @Test + public void testGetTypeSchema_OSC() throws IOException { + clientUnderTest = new SdncOscA1Client(A1ProtocolType.SDNC_OSC_OSC_V1, // + A1ClientHelper.createRic(RIC_1_URL).getConfig(), // + controllerConfig(), asyncRestClientMock); + + String ricResponse = loadFile("test_osc_get_schema_response.json"); + JsonElement elem = gson().fromJson(ricResponse, JsonElement.class); + String responseFromController = createResponse(elem); + whenAsyncPostThenReturn(Mono.just(responseFromController)); + String response = clientUnderTest.getPolicyTypeSchema("policyTypeId").block(); + JsonElement respJson = gson().fromJson(response, JsonElement.class); + assertEquals("policyTypeId", respJson.getAsJsonObject().get("title").getAsString(), + "title should be updated to contain policyType ID"); } private String policiesUrl() { diff --git a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RefreshConfigTaskTest.java b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RefreshConfigTaskTest.java index 00d2c993..cb911330 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RefreshConfigTaskTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RefreshConfigTaskTest.java @@ -124,7 +124,7 @@ public class RefreshConfigTaskTest { RefreshConfigTask obj = spy(new RefreshConfigTask(appConfig, rics, policies, new Services(), new PolicyTypes(), new A1ClientFactory(appConfig))); if (stubConfigFileExists) { - doReturn(configFileExists).when(obj).configFileExists(); + doReturn(configFileExists).when(obj).fileExists(any()); } return obj; } diff --git a/policy-agent/src/test/resources/test_osc_get_schema_response.json b/policy-agent/src/test/resources/test_osc_get_schema_response.json new file mode 100644 index 00000000..537d86f7 --- /dev/null +++ b/policy-agent/src/test/resources/test_osc_get_schema_response.json @@ -0,0 +1,45 @@ +{ + "name": "pt1", + "description": "pt1 policy type", + "policy_type_id": 1, + "create_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "OSC_Type1_1.0.0", + "description": "Type 1 policy type", + "type": "object", + "properties": { + "scope": { + "type": "object", + "properties": { + "ueId": { + "type": "string" + }, + "qosId": { + "type": "string" + } + }, + "additionalProperties": false, + "required": [ + "ueId", + "qosId" + ] + }, + "qosObjective": { + "type": "object", + "properties": { + "priorityLevel": { + "type": "number" + } + }, + "additionalProperties": false, + "required": [ + "priorityLevel" + ] + } + }, + "additionalProperties": false, + "required": [ + "scope", "qosObjective" + ] + } +}