2 * ============LICENSE_START======================================================================
3 * Copyright (C) 2018-2023 Nordix Foundation. All rights reserved.
4 * Copyright (C) 2020-2022 Nokia. All rights reserved.
5 * ===============================================================================================
6 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
7 * in compliance with the License. You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software distributed under the License
12 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
13 * or implied. See the License for the specific language governing permissions and limitations under
15 * ============LICENSE_END========================================================================
18 package org.oran.datafile.tasks;
20 import java.nio.file.Files;
21 import java.nio.file.Path;
22 import java.nio.file.Paths;
23 import java.time.Duration;
24 import java.util.Optional;
26 import org.oran.datafile.commons.FileCollectClient;
27 import org.oran.datafile.commons.Scheme;
28 import org.oran.datafile.configuration.AppConfig;
29 import org.oran.datafile.configuration.CertificateConfig;
30 import org.oran.datafile.exceptions.DatafileTaskException;
31 import org.oran.datafile.exceptions.NonRetryableDatafileTaskException;
32 import org.oran.datafile.ftp.FtpesClient;
33 import org.oran.datafile.ftp.SftpClient;
34 import org.oran.datafile.ftp.SftpClientSettings;
35 import org.oran.datafile.http.DfcHttpClient;
36 import org.oran.datafile.http.DfcHttpsClient;
37 import org.oran.datafile.http.HttpsClientConnectionManagerUtil;
38 import org.oran.datafile.model.Counters;
39 import org.oran.datafile.model.FileData;
40 import org.oran.datafile.model.FilePublishInformation;
41 import org.oran.datafile.model.FileReadyMessage;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
45 import reactor.core.publisher.Mono;
46 import reactor.util.retry.Retry;
49 * Collects a file from a PNF.
51 public class FileCollector {
53 private static final Logger logger = LoggerFactory.getLogger(FileCollector.class);
54 private final AppConfig appConfig;
55 private final Counters counters;
60 * @param appConfig application configuration
62 public FileCollector(AppConfig appConfig, Counters counters) {
63 this.appConfig = appConfig;
64 this.counters = counters;
68 * Collects a file from the PNF and stores it in the local file system.
70 * @param fileData data about the file to collect.
71 * @param numRetries the number of retries if the publishing fails
72 * @param firstBackoff the time to delay the first retry
73 * @param contextMap context for logging.
74 * @return the data needed to publish the file.
76 public Mono<FilePublishInformation> collectFile(FileData fileData, long numRetries, Duration firstBackoff) {
78 logger.trace("Entering collectFile with {}", fileData);
80 return Mono.just(fileData) //
82 .flatMap(fd -> tryCollectFile(fileData)) //
83 .retryWhen(Retry.backoff(numRetries, firstBackoff)) //
84 .flatMap(FileCollector::checkCollectedFile);
87 private static Mono<FilePublishInformation> checkCollectedFile(Optional<FilePublishInformation> info) {
88 if (info.isPresent()) {
89 return Mono.just(info.get());
91 // If there is no info, the file is not retrievable
92 return Mono.error(new DatafileTaskException("Non retryable file transfer failure"));
96 private Mono<Optional<FilePublishInformation>> tryCollectFile(FileData fileData) {
97 logger.trace("starting to collectFile {}", fileData.fileInfo.name);
99 final String remoteFile = fileData.remoteFilePath();
100 final Path localFile = fileData.getLocalFilePath(this.appConfig);
102 try (FileCollectClient currentClient = createClient(fileData)) {
103 currentClient.open();
104 Files.createDirectories(localFile.getParent());
105 currentClient.collectFile(remoteFile, localFile);
106 counters.incNoOfCollectedFiles();
107 return Mono.just(Optional.of(createFilePublishInformation(fileData)));
108 } catch (NonRetryableDatafileTaskException nre) {
109 logger.warn("Failed to download file, not retryable: {} {}, reason: {}", fileData.sourceName(),
110 fileData.fileInfo.name, nre.getMessage());
111 incFailedAttemptsCounter(fileData);
112 return Mono.just(Optional.empty()); // Give up
113 } catch (DatafileTaskException e) {
114 logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.fileInfo.name,
116 incFailedAttemptsCounter(fileData);
117 return Mono.error(e);
118 } catch (Exception throwable) {
119 logger.warn("Failed to close client: {} {}, reason: {}", fileData.sourceName(), fileData.fileInfo.name,
120 throwable.getMessage(), throwable);
121 return Mono.just(Optional.of(createFilePublishInformation(fileData)));
125 private void incFailedAttemptsCounter(FileData fileData) {
126 if (Scheme.isFtpScheme(fileData.scheme())) {
127 counters.incNoOfFailedFtpAttempts();
129 counters.incNoOfFailedHttpAttempts();
133 private FileCollectClient createClient(FileData fileData) throws DatafileTaskException {
134 switch (fileData.scheme()) {
136 return createSftpClient(fileData);
138 return createFtpesClient(fileData);
140 return createHttpClient(fileData);
142 return createHttpsClient(fileData);
144 throw new DatafileTaskException("Unhandled protocol: " + fileData.scheme());
148 public FilePublishInformation createFilePublishInformation(FileData fileData) {
149 FileReadyMessage.MessageMetaData metaData = fileData.messageMetaData;
150 return FilePublishInformation.builder() //
151 .productName(metaData.productName()) //
152 .vendorName(metaData.vendorName()) //
153 .lastEpochMicrosec(metaData.lastEpochMicrosec) //
154 .sourceName(metaData.sourceName) //
155 .startEpochMicrosec(metaData.startEpochMicrosec) //
156 .timeZoneOffset(metaData.timeZoneOffset) //
157 .name(metaData.sourceName + "/" + fileData.fileInfo.name) //
158 .compression(fileData.fileInfo.hashMap.compression) //
159 .fileFormatType(fileData.fileInfo.hashMap.fileFormatType) //
160 .fileFormatVersion(fileData.fileInfo.hashMap.fileFormatVersion) //
161 .changeIdentifier(fileData.messageMetaData.changeIdentifier) //
162 .objectStoreBucket(this.appConfig.isS3Enabled() ? this.appConfig.getS3Bucket() : null) //
166 protected SftpClient createSftpClient(FileData fileData) {
167 return new SftpClient(fileData.fileServerData(), new SftpClientSettings(appConfig.getSftpConfiguration()));
170 protected FtpesClient createFtpesClient(FileData fileData) throws DatafileTaskException {
171 CertificateConfig config = appConfig.getCertificateConfiguration();
172 Path trustedCa = config.trustedCa.isEmpty() ? null : Paths.get(config.trustedCa);
174 return new FtpesClient(fileData.fileServerData(), Paths.get(config.keyCert), config.keyPasswordPath, trustedCa,
175 config.trustedCaPasswordPath);
178 protected FileCollectClient createHttpClient(FileData fileData) {
179 return new DfcHttpClient(fileData.fileServerData());
182 protected FileCollectClient createHttpsClient(FileData fileData) throws DatafileTaskException {
183 return new DfcHttpsClient(fileData.fileServerData(), HttpsClientConnectionManagerUtil.instance());