7c1ae929d3a5bd332b98097a9f7bed20f6130fe2
[nonrtric/plt/ranpm.git] / datafilecollector / src / main / java / org / oran / datafile / http / DfcHttpClient.java
1 /*-
2  * ============LICENSE_START======================================================================
3  * Copyright (C) 2020-2021 Nokia. All rights reserved.
4  * ===============================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6  * in compliance with the License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License
11  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12  * or implied. See the License for the specific language governing permissions and limitations under
13  * the License.
14  * ============LICENSE_END========================================================================
15  */
16 package org.oran.datafile.http;
17
18 import java.io.IOException;
19 import java.io.InputStream;
20 import java.nio.file.Files;
21 import java.nio.file.Path;
22 import java.util.concurrent.CountDownLatch;
23 import java.util.concurrent.atomic.AtomicReference;
24 import java.util.function.Consumer;
25
26 import org.oran.datafile.commons.FileCollectClient;
27 import org.oran.datafile.exceptions.DatafileTaskException;
28 import org.oran.datafile.exceptions.NonRetryableDatafileTaskException;
29 import org.oran.datafile.model.FileServerData;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 import reactor.core.Disposable;
34 import reactor.core.publisher.Flux;
35 import reactor.core.publisher.Mono;
36 import reactor.netty.http.client.HttpClient;
37 import reactor.netty.http.client.HttpClientResponse;
38 import reactor.netty.resources.ConnectionProvider;
39
40 /**
41  * Gets file from PNF with HTTP protocol.
42  *
43  */
44 public class DfcHttpClient implements FileCollectClient {
45
46     // Be aware to be less than ScheduledTasks.NUMBER_OF_WORKER_THREADS
47     private static final int MAX_NUMBER_OF_CONNECTIONS = 200;
48     private static final Logger logger = LoggerFactory.getLogger(DfcHttpClient.class);
49     private static final ConnectionProvider pool = ConnectionProvider.create("default", MAX_NUMBER_OF_CONNECTIONS);
50
51     private final FileServerData fileServerData;
52     private Disposable disposableClient;
53
54     protected HttpClient client;
55
56     public DfcHttpClient(FileServerData fileServerData) {
57         this.fileServerData = fileServerData;
58     }
59
60     @Override
61     public void open() throws DatafileTaskException {
62         logger.trace("Setting httpClient for file download.");
63
64         String authorizationContent = getAuthorizationContent();
65         this.client =
66             HttpClient.create(pool).keepAlive(true).headers(h -> h.add("Authorization", authorizationContent));
67
68         logger.trace("httpClient, auth header was set.");
69     }
70
71     protected String getAuthorizationContent() throws DatafileTaskException {
72         String jwtToken = HttpUtils.getJWTToken(fileServerData);
73         if (!jwtToken.isEmpty()) {
74             return HttpUtils.jwtAuthContent(jwtToken);
75         }
76         if (!HttpUtils.isBasicAuthDataFilled(fileServerData)) {
77             throw new DatafileTaskException("Not sufficient basic auth data for file.");
78         }
79         return HttpUtils.basicAuthContent(this.fileServerData.userId, this.fileServerData.password);
80     }
81
82     @Override
83     public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException {
84         logger.trace("Prepare to collectFile {}", localFile);
85         CountDownLatch latch = new CountDownLatch(1);
86         final AtomicReference<Exception> errorMessage = new AtomicReference<>();
87
88         Consumer<Throwable> onError = processFailedConnectionWithServer(latch, errorMessage);
89         Consumer<InputStream> onSuccess = processDataFromServer(localFile, latch, errorMessage);
90
91         Flux<InputStream> responseContent = getServerResponse(remoteFile);
92         disposableClient = responseContent.subscribe(onSuccess, onError);
93
94         try {
95             latch.await();
96         } catch (InterruptedException e) {
97             throw new DatafileTaskException("Interrupted exception after datafile download - ", e);
98         }
99
100         if (isDownloadFailed(errorMessage)) {
101             if (errorMessage.get() instanceof NonRetryableDatafileTaskException) {
102                 throw (NonRetryableDatafileTaskException) errorMessage.get();
103             }
104             throw (DatafileTaskException) errorMessage.get();
105         }
106
107         logger.trace("HTTP collectFile OK");
108     }
109
110     protected boolean isDownloadFailed(AtomicReference<Exception> errorMessage) {
111         return (errorMessage.get() != null);
112     }
113
114     protected Consumer<Throwable> processFailedConnectionWithServer(CountDownLatch latch,
115         AtomicReference<Exception> errorMessages) {
116         return (Throwable response) -> {
117             Exception e = new Exception("Error in connection has occurred during file download", response);
118             errorMessages.set(new DatafileTaskException(response.getMessage(), e));
119             if (response instanceof NonRetryableDatafileTaskException) {
120                 errorMessages.set(new NonRetryableDatafileTaskException(response.getMessage(), e));
121             }
122             latch.countDown();
123         };
124     }
125
126     protected Consumer<InputStream> processDataFromServer(Path localFile, CountDownLatch latch,
127         AtomicReference<Exception> errorMessages) {
128         return (InputStream response) -> {
129             logger.trace("Starting to process response.");
130             try {
131                 long numBytes = Files.copy(response, localFile);
132                 logger.trace("Transmission was successful - {} bytes downloaded.", numBytes);
133                 logger.trace("CollectFile fetched: {}", localFile);
134                 response.close();
135             } catch (IOException e) {
136                 errorMessages.set(new DatafileTaskException("Error fetching file with", e));
137             } finally {
138                 latch.countDown();
139             }
140         };
141     }
142
143     protected Flux<InputStream> getServerResponse(String remoteFile) {
144         return client.get().uri(HttpUtils.prepareHttpUri(fileServerData, remoteFile))
145             .response((responseReceiver, byteBufFlux) -> {
146                 logger.trace("HTTP response status - {}", responseReceiver.status());
147                 if (isResponseOk(responseReceiver)) {
148                     return byteBufFlux.aggregate().asInputStream();
149                 }
150                 if (isErrorInConnection(responseReceiver)) {
151                     return Mono.error(new NonRetryableDatafileTaskException(
152                         HttpUtils.nonRetryableResponse(getResponseCode(responseReceiver))));
153                 }
154                 return Mono
155                     .error(new DatafileTaskException(HttpUtils.retryableResponse(getResponseCode(responseReceiver))));
156             });
157     }
158
159     protected boolean isResponseOk(HttpClientResponse httpClientResponse) {
160         return getResponseCode(httpClientResponse) == 200;
161     }
162
163     private int getResponseCode(HttpClientResponse responseReceiver) {
164         return responseReceiver.status().code();
165     }
166
167     protected boolean isErrorInConnection(HttpClientResponse httpClientResponse) {
168         return getResponseCode(httpClientResponse) >= 400;
169     }
170
171     @Override
172     public void close() {
173         logger.trace("Starting http client disposal.");
174         disposableClient.dispose();
175         logger.trace("Http client disposed.");
176     }
177 }