Minor changes
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / clients / AsyncRestClient.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2019 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oransc.policyagent.clients;
22
23 import io.netty.channel.ChannelOption;
24 import io.netty.handler.ssl.SslContext;
25 import io.netty.handler.ssl.SslContextBuilder;
26 import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
27 import io.netty.handler.timeout.ReadTimeoutHandler;
28 import io.netty.handler.timeout.WriteTimeoutHandler;
29
30 import java.lang.invoke.MethodHandles;
31 import java.util.concurrent.atomic.AtomicInteger;
32
33 import javax.net.ssl.SSLException;
34
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37 import org.springframework.http.MediaType;
38 import org.springframework.http.ResponseEntity;
39 import org.springframework.http.client.reactive.ReactorClientHttpConnector;
40 import org.springframework.lang.Nullable;
41 import org.springframework.web.reactive.function.client.WebClient;
42 import org.springframework.web.reactive.function.client.WebClient.RequestHeadersSpec;
43 import org.springframework.web.reactive.function.client.WebClientResponseException;
44
45 import reactor.core.publisher.Mono;
46 import reactor.netty.http.client.HttpClient;
47 import reactor.netty.tcp.TcpClient;
48
49 /**
50  * Generic reactive REST client.
51  */
52 public class AsyncRestClient {
53     private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
54     private WebClient webClient = null;
55     private final String baseUrl;
56     private static final AtomicInteger sequenceNumber = new AtomicInteger();
57
58     public AsyncRestClient(String baseUrl) {
59         this.baseUrl = baseUrl;
60     }
61
62     public Mono<ResponseEntity<String>> postForEntity(String uri, @Nullable String body) {
63         Object traceTag = createTraceTag();
64         logger.debug("{} POST uri = '{}{}''", traceTag, baseUrl, uri);
65         logger.trace("{} POST body: {}", traceTag, body);
66         Mono<String> bodyProducer = body != null ? Mono.just(body) : Mono.empty();
67         return getWebClient() //
68             .flatMap(client -> {
69                 RequestHeadersSpec<?> request = client.post() //
70                     .uri(uri) //
71                     .contentType(MediaType.APPLICATION_JSON) //
72                     .body(bodyProducer, String.class);
73                 return retrieve(traceTag, request);
74             });
75     }
76
77     public Mono<String> post(String uri, @Nullable String body) {
78         return postForEntity(uri, body) //
79             .flatMap(this::toBody);
80     }
81
82     public Mono<String> postWithAuthHeader(String uri, String body, String username, String password) {
83         Object traceTag = createTraceTag();
84         logger.debug("{} POST (auth) uri = '{}{}''", traceTag, baseUrl, uri);
85         logger.trace("{} POST body: {}", traceTag, body);
86         return getWebClient() //
87             .flatMap(client -> {
88                 RequestHeadersSpec<?> request = client.post() //
89                     .uri(uri) //
90                     .headers(headers -> headers.setBasicAuth(username, password)) //
91                     .contentType(MediaType.APPLICATION_JSON) //
92                     .bodyValue(body);
93                 return retrieve(traceTag, request) //
94                     .flatMap(this::toBody);
95             });
96     }
97
98     public Mono<ResponseEntity<String>> putForEntity(String uri, String body) {
99         Object traceTag = createTraceTag();
100         logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri);
101         logger.trace("{} PUT body: {}", traceTag, body);
102         return getWebClient() //
103             .flatMap(client -> {
104                 RequestHeadersSpec<?> request = client.put() //
105                     .uri(uri) //
106                     .contentType(MediaType.APPLICATION_JSON) //
107                     .bodyValue(body);
108                 return retrieve(traceTag, request);
109             });
110     }
111
112     public Mono<ResponseEntity<String>> putForEntity(String uri) {
113         Object traceTag = createTraceTag();
114         logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri);
115         logger.trace("{} PUT body: <empty>", traceTag);
116         return getWebClient() //
117             .flatMap(client -> {
118                 RequestHeadersSpec<?> request = client.put() //
119                     .uri(uri);
120                 return retrieve(traceTag, request);
121             });
122     }
123
124     public Mono<String> put(String uri, String body) {
125         return putForEntity(uri, body) //
126             .flatMap(this::toBody);
127     }
128
129     public Mono<ResponseEntity<String>> getForEntity(String uri) {
130         Object traceTag = createTraceTag();
131         logger.debug("{} GET uri = '{}{}''", traceTag, baseUrl, uri);
132         return getWebClient() //
133             .flatMap(client -> {
134                 RequestHeadersSpec<?> request = client.get().uri(uri);
135                 return retrieve(traceTag, request);
136             });
137     }
138
139     public Mono<String> get(String uri) {
140         return getForEntity(uri) //
141             .flatMap(this::toBody);
142     }
143
144     public Mono<ResponseEntity<String>> deleteForEntity(String uri) {
145         Object traceTag = createTraceTag();
146         logger.debug("{} DELETE uri = '{}{}''", traceTag, baseUrl, uri);
147         return getWebClient() //
148             .flatMap(client -> {
149                 RequestHeadersSpec<?> request = client.delete().uri(uri);
150                 return retrieve(traceTag, request);
151             });
152     }
153
154     public Mono<String> delete(String uri) {
155         return deleteForEntity(uri) //
156             .flatMap(this::toBody);
157     }
158
159     private Mono<ResponseEntity<String>> retrieve(Object traceTag, RequestHeadersSpec<?> request) {
160         return request.retrieve() //
161             .toEntity(String.class) //
162             .doOnNext(entity -> logger.trace("{} Received: {}", traceTag, entity.getBody()))
163             .doOnError(throwable -> onHttpError(traceTag, throwable));
164     }
165
166     private static Object createTraceTag() {
167         return sequenceNumber.incrementAndGet();
168     }
169
170     private void onHttpError(Object traceTag, Throwable t) {
171         if (t instanceof WebClientResponseException) {
172             WebClientResponseException exception = (WebClientResponseException) t;
173             logger.debug("{} HTTP error status = '{}', body '{}'", traceTag, exception.getStatusCode(),
174                 exception.getResponseBodyAsString());
175         } else {
176             logger.debug("{} HTTP error: {}", traceTag, t.getMessage());
177         }
178     }
179
180     private Mono<String> toBody(ResponseEntity<String> entity) {
181         if (entity.getBody() == null) {
182             return Mono.just("");
183         } else {
184             return Mono.just(entity.getBody());
185         }
186     }
187
188     private static SslContext createSslContext() throws SSLException {
189         return SslContextBuilder.forClient() //
190             .trustManager(InsecureTrustManagerFactory.INSTANCE) //
191             .build();
192     }
193
194     private static WebClient createWebClient(String baseUrl, SslContext sslContext) {
195         TcpClient tcpClient = TcpClient.create() //
196             .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10_000) //
197             .secure(c -> c.sslContext(sslContext)) //
198             .doOnConnected(connection -> {
199                 connection.addHandler(new ReadTimeoutHandler(30));
200                 connection.addHandler(new WriteTimeoutHandler(30));
201             });
202         HttpClient httpClient = HttpClient.from(tcpClient);
203         ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);
204
205         return WebClient.builder() //
206             .clientConnector(connector) //
207             .baseUrl(baseUrl) //
208             .build();
209     }
210
211     private Mono<WebClient> getWebClient() {
212         if (this.webClient == null) {
213             try {
214                 SslContext sslContext = createSslContext();
215                 this.webClient = createWebClient(this.baseUrl, sslContext);
216             } catch (SSLException e) {
217                 logger.error("Could not create WebClient {}", e.getMessage());
218                 return Mono.error(e);
219             }
220         }
221         return Mono.just(this.webClient);
222     }
223
224 }