summary |
shortlog |
log |
commit | commitdiff |
review |
tree
raw |
patch |
inline | side by side (from parent 1:
70e878f)
Enabled receiving of big RESTCONF responses
Serializing policy deletion when services are disabled in order
to avoid exessive memory consumption when there are many policies.
Change-Id: I10766c70861e4d3931b09cec98d281918e2eefef
Issue-ID: NONRTRIC-195
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.lang.Nullable;
import org.springframework.util.ResourceUtils;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.lang.Nullable;
import org.springframework.util.ResourceUtils;
+import org.springframework.web.reactive.function.client.ExchangeStrategies;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClient.RequestHeadersSpec;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClient.RequestHeadersSpec;
import org.springframework.web.reactive.function.client.WebClientResponseException;
private Mono<ResponseEntity<String>> retrieve(Object traceTag, RequestHeadersSpec<?> request) {
return request.retrieve() //
.toEntity(String.class) //
private Mono<ResponseEntity<String>> retrieve(Object traceTag, RequestHeadersSpec<?> request) {
return request.retrieve() //
.toEntity(String.class) //
- .doOnNext(entity -> logger.trace("{} Received: {}", traceTag, entity.getBody()))
+ .doOnNext(entity -> logger.trace("{} Received: {}", traceTag, entity.getBody())) //
.doOnError(throwable -> onHttpError(traceTag, throwable));
}
.doOnError(throwable -> onHttpError(traceTag, throwable));
}
HttpClient httpClient = HttpClient.from(tcpClient);
ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);
HttpClient httpClient = HttpClient.from(tcpClient);
ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);
+ ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder() //
+ .codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(-1)) //
+ .build();
+
return WebClient.builder() //
.clientConnector(connector) //
.baseUrl(baseUrl) //
return WebClient.builder() //
.clientConnector(connector) //
.baseUrl(baseUrl) //
+ .exchangeStrategies(exchangeStrategies) //
@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
public class ServiceSupervision {
private static final Logger logger = LoggerFactory.getLogger(ServiceSupervision.class);
@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
public class ServiceSupervision {
private static final Logger logger = LoggerFactory.getLogger(ServiceSupervision.class);
+ static final int CONCURRENCY_RIC = 1; // How may paralell requests that is sent
private final Services services;
private final Policies policies;
private A1ClientFactory a1ClientFactory;
private final Services services;
private final Policies policies;
private A1ClientFactory a1ClientFactory;
.doOnNext(service -> logger.info("Service is expired: {}", service.getName())) //
.doOnNext(service -> services.remove(service.getName())) //
.flatMap(this::getAllPoliciesForService) //
.doOnNext(service -> logger.info("Service is expired: {}", service.getName())) //
.doOnNext(service -> services.remove(service.getName())) //
.flatMap(this::getAllPoliciesForService) //
- .flatMap(this::deletePolicy);
+ .flatMap(this::deletePolicy, CONCURRENCY_RIC);
}
@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
}
@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally