/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* Copyright (C) 2020-2022 Nokia. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
* ============LICENSE_END========================================================================
*/
package org.onap.dcaegen2.collectors.datafile.tasks;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Optional;
import org.apache.commons.io.FileUtils;
import org.onap.dcaegen2.collectors.datafile.commons.FileCollectClient;
import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.CertificateConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.ftp.FtpesClient;
import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
import org.onap.dcaegen2.collectors.datafile.ftp.SftpClientSettings;
import org.onap.dcaegen2.collectors.datafile.http.DfcHttpClient;
import org.onap.dcaegen2.collectors.datafile.http.DfcHttpsClient;
import org.onap.dcaegen2.collectors.datafile.http.HttpsClientConnectionManagerUtil;
import org.onap.dcaegen2.collectors.datafile.model.Counters;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
/**
* Collects a file from a PNF.
*
* @author Henrik Andersson
*/
public class FileCollector {
private static final Logger logger = LoggerFactory.getLogger(FileCollector.class);
private final AppConfig appConfig;
private final Counters counters;
/**
* Constructor.
*
* @param appConfig application configuration
*/
public FileCollector(AppConfig appConfig, Counters counters) {
this.appConfig = appConfig;
this.counters = counters;
}
/**
* Collects a file from the PNF and stores it in the local file system.
*
* @param fileData data about the file to collect.
* @param numRetries the number of retries if the publishing fails
* @param firstBackoff the time to delay the first retry
* @param contextMap context for logging.
* @return the data needed to publish the file.
*/
public Mono collectFile(FileData fileData, long numRetries, Duration firstBackoff) {
logger.trace("Entering collectFile with {}", fileData);
return Mono.just(fileData) //
.cache() //
.flatMap(fd -> tryCollectFile(fileData)) //
.retryWhen(Retry.backoff(numRetries, firstBackoff)) //
.flatMap(FileCollector::checkCollectedFile);
}
private static Mono checkCollectedFile(Optional info) {
if (info.isPresent()) {
return Mono.just(info.get());
} else {
// If there is no info, the file is not retrievable
return Mono.error(new DatafileTaskException("Non retryable file transfer failure"));
}
}
private Mono> tryCollectFile(FileData fileData) {
logger.trace("starting to collectFile {}", fileData.fileInfo.name);
final String remoteFile = fileData.remoteFilePath();
final Path localFile = fileData.getLocalFilePath(this.appConfig);
try (FileCollectClient currentClient = createClient(fileData)) {
currentClient.open();
FileUtils.forceMkdirParent(localFile.toFile());
currentClient.collectFile(remoteFile, localFile);
counters.incNoOfCollectedFiles();
return Mono.just(Optional.of(createFilePublishInformation(fileData)));
} catch (NonRetryableDatafileTaskException nre) {
logger.warn("Failed to download file, not retryable: {} {}, reason: {}", fileData.sourceName(),
fileData.fileInfo.name, nre.getMessage());
incFailedAttemptsCounter(fileData);
return Mono.just(Optional.empty()); // Give up
} catch (DatafileTaskException e) {
logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.fileInfo.name,
e.getMessage());
incFailedAttemptsCounter(fileData);
return Mono.error(e);
} catch (Exception throwable) {
logger.warn("Failed to close client: {} {}, reason: {}", fileData.sourceName(), fileData.fileInfo.name,
throwable.getMessage(), throwable);
return Mono.just(Optional.of(createFilePublishInformation(fileData)));
}
}
private void incFailedAttemptsCounter(FileData fileData) {
if (Scheme.isFtpScheme(fileData.scheme())) {
counters.incNoOfFailedFtpAttempts();
} else {
counters.incNoOfFailedHttpAttempts();
}
}
private FileCollectClient createClient(FileData fileData) throws DatafileTaskException {
switch (fileData.scheme()) {
case SFTP:
return createSftpClient(fileData);
case FTPES:
return createFtpesClient(fileData);
case HTTP:
return createHttpClient(fileData);
case HTTPS:
return createHttpsClient(fileData);
default:
throw new DatafileTaskException("Unhandled protocol: " + fileData.scheme());
}
}
public FilePublishInformation createFilePublishInformation(FileData fileData) {
FileReadyMessage.MessageMetaData metaData = fileData.messageMetaData;
return FilePublishInformation.builder() //
.productName(metaData.productName()) //
.vendorName(metaData.vendorName()) //
.lastEpochMicrosec(metaData.lastEpochMicrosec) //
.sourceName(metaData.sourceName) //
.startEpochMicrosec(metaData.startEpochMicrosec) //
.timeZoneOffset(metaData.timeZoneOffset) //
.name(metaData.sourceName + "/" + fileData.fileInfo.name) //
.compression(fileData.fileInfo.hashMap.compression) //
.fileFormatType(fileData.fileInfo.hashMap.fileFormatType) //
.fileFormatVersion(fileData.fileInfo.hashMap.fileFormatVersion) //
.changeIdentifier(fileData.messageMetaData.changeIdentifier) //
.objectStoreBucket(this.appConfig.isS3Enabled() ? this.appConfig.getS3Bucket() : null) //
.build();
}
protected SftpClient createSftpClient(FileData fileData) {
return new SftpClient(fileData.fileServerData(), new SftpClientSettings(appConfig.getSftpConfiguration()));
}
protected FtpesClient createFtpesClient(FileData fileData) throws DatafileTaskException {
CertificateConfig config = appConfig.getCertificateConfiguration();
Path trustedCa = config.trustedCa.isEmpty() ? null : Paths.get(config.trustedCa);
return new FtpesClient(fileData.fileServerData(), Paths.get(config.keyCert), config.keyPasswordPath, trustedCa,
config.trustedCaPasswordPath);
}
protected FileCollectClient createHttpClient(FileData fileData) {
return new DfcHttpClient(fileData.fileServerData());
}
protected FileCollectClient createHttpsClient(FileData fileData) throws DatafileTaskException {
return new DfcHttpsClient(fileData.fileServerData(), HttpsClientConnectionManagerUtil.instance());
}
}