Merge "Storage of PM Data in influx"
[nonrtric/plt/ranpm.git] / datafilecollector / src / main / java / org / onap / dcaegen2 / collectors / datafile / http / DfcHttpClient.java
1 /*-
2  * ============LICENSE_START======================================================================
3  * Copyright (C) 2020-2021 Nokia. All rights reserved.
4  * ===============================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6  * in compliance with the License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License
11  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12  * or implied. See the License for the specific language governing permissions and limitations under
13  * the License.
14  * ============LICENSE_END========================================================================
15  */
16 package org.onap.dcaegen2.collectors.datafile.http;
17
18 import java.io.IOException;
19 import java.io.InputStream;
20 import java.nio.file.Files;
21 import java.nio.file.Path;
22 import java.util.concurrent.CountDownLatch;
23 import java.util.concurrent.atomic.AtomicReference;
24 import java.util.function.Consumer;
25
26 import org.onap.dcaegen2.collectors.datafile.commons.FileCollectClient;
27 import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
28 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
29 import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
30 import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 import reactor.core.Disposable;
35 import reactor.core.publisher.Flux;
36 import reactor.core.publisher.Mono;
37 import reactor.netty.http.client.HttpClient;
38 import reactor.netty.http.client.HttpClientResponse;
39 import reactor.netty.resources.ConnectionProvider;
40
41 /**
42  * Gets file from PNF with HTTP protocol.
43  *
44  * @author <a href="mailto:krzysztof.gajewski@nokia.com">Krzysztof Gajewski</a>
45  */
46 public class DfcHttpClient implements FileCollectClient {
47
48     // Be aware to be less than ScheduledTasks.NUMBER_OF_WORKER_THREADS
49     private static final int MAX_NUMBER_OF_CONNECTIONS = 200;
50     private static final Logger logger = LoggerFactory.getLogger(DfcHttpClient.class);
51     private static final ConnectionProvider pool = ConnectionProvider.create("default", MAX_NUMBER_OF_CONNECTIONS);
52
53     private final FileServerData fileServerData;
54     private Disposable disposableClient;
55
56     protected HttpClient client;
57
58     public DfcHttpClient(FileServerData fileServerData) {
59         this.fileServerData = fileServerData;
60     }
61
62     @Override
63     public void open() throws DatafileTaskException {
64         logger.trace("Setting httpClient for file download.");
65
66         String authorizationContent = getAuthorizationContent();
67         this.client =
68             HttpClient.create(pool).keepAlive(true).headers(h -> h.add("Authorization", authorizationContent));
69
70         logger.trace("httpClient, auth header was set.");
71     }
72
73     protected String getAuthorizationContent() throws DatafileTaskException {
74         String jwtToken = HttpUtils.getJWTToken(fileServerData);
75         if (!jwtToken.isEmpty()) {
76             return HttpUtils.jwtAuthContent(jwtToken);
77         }
78         if (!HttpUtils.isBasicAuthDataFilled(fileServerData)) {
79             throw new DatafileTaskException("Not sufficient basic auth data for file.");
80         }
81         return HttpUtils.basicAuthContent(this.fileServerData.userId, this.fileServerData.password);
82     }
83
84     @Override
85     public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException {
86         logger.trace("Prepare to collectFile {}", localFile);
87         CountDownLatch latch = new CountDownLatch(1);
88         final AtomicReference<Exception> errorMessage = new AtomicReference<>();
89
90         Consumer<Throwable> onError = processFailedConnectionWithServer(latch, errorMessage);
91         Consumer<InputStream> onSuccess = processDataFromServer(localFile, latch, errorMessage);
92
93         Flux<InputStream> responseContent = getServerResponse(remoteFile);
94         disposableClient = responseContent.subscribe(onSuccess, onError);
95
96         try {
97             latch.await();
98         } catch (InterruptedException e) {
99             throw new DatafileTaskException("Interrupted exception after datafile download - ", e);
100         }
101
102         if (isDownloadFailed(errorMessage)) {
103             if (errorMessage.get() instanceof NonRetryableDatafileTaskException) {
104                 throw (NonRetryableDatafileTaskException) errorMessage.get();
105             }
106             throw (DatafileTaskException) errorMessage.get();
107         }
108
109         logger.trace("HTTP collectFile OK");
110     }
111
112     protected boolean isDownloadFailed(AtomicReference<Exception> errorMessage) {
113         return (errorMessage.get() != null);
114     }
115
116     protected Consumer<Throwable> processFailedConnectionWithServer(CountDownLatch latch,
117         AtomicReference<Exception> errorMessages) {
118         return (Throwable response) -> {
119             Exception e = new Exception("Error in connection has occurred during file download", response);
120             errorMessages.set(new DatafileTaskException(response.getMessage(), e));
121             if (response instanceof NonRetryableDatafileTaskException) {
122                 errorMessages.set(new NonRetryableDatafileTaskException(response.getMessage(), e));
123             }
124             latch.countDown();
125         };
126     }
127
128     protected Consumer<InputStream> processDataFromServer(Path localFile, CountDownLatch latch,
129         AtomicReference<Exception> errorMessages) {
130         return (InputStream response) -> {
131             logger.trace("Starting to process response.");
132             try {
133                 long numBytes = Files.copy(response, localFile);
134                 logger.trace("Transmission was successful - {} bytes downloaded.", numBytes);
135                 logger.trace("CollectFile fetched: {}", localFile);
136                 response.close();
137             } catch (IOException e) {
138                 errorMessages.set(new DatafileTaskException("Error fetching file with", e));
139             } finally {
140                 latch.countDown();
141             }
142         };
143     }
144
145     protected Flux<InputStream> getServerResponse(String remoteFile) {
146         return client.get().uri(HttpUtils.prepareHttpUri(fileServerData, remoteFile))
147             .response((responseReceiver, byteBufFlux) -> {
148                 logger.trace("HTTP response status - {}", responseReceiver.status());
149                 if (isResponseOk(responseReceiver)) {
150                     return byteBufFlux.aggregate().asInputStream();
151                 }
152                 if (isErrorInConnection(responseReceiver)) {
153                     return Mono.error(new NonRetryableDatafileTaskException(
154                         HttpUtils.nonRetryableResponse(getResponseCode(responseReceiver))));
155                 }
156                 return Mono
157                     .error(new DatafileTaskException(HttpUtils.retryableResponse(getResponseCode(responseReceiver))));
158             });
159     }
160
161     protected boolean isResponseOk(HttpClientResponse httpClientResponse) {
162         return getResponseCode(httpClientResponse) == 200;
163     }
164
165     private int getResponseCode(HttpClientResponse responseReceiver) {
166         return responseReceiver.status().code();
167     }
168
169     protected boolean isErrorInConnection(HttpClientResponse httpClientResponse) {
170         return getResponseCode(httpClientResponse) >= 400;
171     }
172
173     @Override
174     public void close() {
175         logger.trace("Starting http client disposal.");
176         disposableClient.dispose();
177         logger.trace("Http client disposed.");
178     }
179 }