NONRTRIC - Implement DMaaP mediator producer service in Java
[nonrtric.git] / enrichment-coordinator-service / src / main / java / org / oransc / enrichment / clients / AsyncRestClient.java
index 76da624..b7f23b1 100644 (file)
@@ -28,6 +28,7 @@ import io.netty.handler.timeout.WriteTimeoutHandler;
 import java.lang.invoke.MethodHandles;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.oransc.enrichment.configuration.WebClientConfig.HttpProxyConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.http.MediaType;
@@ -41,8 +42,7 @@ import org.springframework.web.reactive.function.client.WebClientResponseExcepti
 
 import reactor.core.publisher.Mono;
 import reactor.netty.http.client.HttpClient;
-import reactor.netty.resources.ConnectionProvider;
-import reactor.netty.tcp.TcpClient;
+import reactor.netty.transport.ProxyProvider;
 
 /**
  * Generic reactive REST client.
@@ -54,19 +54,12 @@ public class AsyncRestClient {
     private final String baseUrl;
     private static final AtomicInteger sequenceNumber = new AtomicInteger();
     private final SslContext sslContext;
+    private final HttpProxyConfig httpProxyConfig;
 
-    /**
-     * Note that only http (not https) will work when this constructor is used.
-     * 
-     * @param baseUrl
-     */
-    public AsyncRestClient(String baseUrl) {
-        this(baseUrl, null);
-    }
-
-    public AsyncRestClient(String baseUrl, SslContext sslContext) {
+    public AsyncRestClient(String baseUrl, @Nullable SslContext sslContext, @Nullable HttpProxyConfig httpProxyConfig) {
         this.baseUrl = baseUrl;
         this.sslContext = sslContext;
+        this.httpProxyConfig = httpProxyConfig;
     }
 
     public Mono<ResponseEntity<String>> postForEntity(String uri, @Nullable String body) {
@@ -74,96 +67,85 @@ public class AsyncRestClient {
         logger.debug("{} POST uri = '{}{}''", traceTag, baseUrl, uri);
         logger.trace("{} POST body: {}", traceTag, body);
         Mono<String> bodyProducer = body != null ? Mono.just(body) : Mono.empty();
-        return getWebClient() //
-            .flatMap(client -> {
-                RequestHeadersSpec<?> request = client.post() //
-                    .uri(uri) //
-                    .contentType(MediaType.APPLICATION_JSON) //
-                    .body(bodyProducer, String.class);
-                return retrieve(traceTag, request);
-            });
+
+        RequestHeadersSpec<?> request = getWebClient() //
+            .post() //
+            .uri(uri) //
+            .contentType(MediaType.APPLICATION_JSON) //
+            .body(bodyProducer, String.class);
+        return retrieve(traceTag, request);
     }
 
     public Mono<String> post(String uri, @Nullable String body) {
         return postForEntity(uri, body) //
-            .flatMap(this::toBody);
+            .map(this::toBody);
     }
 
     public Mono<String> postWithAuthHeader(String uri, String body, String username, String password) {
         Object traceTag = createTraceTag();
         logger.debug("{} POST (auth) uri = '{}{}''", traceTag, baseUrl, uri);
         logger.trace("{} POST body: {}", traceTag, body);
-        return getWebClient() //
-            .flatMap(client -> {
-                RequestHeadersSpec<?> request = client.post() //
-                    .uri(uri) //
-                    .headers(headers -> headers.setBasicAuth(username, password)) //
-                    .contentType(MediaType.APPLICATION_JSON) //
-                    .bodyValue(body);
-                return retrieve(traceTag, request) //
-                    .flatMap(this::toBody);
-            });
+
+        RequestHeadersSpec<?> request = getWebClient() //
+            .post() //
+            .uri(uri) //
+            .headers(headers -> headers.setBasicAuth(username, password)) //
+            .contentType(MediaType.APPLICATION_JSON) //
+            .bodyValue(body);
+        return retrieve(traceTag, request) //
+            .map(this::toBody);
     }
 
     public Mono<ResponseEntity<String>> putForEntity(String uri, String body) {
         Object traceTag = createTraceTag();
         logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri);
         logger.trace("{} PUT body: {}", traceTag, body);
-        return getWebClient() //
-            .flatMap(client -> {
-                RequestHeadersSpec<?> request = client.put() //
-                    .uri(uri) //
-                    .contentType(MediaType.APPLICATION_JSON) //
-                    .bodyValue(body);
-                return retrieve(traceTag, request);
-            });
+
+        RequestHeadersSpec<?> request = getWebClient() //
+            .put() //
+            .uri(uri) //
+            .contentType(MediaType.APPLICATION_JSON) //
+            .bodyValue(body);
+        return retrieve(traceTag, request);
     }
 
     public Mono<ResponseEntity<String>> putForEntity(String uri) {
         Object traceTag = createTraceTag();
         logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri);
         logger.trace("{} PUT body: <empty>", traceTag);
-        return getWebClient() //
-            .flatMap(client -> {
-                RequestHeadersSpec<?> request = client.put() //
-                    .uri(uri);
-                return retrieve(traceTag, request);
-            });
+        RequestHeadersSpec<?> request = getWebClient() //
+            .put() //
+            .uri(uri);
+        return retrieve(traceTag, request);
     }
 
     public Mono<String> put(String uri, String body) {
         return putForEntity(uri, body) //
-            .flatMap(this::toBody);
+            .map(this::toBody);
     }
 
     public Mono<ResponseEntity<String>> getForEntity(String uri) {
         Object traceTag = createTraceTag();
         logger.debug("{} GET uri = '{}{}''", traceTag, baseUrl, uri);
-        return getWebClient() //
-            .flatMap(client -> {
-                RequestHeadersSpec<?> request = client.get().uri(uri);
-                return retrieve(traceTag, request);
-            });
+        RequestHeadersSpec<?> request = getWebClient().get().uri(uri);
+        return retrieve(traceTag, request);
     }
 
     public Mono<String> get(String uri) {
         return getForEntity(uri) //
-            .flatMap(this::toBody);
+            .map(this::toBody);
     }
 
     public Mono<ResponseEntity<String>> deleteForEntity(String uri) {
         Object traceTag = createTraceTag();
         logger.debug("{} DELETE uri = '{}{}''", traceTag, baseUrl, uri);
-        return getWebClient() //
-            .flatMap(client -> {
-                RequestHeadersSpec<?> request = client.delete().uri(uri);
-                return retrieve(traceTag, request);
-            });
+        RequestHeadersSpec<?> request = getWebClient().delete().uri(uri);
+        return retrieve(traceTag, request);
     }
 
     public Mono<String> delete(String uri) {
         return deleteForEntity(uri) //
-            .flatMap(this::toBody);
+            .map(this::toBody);
     }
 
     private Mono<ResponseEntity<String>> retrieve(Object traceTag, RequestHeadersSpec<?> request) {
@@ -188,66 +170,58 @@ public class AsyncRestClient {
             logger.debug("{} HTTP error status = '{}', body '{}'", traceTag, exception.getStatusCode(),
                 exception.getResponseBodyAsString());
         } else {
-            logger.debug("{} HTTP error", traceTag, t);
+            logger.debug("{} HTTP error {}", traceTag, t.getMessage());
         }
     }
 
