X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=datafilecollector%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fdcaegen2%2Fcollectors%2Fdatafile%2Ftasks%2FFileCollector.java;fp=datafilecollector%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fdcaegen2%2Fcollectors%2Fdatafile%2Ftasks%2FFileCollector.java;h=0000000000000000000000000000000000000000;hb=f0af18429aec79a590835103fedd753ee5ea93a9;hp=2b3ce53a535acafa70140619cfa96652501201da;hpb=54c8fecebbb5e19010e56eddf3aba8e127e0abc3;p=nonrtric%2Fplt%2Franpm.git diff --git a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java deleted file mode 100644 index 2b3ce53..0000000 --- a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java +++ /dev/null @@ -1,187 +0,0 @@ -/*- - * ============LICENSE_START====================================================================== - * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. - * Copyright (C) 2020-2022 Nokia. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.tasks; - -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.time.Duration; -import java.util.Optional; - -import org.onap.dcaegen2.collectors.datafile.commons.FileCollectClient; -import org.onap.dcaegen2.collectors.datafile.commons.Scheme; -import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; -import org.onap.dcaegen2.collectors.datafile.configuration.CertificateConfig; -import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; -import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException; -import org.onap.dcaegen2.collectors.datafile.ftp.FtpesClient; -import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient; -import org.onap.dcaegen2.collectors.datafile.ftp.SftpClientSettings; -import org.onap.dcaegen2.collectors.datafile.http.DfcHttpClient; -import org.onap.dcaegen2.collectors.datafile.http.DfcHttpsClient; -import org.onap.dcaegen2.collectors.datafile.http.HttpsClientConnectionManagerUtil; -import org.onap.dcaegen2.collectors.datafile.model.Counters; -import org.onap.dcaegen2.collectors.datafile.model.FileData; -import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; -import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import reactor.core.publisher.Mono; -import reactor.util.retry.Retry; - -/** - * Collects a file from a PNF. - * - * @author Henrik Andersson - */ -public class FileCollector { - - private static final Logger logger = LoggerFactory.getLogger(FileCollector.class); - private final AppConfig appConfig; - private final Counters counters; - - /** - * Constructor. - * - * @param appConfig application configuration - */ - public FileCollector(AppConfig appConfig, Counters counters) { - this.appConfig = appConfig; - this.counters = counters; - } - - /** - * Collects a file from the PNF and stores it in the local file system. - * - * @param fileData data about the file to collect. - * @param numRetries the number of retries if the publishing fails - * @param firstBackoff the time to delay the first retry - * @param contextMap context for logging. - * @return the data needed to publish the file. - */ - public Mono collectFile(FileData fileData, long numRetries, Duration firstBackoff) { - - logger.trace("Entering collectFile with {}", fileData); - - return Mono.just(fileData) // - .cache() // - .flatMap(fd -> tryCollectFile(fileData)) // - .retryWhen(Retry.backoff(numRetries, firstBackoff)) // - .flatMap(FileCollector::checkCollectedFile); - } - - private static Mono checkCollectedFile(Optional info) { - if (info.isPresent()) { - return Mono.just(info.get()); - } else { - // If there is no info, the file is not retrievable - return Mono.error(new DatafileTaskException("Non retryable file transfer failure")); - } - } - - private Mono> tryCollectFile(FileData fileData) { - logger.trace("starting to collectFile {}", fileData.fileInfo.name); - - final String remoteFile = fileData.remoteFilePath(); - final Path localFile = fileData.getLocalFilePath(this.appConfig); - - try (FileCollectClient currentClient = createClient(fileData)) { - currentClient.open(); - Files.createDirectories(localFile.getParent()); - currentClient.collectFile(remoteFile, localFile); - counters.incNoOfCollectedFiles(); - return Mono.just(Optional.of(createFilePublishInformation(fileData))); - } catch (NonRetryableDatafileTaskException nre) { - logger.warn("Failed to download file, not retryable: {} {}, reason: {}", fileData.sourceName(), - fileData.fileInfo.name, nre.getMessage()); - incFailedAttemptsCounter(fileData); - return Mono.just(Optional.empty()); // Give up - } catch (DatafileTaskException e) { - logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.fileInfo.name, - e.getMessage()); - incFailedAttemptsCounter(fileData); - return Mono.error(e); - } catch (Exception throwable) { - logger.warn("Failed to close client: {} {}, reason: {}", fileData.sourceName(), fileData.fileInfo.name, - throwable.getMessage(), throwable); - return Mono.just(Optional.of(createFilePublishInformation(fileData))); - } - } - - private void incFailedAttemptsCounter(FileData fileData) { - if (Scheme.isFtpScheme(fileData.scheme())) { - counters.incNoOfFailedFtpAttempts(); - } else { - counters.incNoOfFailedHttpAttempts(); - } - } - - private FileCollectClient createClient(FileData fileData) throws DatafileTaskException { - switch (fileData.scheme()) { - case SFTP: - return createSftpClient(fileData); - case FTPES: - return createFtpesClient(fileData); - case HTTP: - return createHttpClient(fileData); - case HTTPS: - return createHttpsClient(fileData); - default: - throw new DatafileTaskException("Unhandled protocol: " + fileData.scheme()); - } - } - - public FilePublishInformation createFilePublishInformation(FileData fileData) { - FileReadyMessage.MessageMetaData metaData = fileData.messageMetaData; - return FilePublishInformation.builder() // - .productName(metaData.productName()) // - .vendorName(metaData.vendorName()) // - .lastEpochMicrosec(metaData.lastEpochMicrosec) // - .sourceName(metaData.sourceName) // - .startEpochMicrosec(metaData.startEpochMicrosec) // - .timeZoneOffset(metaData.timeZoneOffset) // - .name(metaData.sourceName + "/" + fileData.fileInfo.name) // - .compression(fileData.fileInfo.hashMap.compression) // - .fileFormatType(fileData.fileInfo.hashMap.fileFormatType) // - .fileFormatVersion(fileData.fileInfo.hashMap.fileFormatVersion) // - .changeIdentifier(fileData.messageMetaData.changeIdentifier) // - .objectStoreBucket(this.appConfig.isS3Enabled() ? this.appConfig.getS3Bucket() : null) // - .build(); - } - - protected SftpClient createSftpClient(FileData fileData) { - return new SftpClient(fileData.fileServerData(), new SftpClientSettings(appConfig.getSftpConfiguration())); - } - - protected FtpesClient createFtpesClient(FileData fileData) throws DatafileTaskException { - CertificateConfig config = appConfig.getCertificateConfiguration(); - Path trustedCa = config.trustedCa.isEmpty() ? null : Paths.get(config.trustedCa); - - return new FtpesClient(fileData.fileServerData(), Paths.get(config.keyCert), config.keyPasswordPath, trustedCa, - config.trustedCaPasswordPath); - } - - protected FileCollectClient createHttpClient(FileData fileData) { - return new DfcHttpClient(fileData.fileServerData()); - } - - protected FileCollectClient createHttpsClient(FileData fileData) throws DatafileTaskException { - return new DfcHttpsClient(fileData.fileServerData(), HttpsClientConnectionManagerUtil.instance()); - } -}