2b3ce53a535acafa70140619cfa96652501201da
[nonrtric/plt/ranpm.git] / datafilecollector / src / main / java / org / onap / dcaegen2 / collectors / datafile / tasks / FileCollector.java
1 /*-
2  * ============LICENSE_START======================================================================
3  * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
4  * Copyright (C) 2020-2022 Nokia. All rights reserved.
5  * ===============================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
7  * in compliance with the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software distributed under the License
12  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
13  * or implied. See the License for the specific language governing permissions and limitations under
14  * the License.
15  * ============LICENSE_END========================================================================
16  */
17
18 package org.onap.dcaegen2.collectors.datafile.tasks;
19
20 import java.nio.file.Files;
21 import java.nio.file.Path;
22 import java.nio.file.Paths;
23 import java.time.Duration;
24 import java.util.Optional;
25
26 import org.onap.dcaegen2.collectors.datafile.commons.FileCollectClient;
27 import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
28 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
29 import org.onap.dcaegen2.collectors.datafile.configuration.CertificateConfig;
30 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
31 import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
32 import org.onap.dcaegen2.collectors.datafile.ftp.FtpesClient;
33 import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
34 import org.onap.dcaegen2.collectors.datafile.ftp.SftpClientSettings;
35 import org.onap.dcaegen2.collectors.datafile.http.DfcHttpClient;
36 import org.onap.dcaegen2.collectors.datafile.http.DfcHttpsClient;
37 import org.onap.dcaegen2.collectors.datafile.http.HttpsClientConnectionManagerUtil;
38 import org.onap.dcaegen2.collectors.datafile.model.Counters;
39 import org.onap.dcaegen2.collectors.datafile.model.FileData;
40 import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
41 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 import reactor.core.publisher.Mono;
46 import reactor.util.retry.Retry;
47
48 /**
49  * Collects a file from a PNF.
50  *
51  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
52  */
53 public class FileCollector {
54
55     private static final Logger logger = LoggerFactory.getLogger(FileCollector.class);
56     private final AppConfig appConfig;
57     private final Counters counters;
58
59     /**
60      * Constructor.
61      *
62      * @param appConfig application configuration
63      */
64     public FileCollector(AppConfig appConfig, Counters counters) {
65         this.appConfig = appConfig;
66         this.counters = counters;
67     }
68
69     /**
70      * Collects a file from the PNF and stores it in the local file system.
71      *
72      * @param fileData data about the file to collect.
73      * @param numRetries the number of retries if the publishing fails
74      * @param firstBackoff the time to delay the first retry
75      * @param contextMap context for logging.
76      * @return the data needed to publish the file.
77      */
78     public Mono<FilePublishInformation> collectFile(FileData fileData, long numRetries, Duration firstBackoff) {
79
80         logger.trace("Entering collectFile with {}", fileData);
81
82         return Mono.just(fileData) //
83             .cache() //
84             .flatMap(fd -> tryCollectFile(fileData)) //
85             .retryWhen(Retry.backoff(numRetries, firstBackoff)) //
86             .flatMap(FileCollector::checkCollectedFile);
87     }
88
89     private static Mono<FilePublishInformation> checkCollectedFile(Optional<FilePublishInformation> info) {
90         if (info.isPresent()) {
91             return Mono.just(info.get());
92         } else {
93             // If there is no info, the file is not retrievable
94             return Mono.error(new DatafileTaskException("Non retryable file transfer failure"));
95         }
96     }
97
98     private Mono<Optional<FilePublishInformation>> tryCollectFile(FileData fileData) {
99         logger.trace("starting to collectFile {}", fileData.fileInfo.name);
100
101         final String remoteFile = fileData.remoteFilePath();
102         final Path localFile = fileData.getLocalFilePath(this.appConfig);
103
104         try (FileCollectClient currentClient = createClient(fileData)) {
105             currentClient.open();
106             Files.createDirectories(localFile.getParent());
107             currentClient.collectFile(remoteFile, localFile);
108             counters.incNoOfCollectedFiles();
109             return Mono.just(Optional.of(createFilePublishInformation(fileData)));
110         } catch (NonRetryableDatafileTaskException nre) {
111             logger.warn("Failed to download file, not retryable: {} {}, reason: {}", fileData.sourceName(),
112                 fileData.fileInfo.name, nre.getMessage());
113             incFailedAttemptsCounter(fileData);
114             return Mono.just(Optional.empty()); // Give up
115         } catch (DatafileTaskException e) {
116             logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.fileInfo.name,
117                 e.getMessage());
118             incFailedAttemptsCounter(fileData);
119             return Mono.error(e);
120         } catch (Exception throwable) {
121             logger.warn("Failed to close client: {} {}, reason: {}", fileData.sourceName(), fileData.fileInfo.name,
122                 throwable.getMessage(), throwable);
123             return Mono.just(Optional.of(createFilePublishInformation(fileData)));
124         }
125     }
126
127     private void incFailedAttemptsCounter(FileData fileData) {
128         if (Scheme.isFtpScheme(fileData.scheme())) {
129             counters.incNoOfFailedFtpAttempts();
130         } else {
131             counters.incNoOfFailedHttpAttempts();
132         }
133     }
134
135     private FileCollectClient createClient(FileData fileData) throws DatafileTaskException {
136         switch (fileData.scheme()) {
137             case SFTP:
138                 return createSftpClient(fileData);
139             case FTPES:
140                 return createFtpesClient(fileData);
141             case HTTP:
142                 return createHttpClient(fileData);
143             case HTTPS:
144                 return createHttpsClient(fileData);
145             default:
146                 throw new DatafileTaskException("Unhandled protocol: " + fileData.scheme());
147         }
148     }
149
150     public FilePublishInformation createFilePublishInformation(FileData fileData) {
151         FileReadyMessage.MessageMetaData metaData = fileData.messageMetaData;
152         return FilePublishInformation.builder() //
153             .productName(metaData.productName()) //
154             .vendorName(metaData.vendorName()) //
155             .lastEpochMicrosec(metaData.lastEpochMicrosec) //
156             .sourceName(metaData.sourceName) //
157             .startEpochMicrosec(metaData.startEpochMicrosec) //
158             .timeZoneOffset(metaData.timeZoneOffset) //
159             .name(metaData.sourceName + "/" + fileData.fileInfo.name) //
160             .compression(fileData.fileInfo.hashMap.compression) //
161             .fileFormatType(fileData.fileInfo.hashMap.fileFormatType) //
162             .fileFormatVersion(fileData.fileInfo.hashMap.fileFormatVersion) //
163             .changeIdentifier(fileData.messageMetaData.changeIdentifier) //
164             .objectStoreBucket(this.appConfig.isS3Enabled() ? this.appConfig.getS3Bucket() : null) //
165             .build();
166     }
167
168     protected SftpClient createSftpClient(FileData fileData) {
169         return new SftpClient(fileData.fileServerData(), new SftpClientSettings(appConfig.getSftpConfiguration()));
170     }
171
172     protected FtpesClient createFtpesClient(FileData fileData) throws DatafileTaskException {
173         CertificateConfig config = appConfig.getCertificateConfiguration();
174         Path trustedCa = config.trustedCa.isEmpty() ? null : Paths.get(config.trustedCa);
175
176         return new FtpesClient(fileData.fileServerData(), Paths.get(config.keyCert), config.keyPasswordPath, trustedCa,
177             config.trustedCaPasswordPath);
178     }
179
180     protected FileCollectClient createHttpClient(FileData fileData) {
181         return new DfcHttpClient(fileData.fileServerData());
182     }
183
184     protected FileCollectClient createHttpsClient(FileData fileData) throws DatafileTaskException {
185         return new DfcHttpsClient(fileData.fileServerData(), HttpsClientConnectionManagerUtil.instance());
186     }
187 }