Minor changes 29/3529/4
authorPatrikBuhr <patrik.buhr@est.tech>
Wed, 29 Apr 2020 12:31:02 +0000 (14:31 +0200)
committerPatrikBuhr <patrik.buhr@est.tech>
Thu, 30 Apr 2020 14:41:33 +0000 (16:41 +0200)
Added a unittest for getting schema through the SDNC
controller.

Increased http READ time out, from 10 to 30 seconds
Added TRACE of all REST bodies and responses

Attempt to fix that loading config from consul stops
when the json is syntaxtically wrong.

Change-Id: I4fc5d43971fd6853737f57aa0900f2e5068cd43c
Issue-ID: NONRTRIC-195
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
policy-agent/src/main/java/org/oransc/policyagent/clients/AsyncRestClient.java
policy-agent/src/main/java/org/oransc/policyagent/clients/OscA1Client.java
policy-agent/src/main/java/org/oransc/policyagent/clients/SdncJsonHelper.java
policy-agent/src/main/java/org/oransc/policyagent/clients/SdncOscA1Client.java
policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java
policy-agent/src/test/java/org/oransc/policyagent/clients/SdncOscA1ClientTest.java
policy-agent/src/test/java/org/oransc/policyagent/tasks/RefreshConfigTaskTest.java
policy-agent/src/test/resources/test_osc_get_schema_response.json [new file with mode: 0644]

index ef1acfc..26d5152 100644 (file)
@@ -28,6 +28,7 @@ import io.netty.handler.timeout.ReadTimeoutHandler;
 import io.netty.handler.timeout.WriteTimeoutHandler;
 
 import java.lang.invoke.MethodHandles;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.net.ssl.SSLException;
 
