+++ /dev/null
-/*-
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
- * Copyright (C) 2020-2022 Nokia. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.tasks;
-
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.time.Duration;
-import java.util.Optional;
-
-import org.onap.dcaegen2.collectors.datafile.commons.FileCollectClient;
-import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
-import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.configuration.CertificateConfig;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.ftp.FtpesClient;
-import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
-import org.onap.dcaegen2.collectors.datafile.ftp.SftpClientSettings;
-import org.onap.dcaegen2.collectors.datafile.http.DfcHttpClient;
-import org.onap.dcaegen2.collectors.datafile.http.DfcHttpsClient;
-import org.onap.dcaegen2.collectors.datafile.http.HttpsClientConnectionManagerUtil;
-import org.onap.dcaegen2.collectors.datafile.model.Counters;
-import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
-import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import reactor.core.publisher.Mono;
-import reactor.util.retry.Retry;
-
-/**
- * Collects a file from a PNF.
- *
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-public class FileCollector {
-
- private static final Logger logger = LoggerFactory.getLogger(FileCollector.class);
- private final AppConfig appConfig;
- private final Counters counters;
-
- /**
- * Constructor.
- *
- * @param appConfig application configuration
- */
- public FileCollector(AppConfig appConfig, Counters counters) {
- this.appConfig = appConfig;
- this.counters = counters;
- }
-
- /**
- * Collects a file from the PNF and stores it in the local file system.
- *
- * @param fileData data about the file to collect.
- * @param numRetries the number of retries if the publishing fails
- * @param firstBackoff the time to delay the first retry
- * @param contextMap context for logging.
- * @return the data needed to publish the file.
- */
- public Mono<FilePublishInformation> collectFile(FileData fileData, long numRetries, Duration firstBackoff) {
-
- logger.trace("Entering collectFile with {}", fileData);
-
- return Mono.just(fileData) //
- .cache() //
- .flatMap(fd -> tryCollectFile(fileData)) //
- .retryWhen(Retry.backoff(numRetries, firstBackoff)) //
- .flatMap(FileCollector::checkCollectedFile);
- }
-
- private static Mono<FilePublishInformation> checkCollectedFile(Optional<FilePublishInformation> info) {
- if (info.isPresent()) {
- return Mono.just(info.get());
- } else {
- // If there is no info, the file is not retrievable
- return Mono.error(new DatafileTaskException("Non retryable file transfer failure"));
- }
- }
-
- private Mono<Optional<FilePublishInformation>> tryCollectFile(FileData fileData) {
- logger.trace("starting to collectFile {}", fileData.fileInfo.name);
-
- final String remoteFile = fileData.remoteFilePath();
- final Path localFile = fileData.getLocalFilePath(this.appConfig);
-
- try (FileCollectClient currentClient = createClient(fileData)) {
- currentClient.open();
- Files.createDirectories(localFile.getParent());
- currentClient.collectFile(remoteFile, localFile);
- counters.incNoOfCollectedFiles();
- return Mono.just(Optional.of(createFilePublishInformation(fileData)));
- } catch (NonRetryableDatafileTaskException nre) {
- logger.warn("Failed to download file, not retryable: {} {}, reason: {}", fileData.sourceName(),
- fileData.fileInfo.name, nre.getMessage());
- incFailedAttemptsCounter(fileData);
- return Mono.just(Optional.empty()); // Give up
- } catch (DatafileTaskException e) {
- logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.fileInfo.name,
- e.getMessage());
- incFailedAttemptsCounter(fileData);
- return Mono.error(e);
- } catch (Exception throwable) {
- logger.warn("Failed to close client: {} {}, reason: {}", fileData.sourceName(), fileData.fileInfo.name,
- throwable.getMessage(), throwable);
- return Mono.just(Optional.of(createFilePublishInformation(fileData)));
- }
- }
-
- private void incFailedAttemptsCounter(FileData fileData) {
- if (Scheme.isFtpScheme(fileData.scheme())) {
- counters.incNoOfFailedFtpAttempts();
- } else {
- counters.incNoOfFailedHttpAttempts();
- }
- }
-
- private FileCollectClient createClient(FileData fileData) throws DatafileTaskException {
- switch (fileData.scheme()) {
- case SFTP:
- return createSftpClient(fileData);
- case FTPES:
- return createFtpesClient(fileData);
- case HTTP:
- return createHttpClient(fileData);
- case HTTPS:
- return createHttpsClient(fileData);
- default:
- throw new DatafileTaskException("Unhandled protocol: " + fileData.scheme());
- }
- }
-
- public FilePublishInformation createFilePublishInformation(FileData fileData) {
- FileReadyMessage.MessageMetaData metaData = fileData.messageMetaData;
- return FilePublishInformation.builder() //
- .productName(metaData.productName()) //
- .vendorName(metaData.vendorName()) //
- .lastEpochMicrosec(metaData.lastEpochMicrosec) //
- .sourceName(metaData.sourceName) //
- .startEpochMicrosec(metaData.startEpochMicrosec) //
- .timeZoneOffset(metaData.timeZoneOffset) //
- .name(metaData.sourceName + "/" + fileData.fileInfo.name) //
- .compression(fileData.fileInfo.hashMap.compression) //
- .fileFormatType(fileData.fileInfo.hashMap.fileFormatType) //
- .fileFormatVersion(fileData.fileInfo.hashMap.fileFormatVersion) //
- .changeIdentifier(fileData.messageMetaData.changeIdentifier) //
- .objectStoreBucket(this.appConfig.isS3Enabled() ? this.appConfig.getS3Bucket() : null) //
- .build();
- }
-
- protected SftpClient createSftpClient(FileData fileData) {
- return new SftpClient(fileData.fileServerData(), new SftpClientSettings(appConfig.getSftpConfiguration()));
- }
-
- protected FtpesClient createFtpesClient(FileData fileData) throws DatafileTaskException {
- CertificateConfig config = appConfig.getCertificateConfiguration();
- Path trustedCa = config.trustedCa.isEmpty() ? null : Paths.get(config.trustedCa);
-
- return new FtpesClient(fileData.fileServerData(), Paths.get(config.keyCert), config.keyPasswordPath, trustedCa,
- config.trustedCaPasswordPath);
- }
-
- protected FileCollectClient createHttpClient(FileData fileData) {
- return new DfcHttpClient(fileData.fileServerData());
- }
-
- protected FileCollectClient createHttpsClient(FileData fileData) throws DatafileTaskException {
- return new DfcHttpsClient(fileData.fileServerData(), HttpsClientConnectionManagerUtil.instance());
- }
-}