Merge "Added support for using oauth token for Kafka"
[nonrtric/plt/ranpm.git] / datafilecollector / src / main / java / org / oran / datafile / http / DfcHttpClient.java
1 /*-
2  * ============LICENSE_START======================================================================
3  * Copyright (C) 2020-2021 Nokia. All rights reserved.
4  * ===============================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6  * in compliance with the License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License
11  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12  * or implied. See the License for the specific language governing permissions and limitations under
13  * the License.
14  * ============LICENSE_END========================================================================
15  */
16 package org.oran.datafile.http;
17
18 import java.io.IOException;
19 import java.io.InputStream;
20 import java.nio.file.Files;
21 import java.nio.file.Path;
22 import java.util.concurrent.CountDownLatch;
23 import java.util.concurrent.atomic.AtomicReference;
24 import java.util.function.Consumer;
25
26 import org.oran.datafile.commons.FileCollectClient;
27 import org.oran.datafile.commons.FileServerData;
28 import org.oran.datafile.exceptions.DatafileTaskException;
29 import org.oran.datafile.exceptions.NonRetryableDatafileTaskException;
30 import org.oran.datafile.service.HttpUtils;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 import reactor.core.Disposable;
35 import reactor.core.publisher.Flux;
36 import reactor.core.publisher.Mono;
37 import reactor.netty.http.client.HttpClient;
38 import reactor.netty.http.client.HttpClientResponse;
39 import reactor.netty.resources.ConnectionProvider;
40
41 /**
42  * Gets file from PNF with HTTP protocol.
43  *
44  */
45 public class DfcHttpClient implements FileCollectClient {
46
47     // Be aware to be less than ScheduledTasks.NUMBER_OF_WORKER_THREADS
48     private static final int MAX_NUMBER_OF_CONNECTIONS = 200;
49     private static final Logger logger = LoggerFactory.getLogger(DfcHttpClient.class);
50     private static final ConnectionProvider pool = ConnectionProvider.create("default", MAX_NUMBER_OF_CONNECTIONS);
51
52     private final FileServerData fileServerData;
53     private Disposable disposableClient;
54
55     protected HttpClient client;
56
57     public DfcHttpClient(FileServerData fileServerData) {
58         this.fileServerData = fileServerData;
59     }
60
61     @Override
62     public void open() throws DatafileTaskException {
63         logger.trace("Setting httpClient for file download.");
64
65         String authorizationContent = getAuthorizationContent();
66         this.client =
67             HttpClient.create(pool).keepAlive(true).headers(h -> h.add("Authorization", authorizationContent));
68
69         logger.trace("httpClient, auth header was set.");
70     }
71
72     protected String getAuthorizationContent() throws DatafileTaskException {
73         String jwtToken = HttpUtils.getJWTToken(fileServerData);
74         if (!jwtToken.isEmpty()) {
75             return HttpUtils.jwtAuthContent(jwtToken);
76         }
77         if (!HttpUtils.isBasicAuthDataFilled(fileServerData)) {
78             throw new DatafileTaskException("Not sufficient basic auth data for file.");
79         }
80         return HttpUtils.basicAuthContent(this.fileServerData.userId, this.fileServerData.password);
81     }
82
83     @Override
84     public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException {
85         logger.trace("Prepare to collectFile {}", localFile);
86         CountDownLatch latch = new CountDownLatch(1);
87         final AtomicReference<Exception> errorMessage = new AtomicReference<>();
88
89         Consumer<Throwable> onError = processFailedConnectionWithServer(latch, errorMessage);
90         Consumer<InputStream> onSuccess = processDataFromServer(localFile, latch, errorMessage);
91
92         Flux<InputStream> responseContent = getServerResponse(remoteFile);
93         disposableClient = responseContent.subscribe(onSuccess, onError);
94
95         try {
96             latch.await();
97         } catch (InterruptedException e) {
98             throw new DatafileTaskException("Interrupted exception after datafile download - ", e);
99         }
100
101         if (isDownloadFailed(errorMessage)) {
102             if (errorMessage.get() instanceof NonRetryableDatafileTaskException) {
103                 throw (NonRetryableDatafileTaskException) errorMessage.get();
104             }
105             throw (DatafileTaskException) errorMessage.get();
106         }
107
108         logger.trace("HTTP collectFile OK");
109     }
110
111     protected boolean isDownloadFailed(AtomicReference<Exception> errorMessage) {
112         return (errorMessage.get() != null);
113     }
114
115     protected Consumer<Throwable> processFailedConnectionWithServer(CountDownLatch latch,
116         AtomicReference<Exception> errorMessages) {
117         return (Throwable response) -> {
118             Exception e = new Exception("Error in connection has occurred during file download", response);
119             errorMessages.set(new DatafileTaskException(response.getMessage(), e));
120             if (response instanceof NonRetryableDatafileTaskException) {
121                 errorMessages.set(new NonRetryableDatafileTaskException(response.getMessage(), e));
122             }
123             latch.countDown();
124         };
125     }
126
127     protected Consumer<InputStream> processDataFromServer(Path localFile, CountDownLatch latch,
128         AtomicReference<Exception> errorMessages) {
129         return (InputStream response) -> {
130             logger.trace("Starting to process response.");
131             try {
132                 long numBytes = Files.copy(response, localFile);
133                 logger.trace("Transmission was successful - {} bytes downloaded.", numBytes);
134                 logger.trace("CollectFile fetched: {}", localFile);
135                 response.close();
136             } catch (IOException e) {
137                 errorMessages.set(new DatafileTaskException("Error fetching file with", e));
138             } finally {
139                 latch.countDown();
140             }
141         };
142     }
143
144     protected Flux<InputStream> getServerResponse(String remoteFile) {
145         return client.get().uri(HttpUtils.prepareHttpUri(fileServerData, remoteFile))
146             .response((responseReceiver, byteBufFlux) -> {
147                 logger.trace("HTTP response status - {}", responseReceiver.status());
148                 if (isResponseOk(responseReceiver)) {
149                     return byteBufFlux.aggregate().asInputStream();
150                 }
151                 if (isErrorInConnection(responseReceiver)) {
152                     return Mono.error(new NonRetryableDatafileTaskException(
153                         HttpUtils.nonRetryableResponse(getResponseCode(responseReceiver))));
154                 }
155                 return Mono
156                     .error(new DatafileTaskException(HttpUtils.retryableResponse(getResponseCode(responseReceiver))));
157             });
158     }
159
160     protected boolean isResponseOk(HttpClientResponse httpClientResponse) {
161         return getResponseCode(httpClientResponse) == 200;
162     }
163
164     private int getResponseCode(HttpClientResponse responseReceiver) {
165         return responseReceiver.status().code();
166     }
167
168     protected boolean isErrorInConnection(HttpClientResponse httpClientResponse) {
169         return getResponseCode(httpClientResponse) >= 400;
170     }
171
172     @Override
173     public void close() {
174         logger.trace("Starting http client disposal.");
175         disposableClient.dispose();
176         logger.trace("Http client disposed.");
177     }
178 }