@@ -52,13 +53,16 @@ public class AsyncRestClient {
     private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
     private WebClient webClient = null;
     private final String baseUrl;
+    private static final AtomicInteger sequenceNumber = new AtomicInteger();
 
     public AsyncRestClient(String baseUrl) {
         this.baseUrl = baseUrl;
     }
 
     public Mono<ResponseEntity<String>> postForEntity(String uri, @Nullable String body) {
-        logger.debug("POST uri = '{}{}''", baseUrl, uri);
+        Object traceTag = createTraceTag();
+        logger.debug("{} POST uri = '{}{}''", traceTag, baseUrl, uri);
+        logger.trace("{} POST body: {}", traceTag, body);
         Mono<String> bodyProducer = body != null ? Mono.just(body) : Mono.empty();
         return getWebClient() //
             .flatMap(client -> {
@@ -66,7 +70,7 @@ public class AsyncRestClient {
                     .uri(uri) //
                     .contentType(MediaType.APPLICATION_JSON) //
                     .body(bodyProducer, String.class);
-                return retrieve(request);
+                return retrieve(traceTag, request);
             });
     }
 
@@ -76,7 +80,9 @@ public class AsyncRestClient {
     }
 
     public Mono<String> postWithAuthHeader(String uri, String body, String username, String password) {
-        logger.debug("POST (auth) uri = '{}{}''", baseUrl, uri);
+        Object traceTag = createTraceTag();
+        logger.debug("{} POST (auth) uri = '{}{}''", traceTag, baseUrl, uri);
+        logger.trace("{} POST body: {}", traceTag, body);
         return getWebClient() //
             .flatMap(client -> {
                 RequestHeadersSpec<?> request = client.post() //
@@ -84,30 +90,34 @@ public class AsyncRestClient {
                     .headers(headers -> headers.setBasicAuth(username, password)) //
                     .contentType(MediaType.APPLICATION_JSON) //
                     .bodyValue(body);
-                return retrieve(request) //
+                return retrieve(traceTag, request) //
                     .flatMap(this::toBody);
             });
     }
 
     public Mono<ResponseEntity<String>> putForEntity(String uri, String body) {
-        logger.debug("PUT uri = '{}{}''", baseUrl, uri);
+        Object traceTag = createTraceTag();
+        logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri);
+        logger.trace("{} PUT body: {}", traceTag, body);
         return getWebClient() //
             .flatMap(client -> {
                 RequestHeadersSpec<?> request = client.put() //
                     .uri(uri) //
                     .contentType(MediaType.APPLICATION_JSON) //
                     .bodyValue(body);
-                return retrieve(request);
+                return retrieve(traceTag, request);
             });
     }
 
     public Mono<ResponseEntity<String>> putForEntity(String uri) {
-        logger.debug("PUT uri = '{}{}''", baseUrl, uri);
+        Object traceTag = createTraceTag();
+        logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri);
+        logger.trace("{} PUT body: <empty>", traceTag);
         return getWebClient() //
             .flatMap(client -> {
                 RequestHeadersSpec<?> request = client.put() //
                     .uri(uri);
-                return retrieve(request);
+                return retrieve(traceTag, request);
             });
     }
 
@@ -117,11 +127,12 @@ public class AsyncRestClient {
     }
 
     public Mono<ResponseEntity<String>> getForEntity(String uri) {
-        logger.debug("GET uri = '{}{}''", baseUrl, uri);
+        Object traceTag = createTraceTag();
+        logger.debug("{} GET uri = '{}{}''", traceTag, baseUrl, uri);
         return getWebClient() //
             .flatMap(client -> {
                 RequestHeadersSpec<?> request = client.get().uri(uri);
-                return retrieve(request);
+                return retrieve(traceTag, request);
             });
     }
 
@@ -131,11 +142,12 @@ public class AsyncRestClient {
     }
 
     public Mono<ResponseEntity<String>> deleteForEntity(String uri) {
-        logger.debug("DELETE uri = '{}{}''", baseUrl, uri);
+        Object traceTag = createTraceTag();
+        logger.debug("{} DELETE uri = '{}{}''", traceTag, baseUrl, uri);
         return getWebClient() //
             .flatMap(client -> {
                 RequestHeadersSpec<?> request = client.delete().uri(uri);
-                return retrieve(request);
+                return retrieve(traceTag, request);
             });
     }
 
@@ -144,19 +156,24 @@ public class AsyncRestClient {
             .flatMap(this::toBody);
     }
 
-    private Mono<ResponseEntity<String>> retrieve(RequestHeadersSpec<?> request) {
+    private Mono<ResponseEntity<String>> retrieve(Object traceTag, RequestHeadersSpec<?> request) {
         return request.retrieve() //
             .toEntity(String.class) //
-            .doOnError(this::onHttpError);
+            .doOnNext(entity -> logger.trace("{} Received: {}", traceTag, entity.getBody()))
+            .doOnError(throwable -> onHttpError(traceTag, throwable));
     }
 
-    private void onHttpError(Throwable t) {
+    private static Object createTraceTag() {
+        return sequenceNumber.incrementAndGet();
+    }
+
+    private void onHttpError(Object traceTag, Throwable t) {
         if (t instanceof WebClientResponseException) {
             WebClientResponseException exception = (WebClientResponseException) t;
-            logger.debug("HTTP error status = '{}', body '{}'", exception.getStatusCode(),
+            logger.debug("{} HTTP error status = '{}', body '{}'", traceTag, exception.getStatusCode(),
                 exception.getResponseBodyAsString());
         } else {
-            logger.debug("HTTP error: {}", t.getMessage());
+            logger.debug("{} HTTP error: {}", traceTag, t.getMessage());
         }
     }
 
@@ -179,7 +196,7 @@ public class AsyncRestClient {
             .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10_000) //
             .secure(c -> c.sslContext(sslContext)) //
             .doOnConnected(connection -> {
-                connection.addHandler(new ReadTimeoutHandler(10));
+                connection.addHandler(new ReadTimeoutHandler(30));
                 connection.addHandler(new WriteTimeoutHandler(30));
             });
         HttpClient httpClient = HttpClient.from(tcpClient);
index 5a0bdc9..90fbd10 100644 (file)
@@ -23,6 +23,7 @@ package org.oransc.policyagent.clients;
 import java.lang.invoke.MethodHandles;
 import java.util.List;
 
+import org.json.JSONObject;
 import org.oransc.policyagent.configuration.RicConfig;
 import org.oransc.policyagent.repository.Policy;
 import org.slf4j.Logger;
@@ -125,6 +126,19 @@ public class OscA1Client implements A1Client {
         uri = new UriBuilder(ricConfig);
     }
 
+    public static Mono<String> extractCreateSchema(String policyTypeResponse, String policyTypeId) {
+        try {
+            JSONObject obj = new JSONObject(policyTypeResponse);
+            JSONObject schemaObj = obj.getJSONObject("create_schema");
+            schemaObj.put(TITLE, policyTypeId);
+            return Mono.just(schemaObj.toString());
+        } catch (Exception e) {
+            String exceptionString = e.toString();
+            logger.error("Unexpected response for policy type: {}, exception: {}", policyTypeResponse, exceptionString);
+            return Mono.error(e);
+        }
+    }
+
     @Override
     public Mono<List<String>> getPolicyTypeIdentities() {
         return getPolicyTypeIds() //
@@ -142,7 +156,7 @@ public class OscA1Client implements A1Client {
     public Mono<String> getPolicyTypeSchema(String policyTypeId) {
         String schemaUri = uri.createGetSchemaUri(policyTypeId);
         return restClient.get(schemaUri) //
-            .flatMap(response -> SdncJsonHelper.getCreateSchema(response, policyTypeId));
+            .flatMap(response -> extractCreateSchema(response, policyTypeId));
     }
 
     @Override
index d65caf1..4d3cbf8 100644 (file)
@@ -69,19 +69,6 @@ class SdncJsonHelper {
         }
     }
 
-    public static Mono<String> getCreateSchema(String policyTypeResponse, String policyTypeId) {
-        try {
-            JSONObject obj = new JSONObject(policyTypeResponse);
-            JSONObject schemaObj = obj.getJSONObject("create_schema");
-            schemaObj.put("title", policyTypeId);
-            return Mono.just(schemaObj.toString());
-        } catch (Exception e) {
-            String exceptionString = e.toString();
-            logger.error("Unexpected response for policy type: {}, exception: {}", policyTypeResponse, exceptionString);
-            return Mono.error(e);
-        }
-    }
-
     public static <T> String createInputJsonString(T params) {
         JsonElement paramsJson = gson.toJsonTree(params);
         JsonObject jsonObj = new JsonObject();
index d9536a5..fcb3236 100644 (file)
@@ -133,7 +133,7 @@ public class SdncOscA1Client implements A1Client {
             OscA1Client.UriBuilder uri = new OscA1Client.UriBuilder(ricConfig);
             final String ricUrl = uri.createGetSchemaUri(policyTypeId);
             return post(GET_POLICY_RPC, ricUrl, Optional.empty()) //
-                .flatMap(response -> SdncJsonHelper.getCreateSchema(response, policyTypeId));
+                .flatMap(response -> OscA1Client.extractCreateSchema(response, policyTypeId));
         } else {
             return Mono.error(createIllegalProtocolException());
         }
index dd235db..05bcb0f 100644 (file)
@@ -121,7 +121,6 @@ public class RefreshConfigTask {
 
     Flux<RicConfigUpdate.Type> createRefreshTask() {
         Flux<JsonObject> loadFromFile = Flux.interval(Duration.ZERO, FILE_CONFIG_REFRESH_INTERVAL) //
-            .filter(notUsed -> configFileExists()) //
             .filter(notUsed -> !this.isConsulUsed) //
             .flatMap(notUsed -> loadConfigurationFromFile()) //
             .onErrorResume(this::ignoreErrorFlux) //
@@ -132,6 +131,7 @@ public class RefreshConfigTask {
             .flatMap(i -> getEnvironment(systemEnvironment)) //
             .flatMap(this::createCbsClient) //
             .flatMap(this::getFromCbs) //
+            .onErrorResume(this::ignoreErrorMono) //
             .doOnNext(json -> logger.debug("loadFromConsul succeeded")) //
             .doOnNext(json -> this.isConsulUsed = true) //
             .doOnTerminate(() -> logger.error("loadFromConsul Terminated"));
@@ -156,8 +156,12 @@ public class RefreshConfigTask {
 
     private Mono<JsonObject> getFromCbs(CbsClient cbsClient) {
         final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create());
-        return cbsClient.get(getConfigRequest) //
-            .onErrorResume(this::ignoreErrorMono);
+        try {
+            return cbsClient.get(getConfigRequest) //
+                .onErrorResume(this::ignoreErrorMono);
+        } catch (Exception e) {
+            return ignoreErrorMono(e);
+        }
     }
 
     private <R> Flux<R> ignoreErrorFlux(Throwable throwable) {
@@ -176,7 +180,7 @@ public class RefreshConfigTask {
         try {
             ApplicationConfigParser parser = new ApplicationConfigParser();
             return Mono.just(parser.parse(jsonObject));
-        } catch (ServiceException e) {
+        } catch (Exception e) {
             String str = e.toString();
             logger.error("Could not parse configuration {}", str);
             return Mono.empty();
@@ -187,8 +191,7 @@ public class RefreshConfigTask {
         return this.appConfig.setConfiguration(config);
     }
 
-    boolean configFileExists() {
-        String filepath = appConfig.getLocalConfigurationFilePath();
+    boolean fileExists(String filepath) {
         return (filepath != null && (new File(filepath).exists()));
     }
 
@@ -230,6 +233,10 @@ public class RefreshConfigTask {
      */
     Flux<JsonObject> loadConfigurationFromFile() {
         String filepath = appConfig.getLocalConfigurationFilePath();
+        if (!fileExists(filepath)) {
+            return Flux.empty();
+        }
+
         GsonBuilder gsonBuilder = new GsonBuilder();
         ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
 
@@ -239,7 +246,7 @@ public class RefreshConfigTask {
             appParser.parse(rootObject);
             logger.debug("Local configuration file loaded: {}", filepath);
             return Flux.just(rootObject);
-        } catch (Exception e) {
+        } catch (IOException | ServiceException e) {
             logger.error("Local configuration file not loaded: {}, {}", filepath, e.getMessage());
             return Flux.empty();
         }
index fd50425..2cdfe35 100644 (file)
@@ -27,7 +27,12 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import com.google.gson.Gson;
+import com.google.gson.JsonElement;
 
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
@@ -112,7 +117,30 @@ public class SdncOscA1ClientTest {
         String expInput = SdncJsonHelper.createInputJsonString(expectedParams);
         verify(asyncRestClientMock).postWithAuthHeader(GET_A1_POLICY_URL, expInput, CONTROLLER_USERNAME,
             CONTROLLER_PASSWORD);
+    }
+
+    private String loadFile(String fileName) throws IOException {
+        ClassLoader loader = Thread.currentThread().getContextClassLoader();
+        URL url = loader.getResource(fileName);
+        File file = new File(url.getFile());
+        return new String(Files.readAllBytes(file.toPath()));
+    }
+
+    @Test
+    public void testGetTypeSchema_OSC() throws IOException {
+        clientUnderTest = new SdncOscA1Client(A1ProtocolType.SDNC_OSC_OSC_V1, //
+            A1ClientHelper.createRic(RIC_1_URL).getConfig(), //
+            controllerConfig(), asyncRestClientMock);
+
+        String ricResponse = loadFile("test_osc_get_schema_response.json");
+        JsonElement elem = gson().fromJson(ricResponse, JsonElement.class);
+        String responseFromController = createResponse(elem);
+        whenAsyncPostThenReturn(Mono.just(responseFromController));
 
+        String response = clientUnderTest.getPolicyTypeSchema("policyTypeId").block();
+        JsonElement respJson = gson().fromJson(response, JsonElement.class);
+        assertEquals("policyTypeId", respJson.getAsJsonObject().get("title").getAsString(),
+            "title should be updated to contain policyType ID");
     }
 
     private String policiesUrl() {
index 00d2c99..cb91133 100644 (file)
@@ -124,7 +124,7 @@ public class RefreshConfigTaskTest {
         RefreshConfigTask obj = spy(new RefreshConfigTask(appConfig, rics, policies, new Services(), new PolicyTypes(),
             new A1ClientFactory(appConfig)));
         if (stubConfigFileExists) {
-            doReturn(configFileExists).when(obj).configFileExists();
+            doReturn(configFileExists).when(obj).fileExists(any());
         }
         return obj;
     }
diff --git a/policy-agent/src/test/resources/test_osc_get_schema_response.json b/policy-agent/src/test/resources/test_osc_get_schema_response.json
new file mode 100644 (file)
index 0000000..537d86f
--- /dev/null
@@ -0,0 +1,45 @@
+{
+  "name": "pt1",
+  "description": "pt1 policy type",
+  "policy_type_id": 1,
+  "create_schema": {
+    "$schema": "http://json-schema.org/draft-07/schema#",
+    "title": "OSC_Type1_1.0.0",
+    "description": "Type 1 policy type",
+    "type": "object",
+    "properties": {
+      "scope": {
+        "type": "object",
+        "properties": {
+          "ueId": {
+            "type": "string"
+          },
+          "qosId": {
+            "type": "string"
+          }
+        },
+        "additionalProperties": false,
+        "required": [
+          "ueId",
+          "qosId"
+        ]
+      },
+      "qosObjective": {
+        "type": "object",
+        "properties": {
+          "priorityLevel": {
+            "type": "number"
+          }
+        },
+        "additionalProperties": false,
+        "required": [
+          "priorityLevel"
+        ]
+      }
+    },
+    "additionalProperties": false,
+    "required": [
+      "scope", "qosObjective"
+    ]
+  }
+}