-    private Mono<String> toBody(ResponseEntity<String> entity) {
+    private String toBody(ResponseEntity<String> entity) {
         if (entity.getBody() == null) {
-            return Mono.just("");
+            return "";
         } else {
-            return Mono.just(entity.getBody());
+            return entity.getBody();
         }
     }
 
-    private TcpClient createTcpClientSecure(SslContext sslContext) {
-        return TcpClient.create(ConnectionProvider.newConnection()) //
-            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10_000) //
-            .secure(c -> c.sslContext(sslContext)) //
-            .doOnConnected(connection -> {
-                connection.addHandlerLast(new ReadTimeoutHandler(30));
-                connection.addHandlerLast(new WriteTimeoutHandler(30));
-            });
+    private boolean isHttpProxyConfigured() {
+        return httpProxyConfig != null && httpProxyConfig.httpProxyPort() > 0
+            && !httpProxyConfig.httpProxyHost().isEmpty();
     }
 
-    private TcpClient createTcpClientInsecure() {
-        return TcpClient.create(ConnectionProvider.newConnection()) //
+    private HttpClient buildHttpClient() {
+        HttpClient httpClient = HttpClient.create() //
             .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10_000) //
             .doOnConnected(connection -> {
                 connection.addHandlerLast(new ReadTimeoutHandler(30));
                 connection.addHandlerLast(new WriteTimeoutHandler(30));
             });
+
+        if (this.sslContext != null) {
+            httpClient = httpClient.secure(ssl -> ssl.sslContext(sslContext));
+        }
+
+        if (isHttpProxyConfigured()) {
+            httpClient = httpClient.proxy(proxy -> proxy.type(ProxyProvider.Proxy.HTTP)
+                .host(httpProxyConfig.httpProxyHost()).port(httpProxyConfig.httpProxyPort()));
+        }
+        return httpClient;
     }
 
-    private WebClient createWebClient(String baseUrl, TcpClient tcpClient) {
-        HttpClient httpClient = HttpClient.from(tcpClient);
-        ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);
+    private WebClient buildWebClient(String baseUrl) {
+        final HttpClient httpClient = buildHttpClient();
         ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder() //
             .codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(-1)) //
             .build();
         return WebClient.builder() //
-            .clientConnector(connector) //
+            .clientConnector(new ReactorClientHttpConnector(httpClient)) //
             .baseUrl(baseUrl) //
             .exchangeStrategies(exchangeStrategies) //
             .build();
     }
 
-    private Mono<WebClient> getWebClient() {
+    private WebClient getWebClient() {
         if (this.webClient == null) {
-            try {
-                if (this.sslContext != null) {
-                    TcpClient tcpClient = createTcpClientSecure(sslContext);
-                    this.webClient = createWebClient(this.baseUrl, tcpClient);
-                } else {
-                    TcpClient tcpClient = createTcpClientInsecure();
-                    this.webClient = createWebClient(this.baseUrl, tcpClient);
-                }
-            } catch (Exception e) {
-                logger.error("Could not create WebClient {}", e.getMessage());
-                return Mono.error(e);
-            }
+            this.webClient = buildWebClient(baseUrl);
         }
-        return Mono.just(this.webClient);
+        return this.webClient;
     }
-
 }