Two minor bugfies 25/3725/2
authorPatrikBuhr <patrik.buhr@est.tech>
Mon, 18 May 2020 07:23:18 +0000 (09:23 +0200)
committerPatrikBuhr <patrik.buhr@est.tech>
Tue, 19 May 2020 08:41:55 +0000 (10:41 +0200)
Enabled receiving of big RESTCONF responses

Serializing policy deletion when services are disabled in order
to avoid exessive memory consumption when there are many policies.

Change-Id: I10766c70861e4d3931b09cec98d281918e2eefef
Issue-ID: NONRTRIC-195
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
policy-agent/src/main/java/org/oransc/policyagent/clients/AsyncRestClient.java
policy-agent/src/main/java/org/oransc/policyagent/tasks/ServiceSupervision.java

index 3df59bf..4a995c9 100644 (file)
@@ -50,6 +50,7 @@ import org.springframework.http.ResponseEntity;
 import org.springframework.http.client.reactive.ReactorClientHttpConnector;
 import org.springframework.lang.Nullable;
 import org.springframework.util.ResourceUtils;
+import org.springframework.web.reactive.function.client.ExchangeStrategies;
 import org.springframework.web.reactive.function.client.WebClient;
 import org.springframework.web.reactive.function.client.WebClient.RequestHeadersSpec;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
@@ -179,7 +180,7 @@ public class AsyncRestClient {
     private Mono<ResponseEntity<String>> retrieve(Object traceTag, RequestHeadersSpec<?> request) {
         return request.retrieve() //
             .toEntity(String.class) //
-            .doOnNext(entity -> logger.trace("{} Received: {}", traceTag, entity.getBody()))
+            .doOnNext(entity -> logger.trace("{} Received: {}", traceTag, entity.getBody())) //
             .doOnError(throwable -> onHttpError(traceTag, throwable));
     }
 
@@ -272,9 +273,14 @@ public class AsyncRestClient {
         HttpClient httpClient = HttpClient.from(tcpClient);
         ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);
 
+        ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder() //
+            .codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(-1)) //
+            .build();
+
         return WebClient.builder() //
             .clientConnector(connector) //
             .baseUrl(baseUrl) //
+            .exchangeStrategies(exchangeStrategies) //
             .build();
     }
 
index 751c0ac..9c55532 100644 (file)
@@ -50,6 +50,7 @@ import reactor.core.publisher.Mono;
 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
 public class ServiceSupervision {
     private static final Logger logger = LoggerFactory.getLogger(ServiceSupervision.class);
+    static final int CONCURRENCY_RIC = 1; // How may paralell requests that is sent
     private final Services services;
     private final Policies policies;
     private A1ClientFactory a1ClientFactory;
@@ -85,7 +86,7 @@ public class ServiceSupervision {
             .doOnNext(service -> logger.info("Service is expired: {}", service.getName())) //
             .doOnNext(service -> services.remove(service.getName())) //
             .flatMap(this::getAllPoliciesForService) //
-            .flatMap(this::deletePolicy);
+            .flatMap(this::deletePolicy, CONCURRENCY_RIC);
     }
 
     @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally