Merge "Added STD sim 2.0.0 tests"
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / clients / OscA1Client.java
index 5a0bdc9..a388267 100644 (file)
@@ -23,7 +23,9 @@ package org.oransc.policyagent.clients;
 import java.lang.invoke.MethodHandles;
 import java.util.List;
 
+import org.json.JSONObject;
 import org.oransc.policyagent.configuration.RicConfig;
+import org.oransc.policyagent.configuration.WebClientConfig;
 import org.oransc.policyagent.repository.Policy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,6 +38,7 @@ import reactor.core.publisher.Mono;
  */
 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
 public class OscA1Client implements A1Client {
+    static final int CONCURRENCY_RIC = 1; // How may paralell requests that is sent to one NearRT RIC
 
     public static class UriBuilder implements A1UriBuilder {
         private final RicConfig ricConfig;
@@ -114,8 +117,8 @@ public class OscA1Client implements A1Client {
     private final AsyncRestClient restClient;
     private final UriBuilder uri;
 
-    public OscA1Client(RicConfig ricConfig) {
-        this(ricConfig, new AsyncRestClient(""));
+    public OscA1Client(RicConfig ricConfig, WebClientConfig clientConfig) {
+        this(ricConfig, new AsyncRestClient("", clientConfig));
     }
 
     public OscA1Client(RicConfig ricConfig, AsyncRestClient restClient) {
@@ -125,6 +128,19 @@ public class OscA1Client implements A1Client {
         uri = new UriBuilder(ricConfig);
     }
 
+    public static Mono<String> extractCreateSchema(String policyTypeResponse, String policyTypeId) {
+        try {
+            JSONObject obj = new JSONObject(policyTypeResponse);
+            JSONObject schemaObj = obj.getJSONObject("create_schema");
+            schemaObj.put(TITLE, policyTypeId);
+            return Mono.just(schemaObj.toString());
+        } catch (Exception e) {
+            String exceptionString = e.toString();
+            logger.error("Unexpected response for policy type: {}, exception: {}", policyTypeResponse, exceptionString);
+            return Mono.error(e);
+        }
+    }
+
     @Override
     public Mono<List<String>> getPolicyTypeIdentities() {
         return getPolicyTypeIds() //
@@ -142,7 +158,7 @@ public class OscA1Client implements A1Client {
     public Mono<String> getPolicyTypeSchema(String policyTypeId) {
         String schemaUri = uri.createGetSchemaUri(policyTypeId);
         return restClient.get(schemaUri) //
-            .flatMap(response -> SdncJsonHelper.getCreateSchema(response, policyTypeId));
+            .flatMap(response -> extractCreateSchema(response, policyTypeId));
     }
 
     @Override
@@ -165,7 +181,7 @@ public class OscA1Client implements A1Client {
     @Override
     public Flux<String> deleteAllPolicies() {
         return getPolicyTypeIds() //
-            .flatMap(this::deletePoliciesForType);
+            .flatMap(this::deletePoliciesForType, CONCURRENCY_RIC);
     }
 
     @Override
@@ -192,6 +208,6 @@ public class OscA1Client implements A1Client {
 
     private Flux<String> deletePoliciesForType(String typeId) {
         return getPolicyIdentitiesByType(typeId) //
-            .flatMap(policyId -> deletePolicyById(typeId, policyId));
+            .flatMap(policyId -> deletePolicyById(typeId, policyId), CONCURRENCY_RIC);
     }
 }