org.springframework: WARN
org.springframework.data: WARN
org.springframework.web.reactive.function.client.ExchangeFunctions: WARN
- org.onap.dcaegen2.collectors.datafile: INFO
+ org.oran.datafile: INFO
file:
name: /var/log/ONAP/application.log
number-of-worker-treads: 200
# KAFKA boostrap servers.
# several redundant boostrap servers can be specified, separated by a comma ','.
+ # If the file name is empty, no authorization token is used
+ auth-token-file:
kafka:
bootstrap-servers: localhost:9092
# output topic
client-id: datafile-1
# input topic
file-ready-event-topic: file-ready
+ # Configues if oath2 tokens shall be used. If set to true, auth-token-file must also be configured
+ use-oath-token: false
+ ssl:
+ key-store-type: PEM
+ key-store-location:
+ # key password is needed if the private key is encrypted
+ key-store-password:
+ trust-store-type: PEM
+ trust-store-location:
sftp:
known-hosts-file-path:
strict-host-key-checking: false
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<finalName>${project.artifactId}</finalName>
- <mainClass>org.onap.dcaegen2.collectors.datafile.MainApp</mainClass>
+ <mainClass>org.oran.datafile.MainApp</mainClass>
</configuration>
<executions>
<execution>
/*-
* ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2023 Nordix Foundation. All rights reserved.
* Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * Copyright (C) 2018-2021 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* ============LICENSE_END========================================================================
*/
-package org.onap.dcaegen2.collectors.datafile;
+package org.oran.datafile;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* The main app of DFC.
*
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
@SpringBootApplication()
@EnableScheduling
/*-
* ============LICENSE_START======================================================================
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2023 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
* ============LICENSE_END========================================================================
*/
-package org.onap.dcaegen2.collectors.datafile.commons;
+package org.oran.datafile.commons;
import java.nio.file.Path;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.oran.datafile.exceptions.DatafileTaskException;
/**
* A closeable file client.
- *
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
public interface FileCollectClient extends AutoCloseable {
public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException;
/*-
* ============LICENSE_START======================================================================
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2023 Nordix Foundation. All rights reserved.
* Modifications copyright (C) 2021 Nokia. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* ============LICENSE_END========================================================================
*/
-package org.onap.dcaegen2.collectors.datafile.commons;
+package org.oran.datafile.commons;
import java.util.ArrayList;
import java.util.List;
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2019-2023 Nordix Foundation. All rights reserved.
* Copyright (C) 2020-2021 Nokia. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* ============LICENSE_END=========================================================
*/
-package org.onap.dcaegen2.collectors.datafile.commons;
+package org.oran.datafile.commons;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.oran.datafile.exceptions.DatafileTaskException;
/**
* Enum specifying the schemes that DFC support for downloading files.
*
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- *
*/
public enum Scheme {
FTPES, SFTP, HTTP, HTTPS;
*
* @param schemeString the string to convert to <code>Scheme</code>.
* @return The corresponding <code>Scheme</code>
- * @throws DatafileTaskException if the value of the string doesn't match any defined scheme.
+ * @throws DatafileTaskException if the value of the string doesn't match any
+ * defined scheme.
*/
public static Scheme getSchemeFromString(String schemeString) throws DatafileTaskException {
Scheme result;
* the License.
* ============LICENSE_END========================================================================
*/
-package org.onap.dcaegen2.collectors.datafile.commons;
+package org.oran.datafile.commons;
import java.io.IOException;
import java.nio.file.Files;
/**
* Utility class containing functions used for certificates configuration
- *
- * @author <a href="mailto:krzysztof.gajewski@nokia.com">Krzysztof Gajewski</a>
*/
public final class SecurityUtil {
private SecurityUtil() {
/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018, 2020-2022 Nokia. All rights reserved.
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2023 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
* ============LICENSE_END========================================================================
*/
-package org.onap.dcaegen2.collectors.datafile.configuration;
+package org.oran.datafile.configuration;
-import java.util.Properties;
+import java.util.Map;
import lombok.Getter;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.oran.datafile.oauth2.OAuthKafkaAuthenticateLoginCallbackHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* Holds all configuration for the DFC.
- *
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on
- * 3/23/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
@Component
@EnableConfigurationProperties
public class AppConfig {
- @Value("#{systemEnvironment}")
- Properties systemEnvironment;
-
@Value("${app.kafka.bootstrap-servers:}")
private String kafkaBootStrapServers;
@Value("${app.kafka.collected-file-topic:}")
- public String collectedFileTopic;
+ @Getter
+ private String collectedFileTopic;
@Value("${app.kafka.file-ready-event-topic:}")
- public String fileReadyEventTopic;
+ @Getter
+ private String inputTopic;
@Value("${app.kafka.client-id:undefined}")
- public String kafkaClientId;
+ @Getter
+ private String kafkaClientId;
- @Value("${app.collected-files-path:}")
- public String collectedFilesPath;
+ @Value("${app.collected-files-path}")
+ @Getter
+ private String collectedFilesPath;
@Value("${app.sftp.strict-host-key-checking:false}")
- public boolean strictHostKeyChecking;
+ private boolean strictHostKeyChecking;
@Value("${app.sftp.known-hosts-file-path:}")
- public String knownHostsFilePath;
+ @Getter
+ private String knownHostsFilePath;
@Value("${app.ssl.key-store-password-file}")
private String clientKeyStorePassword = "";
@Getter
private int noOfWorkerThreads;
+ @Value("${app.kafka.ssl.key-store-location}")
+ private String kafkaKeyStoreLocation;
+
+ @Value("${app.kafka.ssl.key-store-type}")
+ private String kafkaKeyStoreType;
+
+ @Value("${app.kafka.ssl.key-store-password}")
+ private String kafkaKeyStorePassword;
+
+ @Value("${app.kafka.ssl.trust-store-type}")
+ private String kafkaTrustStoreType;
+
+ @Value("${app.kafka.ssl.trust-store-location}")
+ private String kafkTrustStoreLocation;
+
+ @Value("${app.kafka.use-oath-token}")
+ private boolean useOathToken;
+
public String getS3LocksBucket() {
return s3LocksBucket.isEmpty() ? s3Bucket : s3LocksBucket;
}
.build();
}
+ public void addKafkaSecurityProps(Map<String, Object> props) {
+
+ if (useOathToken) {
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
+ props.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
+ props.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS,
+ OAuthKafkaAuthenticateLoginCallbackHandler.class.getName());
+ props.put(SaslConfigs.SASL_JAAS_CONFIG,
+ "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub=\"alice\"; ");
+ }
+ if (!kafkaKeyStoreLocation.isEmpty()) {
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name);
+ // SSL
+ props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, kafkaKeyStoreType);
+ props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, kafkaKeyStoreLocation);
+ if (!kafkaKeyStorePassword.isEmpty()) {
+ props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaKeyStorePassword);
+ }
+ if (!kafkTrustStoreLocation.isEmpty()) {
+ props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, kafkaTrustStoreType);
+ props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkTrustStoreLocation);
+ }
+ }
+ }
+
}
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2018-2022 Nokia. All rights reserved.
- * Copyright (C) 2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2019-2023 Nordix Foundation. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcaegen2.collectors.datafile.configuration;
+package org.oran.datafile.configuration;
import lombok.Builder;
* ============LICENSE_END=========================================================
*/
-package org.onap.dcaegen2.collectors.datafile.configuration;
+package org.oran.datafile.configuration;
import lombok.Builder;
/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * Copyright (C) 2018-2021 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2023 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* ============LICENSE_END========================================================================
*/
-package org.onap.dcaegen2.collectors.datafile.configuration;
+package org.oran.datafile.configuration;
import io.swagger.v3.oas.annotations.OpenAPIDefinition;
import io.swagger.v3.oas.annotations.info.Info;
version = SwaggerConfig.VERSION,
description = SwaggerConfig.DESCRIPTION,
license = @License(
- name = "Copyright (C) 2020 Nordix Foundation. Licensed under the Apache License.",
+ name = "Copyright (C) 2020-2023 NordixFoundation. Licensed under the Apache License.",
url = "http://www.apache.org/licenses/LICENSE-2.0")))
public class SwaggerConfig {
/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * Copyright (C) 2018-2021 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2023 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
* ============LICENSE_END========================================================================
*/
-package org.onap.dcaegen2.collectors.datafile.controllers;
+package org.oran.datafile.controllers;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.responses.ApiResponses;
import io.swagger.v3.oas.annotations.tags.Tag;
-import org.onap.dcaegen2.collectors.datafile.model.Counters;
-import org.onap.dcaegen2.collectors.datafile.tasks.CollectAndReportFiles;
+import org.oran.datafile.model.Counters;
+import org.oran.datafile.tasks.CollectAndReportFiles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
* ========================LICENSE_END===================================
*/
-package org.onap.dcaegen2.collectors.datafile.datastore;
+package org.oran.datafile.datastore;
import java.nio.file.Path;
-import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.oran.datafile.configuration.AppConfig;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
* ========================LICENSE_END===================================
*/
-package org.onap.dcaegen2.collectors.datafile.datastore;
+package org.oran.datafile.datastore;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.stream.Stream;
-import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.oran.datafile.configuration.AppConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.FileSystemUtils;
public class FileStore implements DataStore {
private static final Logger logger = LoggerFactory.getLogger(java.lang.invoke.MethodHandles.lookup().lookupClass());
- AppConfig applicationConfig;
+ AppConfig appConfig;
public FileStore(AppConfig applicationConfig) {
- this.applicationConfig = applicationConfig;
+ this.appConfig = applicationConfig;
}
@Override
public Flux<String> listObjects(Bucket bucket, String prefix) {
- Path root = Path.of(applicationConfig.collectedFilesPath, prefix);
+ Path root = Path.of(appConfig.getCollectedFilesPath(), prefix);
if (!root.toFile().exists()) {
root = root.getParent();
}
private String externalName(Path path) {
String fullName = path.toString();
- String externalName = fullName.substring(applicationConfig.collectedFilesPath.length());
+ String externalName = fullName.substring(appConfig.getCollectedFilesPath().length());
if (externalName.startsWith("/")) {
externalName = externalName.substring(1);
}
}
private Path path(String name) {
- return Path.of(applicationConfig.collectedFilesPath, name);
+ return Path.of(appConfig.getCollectedFilesPath(), name);
}
public Mono<Boolean> fileExists(Bucket bucket, String key) {
@Override
public Mono<String> deleteBucket(Bucket bucket) {
try {
- FileSystemUtils.deleteRecursively(Path.of(applicationConfig.collectedFilesPath));
+ FileSystemUtils.deleteRecursively(Path.of(appConfig.getCollectedFilesPath()));
} catch (IOException e) {
- logger.debug("Could not delete directory: {}, reason; {}", applicationConfig.collectedFilesPath,
+ logger.debug("Could not delete directory: {}, reason; {}", appConfig.getCollectedFilesPath(),
e.getMessage());
}
return Mono.just("OK");
* ========================LICENSE_END===================================
*/
-package org.onap.dcaegen2.collectors.datafile.datastore;
+package org.oran.datafile.datastore;
import java.net.URI;
import java.nio.file.Path;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
-import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.oran.datafile.configuration.AppConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/*-
* ============LICENSE_START======================================================================
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2023 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* ============LICENSE_END========================================================================
*/
-package org.onap.dcaegen2.collectors.datafile.exceptions;
+package org.oran.datafile.exceptions;
public class DatafileTaskException extends Exception {
* ============LICENSE_END=========================================================
*/
-package org.onap.dcaegen2.collectors.datafile.exceptions;
+package org.oran.datafile.exceptions;
/**
* Exception thrown when there is a problem with the Consul environment.
*
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
*/
public class EnvironmentLoaderException extends Exception {
/*-
* ============LICENSE_START======================================================================
- * Copyright (C) 2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2019-2023 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* ============LICENSE_END========================================================================
*/
-package org.onap.dcaegen2.collectors.datafile.exceptions;
+package org.oran.datafile.exceptions;
public class NonRetryableDatafileTaskException extends DatafileTaskException {
/*-
* ============LICENSE_START======================================================================
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2023 Nordix Foundation. All rights reserved.
* Copyright (C) 2020-2021 Nokia. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* ============LICENSE_END========================================================================
*/
-package org.onap.dcaegen2.collectors.datafile.ftp;
+package org.oran.datafile.ftp;
import java.io.File;
import java.io.FileOutputStream;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPReply;
import org.apache.commons.net.ftp.FTPSClient;
-import org.onap.dcaegen2.collectors.datafile.commons.FileCollectClient;
-import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
-import org.onap.dcaegen2.collectors.datafile.commons.SecurityUtil;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
+import org.oran.datafile.commons.FileCollectClient;
+import org.oran.datafile.commons.FileServerData;
+import org.oran.datafile.commons.SecurityUtil;
+import org.oran.datafile.exceptions.DatafileTaskException;
+import org.oran.datafile.exceptions.NonRetryableDatafileTaskException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.FileSystemResource;
/**
* Gets file from PNF with FTPS protocol.
- *
- * @author <a href="mailto:martin.c.yan@est.tech">Martin Yan</a>
*/
public class FtpesClient implements FileCollectClient {
private static final Logger logger = LoggerFactory.getLogger(FtpesClient.class);
/*-
* ============LICENSE_START======================================================================
- * Copyright (C) 2018-2019 Nordix Foundation, 2020 Nokia. All rights reserved.
+ * Copyright (C) 2018-2023 Nordix Foundation, 2020 Nokia. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
* ============LICENSE_END========================================================================
*/
-package org.onap.dcaegen2.collectors.datafile.ftp;
+package org.oran.datafile.ftp;
import com.jcraft.jsch.Channel;
import com.jcraft.jsch.ChannelSftp;
import java.nio.file.Path;
-import org.onap.dcaegen2.collectors.datafile.commons.FileCollectClient;
-import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
+import org.oran.datafile.commons.FileCollectClient;
+import org.oran.datafile.commons.FileServerData;
+import org.oran.datafile.exceptions.DatafileTaskException;
+import org.oran.datafile.exceptions.NonRetryableDatafileTaskException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Gets file from xNF with SFTP protocol.
*
- * @author <a href="mailto:martin.c.yan@est.tech">Martin Yan</a>
*/
public class SftpClient implements FileCollectClient {
* ============LICENSE_END========================================================================
*/
-package org.onap.dcaegen2.collectors.datafile.ftp;
+package org.oran.datafile.ftp;
import java.io.File;
-import org.onap.dcaegen2.collectors.datafile.configuration.SftpConfig;
+import org.oran.datafile.configuration.SftpConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* the License.
* ============LICENSE_END========================================================================
*/
-package org.onap.dcaegen2.collectors.datafile.http;
+package org.oran.datafile.http;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
-import org.onap.dcaegen2.collectors.datafile.commons.FileCollectClient;
-import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
+import org.oran.datafile.commons.FileCollectClient;
+import org.oran.datafile.commons.FileServerData;
+import org.oran.datafile.exceptions.DatafileTaskException;
+import org.oran.datafile.exceptions.NonRetryableDatafileTaskException;
+import org.oran.datafile.service.HttpUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Gets file from PNF with HTTP protocol.
*
- * @author <a href="mailto:krzysztof.gajewski@nokia.com">Krzysztof Gajewski</a>
*/
public class DfcHttpClient implements FileCollectClient {
* the License.
* ============LICENSE_END========================================================================
*/
-package org.onap.dcaegen2.collectors.datafile.http;
+package org.oran.datafile.http;
import java.io.IOException;
import java.io.InputStream;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;
-import org.onap.dcaegen2.collectors.datafile.commons.FileCollectClient;
-import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
+import org.oran.datafile.commons.FileCollectClient;
+import org.oran.datafile.commons.FileServerData;
+import org.oran.datafile.exceptions.DatafileTaskException;
+import org.oran.datafile.exceptions.NonRetryableDatafileTaskException;
+import org.oran.datafile.service.HttpUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Gets file from PNF with HTTPS protocol.
*
- * @author <a href="mailto:krzysztof.gajewski@nokia.com">Krzysztof Gajewski</a>
*/
public class DfcHttpsClient implements FileCollectClient {
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2019 Nordix Foundation.
+ * Copyright (C) 2019-2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcaegen2.collectors.datafile.http;
+package org.oran.datafile.http;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
* the License.
* ============LICENSE_END========================================================================
*/
-package org.onap.dcaegen2.collectors.datafile.http;
+package org.oran.datafile.http;
import java.io.File;
import java.io.IOException;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
-import org.onap.dcaegen2.collectors.datafile.commons.SecurityUtil;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.oran.datafile.commons.SecurityUtil;
+import org.oran.datafile.exceptions.DatafileTaskException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.FileSystemResource;
/**
* Utility class supplying connection manager for HTTPS protocol.
*
- * @author <a href="mailto:krzysztof.gajewski@nokia.com">Krzysztof Gajewski</a>
*/
public class HttpsClientConnectionManagerUtil {
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2019 Nordix Foundation.
+ * Copyright (C) 2019-2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcaegen2.collectors.datafile.model;
+package org.oran.datafile.model;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicInteger;
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2019 Nordix Foundation.
+ * Copyright (C) 2019-2023 Nordix Foundation.
* Copyright (C) 2021 Nokia. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* ============LICENSE_END=========================================================
*/
-package org.onap.dcaegen2.collectors.datafile.model;
+package org.oran.datafile.model;
import java.net.URI;
import java.nio.file.Path;
import org.apache.hc.core5.http.NameValuePair;
import org.apache.hc.core5.net.URIBuilder;
-import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
-import org.onap.dcaegen2.collectors.datafile.commons.FileServerData.FileServerDataBuilder;
-import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
-import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.oran.datafile.commons.FileServerData;
+import org.oran.datafile.commons.FileServerData.FileServerDataBuilder;
+import org.oran.datafile.commons.Scheme;
+import org.oran.datafile.configuration.AppConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* @return the path to the locally stored file.
*/
public Path getLocalFilePath(AppConfig config) {
- return Paths.get(config.collectedFilesPath, this.messageMetaData.sourceName, fileInfo.name);
+ return Paths.get(config.getCollectedFilesPath(), this.messageMetaData.sourceName, fileInfo.name);
}
/**
/*-
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2023 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
* ============LICENSE_END========================================================================
*/
-package org.onap.dcaegen2.collectors.datafile.model;
+package org.oran.datafile.model;
import lombok.Builder;
import lombok.EqualsAndHashCode;
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2019 Nordix Foundation.
+ * Copyright (C) 2019-2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcaegen2.collectors.datafile.model;
+package org.oran.datafile.model;
import java.util.List;
--- /dev/null
+// ============LICENSE_START===============================================
+// Copyright (C) 2023 Nordix Foundation. All rights reserved.
+// ========================================================================
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ============LICENSE_END=================================================
+//
+
+package org.oran.datafile.oauth2;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+
+import java.util.Base64;
+import java.util.HashSet;
+import java.util.Set;
+
+import lombok.ToString;
+
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.oran.datafile.exceptions.DatafileTaskException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OAuthBearerTokenJwt implements OAuthBearerToken {
+ private static final Logger logger = LoggerFactory.getLogger(OAuthBearerTokenJwt.class);
+ private static final com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
+
+ private final String jwtTokenRaw;
+ private final JwtTokenBody tokenBody;
+
+ @ToString
+ private static class JwtTokenBody {
+ String sub = ""; // principalName
+ long exp = 0; // expirationTime
+ long iat = 0; // startTime
+ String scope = "";
+ }
+
+ public static OAuthBearerTokenJwt create(String tokenRaw)
+ throws DatafileTaskException, JsonMappingException, JsonProcessingException {
+ String[] chunks = tokenRaw.split("\\.");
+ Base64.Decoder decoder = Base64.getUrlDecoder();
+ if (chunks.length < 2) {
+ throw new DatafileTaskException("Could not parse JWT token: " + tokenRaw);
+
+ }
+ String payloadStr = new String(decoder.decode(chunks[1]));
+ JwtTokenBody token = gson.fromJson(payloadStr, JwtTokenBody.class);
+ logger.error("Token: {}", token);
+ return new OAuthBearerTokenJwt(token, tokenRaw);
+ }
+
+ private OAuthBearerTokenJwt(JwtTokenBody jwtTokenBody, String accessToken) {
+ super();
+ this.jwtTokenRaw = accessToken;
+ this.tokenBody = jwtTokenBody;
+ }
+
+ @Override
+ public String value() {
+ return jwtTokenRaw;
+ }
+
+ @Override
+ public Set<String> scope() {
+ Set<String> res = new HashSet<>();
+ if (!this.tokenBody.scope.isEmpty()) {
+ res.add(this.tokenBody.scope);
+ }
+ return res;
+ }
+
+ @Override
+ public long lifetimeMs() {
+ if (this.tokenBody.exp == 0) {
+ return Long.MAX_VALUE;
+ }
+ return this.tokenBody.exp * 1000;
+ }
+
+ @Override
+ public String principalName() {
+ return this.tokenBody.sub;
+ }
+
+ @Override
+ public Long startTimeMs() {
+ return this.tokenBody.iat;
+ }
+
+}
--- /dev/null
+// ============LICENSE_START===============================================
+// Copyright (C) 2023 Nordix Foundation. All rights reserved.
+// ========================================================================
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ============LICENSE_END=================================================
+//
+
+package org.oran.datafile.oauth2;
+
+import java.io.IOException;
+import java.util.*;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.auth.SaslExtensions;
+import org.apache.kafka.common.security.auth.SaslExtensionsCallback;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
+import org.oran.datafile.exceptions.DatafileTaskException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OAuthKafkaAuthenticateLoginCallbackHandler implements AuthenticateCallbackHandler {
+ private final Logger logger = LoggerFactory.getLogger(OAuthKafkaAuthenticateLoginCallbackHandler.class);
+
+ private boolean isConfigured = false;
+
+ @Override
+ public void configure(Map<String, ?> map, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
+
+ if (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism))
+ throw new IllegalArgumentException(String.format("Unexpected SASL mechanism: %s", saslMechanism));
+ if (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == null)
+ throw new IllegalArgumentException(String.format(
+ "Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)", jaasConfigEntries.size()));
+ isConfigured = true;
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+
+ if (!this.isConfigured)
+ throw new IllegalStateException("Callback handler not configured");
+ for (Callback callback : callbacks) {
+ logger.debug("callback " + callback.toString());
+ if (callback instanceof OAuthBearerTokenCallback) {
+ handleCallback((OAuthBearerTokenCallback) callback);
+ } else if (callback instanceof SaslExtensionsCallback) {
+ handleCallback((SaslExtensionsCallback) callback);
+ } else {
+ logger.error("Unsupported callback: {}", callback);
+ throw new UnsupportedCallbackException(callback);
+ }
+ }
+ }
+
+ private void handleCallback(SaslExtensionsCallback callback) {
+ callback.extensions(SaslExtensions.empty());
+ }
+
+ private void handleCallback(OAuthBearerTokenCallback callback) {
+ try {
+ if (callback.token() != null) {
+ throw new DatafileTaskException("Callback had a token already");
+ }
+
+ String accessToken = SecurityContext.getInstance().getBearerAuthToken();
+ OAuthBearerTokenJwt token = OAuthBearerTokenJwt.create(accessToken);
+
+ callback.token(token);
+ } catch (Exception e) {
+ logger.error("Could not handle login callback: {}", e.getMessage());
+ }
+ }
+
+}
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2023 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.datafile.oauth2;
+
+import java.lang.invoke.MethodHandles;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+@EnableConfigurationProperties
+@ConfigurationProperties()
+@Component
+public class SecurityContext {
+
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private long tokenTimestamp = 0;
+
+ private String authToken = "";
+
+ @Getter
+ private static SecurityContext instance;
+
+ @Setter
+ private Path authTokenFilePath;
+
+ public SecurityContext(@Value("${app.auth-token-file:}") String authTokenFilename) {
+ instance = this;
+ if (!authTokenFilename.isEmpty()) {
+ this.authTokenFilePath = Path.of(authTokenFilename);
+ }
+ }
+
+ public boolean isConfigured() {
+ return authTokenFilePath != null;
+ }
+
+ public synchronized String getBearerAuthToken() {
+ if (!isConfigured()) {
+ logger.warn("No configuration for auth token");
+ return "";
+ }
+ try {
+ long lastModified = authTokenFilePath.toFile().lastModified();
+ if (tokenTimestamp == 0 || lastModified != this.tokenTimestamp) {
+ this.authToken = Files.readString(authTokenFilePath);
+ this.authToken = this.authToken.trim();
+ this.tokenTimestamp = lastModified;
+ }
+ } catch (Exception e) {
+ logger.warn("Could not read auth token file: {}, reason: {}", authTokenFilePath, e.getMessage());
+ }
+ return this.authToken;
+ }
+
+}
/*-
* ============LICENSE_START======================================================================
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2023 Nordix Foundation. All rights reserved.
* Modifications Copyright (C) 2020-2021 Nokia. All rights reserved
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* ============LICENSE_END========================================================================
*/
-package org.onap.dcaegen2.collectors.datafile.service;
+package org.oran.datafile.service;
import java.util.Base64;
import java.util.List;
import org.apache.hc.core5.http.NameValuePair;
import org.apache.http.HttpStatus;
-import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
+import org.oran.datafile.commons.FileServerData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/*-
* ============LICENSE_START======================================================================
- * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2023 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
* ============LICENSE_END========================================================================
*/
-package org.onap.dcaegen2.collectors.datafile.tasks;
+package org.oran.datafile.tasks;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
-import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.configuration.CertificateConfig;
-import org.onap.dcaegen2.collectors.datafile.datastore.DataStore;
-import org.onap.dcaegen2.collectors.datafile.datastore.DataStore.Bucket;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.http.HttpsClientConnectionManagerUtil;
-import org.onap.dcaegen2.collectors.datafile.model.Counters;
-import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
-import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
+import org.oran.datafile.commons.Scheme;
+import org.oran.datafile.configuration.AppConfig;
+import org.oran.datafile.configuration.CertificateConfig;
+import org.oran.datafile.datastore.DataStore;
+import org.oran.datafile.datastore.DataStore.Bucket;
+import org.oran.datafile.exceptions.DatafileTaskException;
+import org.oran.datafile.http.HttpsClientConnectionManagerUtil;
+import org.oran.datafile.model.Counters;
+import org.oran.datafile.model.FileData;
+import org.oran.datafile.model.FilePublishInformation;
+import org.oran.datafile.model.FileReadyMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
.flatMap(fileReadyMessage -> Flux.fromIterable(FileData.createFileData(fileReadyMessage)), true, 1) //
.flatMap(this::filterNotFetched, false, 1, 1) //
.flatMap(this::fetchFile, false, 1, 1) //
- .flatMap(data -> reportFetchedFile(data, this.appConfig.collectedFileTopic), false, 1) //
+ .flatMap(data -> reportFetchedFile(data, appConfig.getCollectedFileTopic()), false, 1) //
.sequential() //
.doOnError(t -> logger.error("Received error: {}", t.toString())); //
}
}
private Path locaFilePath(FilePublishInformation info) {
- return Paths.get(this.appConfig.collectedFilesPath, info.getName());
+ return Paths.get(appConfig.getCollectedFilesPath(), info.getName());
}
private void deleteLocalFile(FilePublishInformation info) {
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ this.appConfig.addKafkaSecurityProps(props);
return SenderOptions.create(props);
}
* polling/blocking fashion.
*/
private Flux<FileReadyMessage> fetchFromKafka() {
- KafkaTopicListener listener = new KafkaTopicListener(this.appConfig.getKafkaBootStrapServers(),
- this.appConfig.kafkaClientId, this.appConfig.fileReadyEventTopic);
+ KafkaTopicListener listener = new KafkaTopicListener(this.appConfig);
return listener.getFlux() //
.flatMap(this::parseReceivedFileReadyMessage, 1);
/*-
* ============LICENSE_START======================================================================
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2023 Nordix Foundation. All rights reserved.
* Copyright (C) 2020-2022 Nokia. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* ============LICENSE_END========================================================================
*/
-package org.onap.dcaegen2.collectors.datafile.tasks;
+package org.oran.datafile.tasks;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Optional;
-import org.onap.dcaegen2.collectors.datafile.commons.FileCollectClient;
-import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
-import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.configuration.CertificateConfig;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.ftp.FtpesClient;
-import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
-import org.onap.dcaegen2.collectors.datafile.ftp.SftpClientSettings;
-import org.onap.dcaegen2.collectors.datafile.http.DfcHttpClient;
-import org.onap.dcaegen2.collectors.datafile.http.DfcHttpsClient;
-import org.onap.dcaegen2.collectors.datafile.http.HttpsClientConnectionManagerUtil;
-import org.onap.dcaegen2.collectors.datafile.model.Counters;
-import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
-import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
+import org.oran.datafile.commons.FileCollectClient;
+import org.oran.datafile.commons.Scheme;
+import org.oran.datafile.configuration.AppConfig;
+import org.oran.datafile.configuration.CertificateConfig;
+import org.oran.datafile.exceptions.DatafileTaskException;
+import org.oran.datafile.exceptions.NonRetryableDatafileTaskException;
+import org.oran.datafile.ftp.FtpesClient;
+import org.oran.datafile.ftp.SftpClient;
+import org.oran.datafile.ftp.SftpClientSettings;
+import org.oran.datafile.http.DfcHttpClient;
+import org.oran.datafile.http.DfcHttpsClient;
+import org.oran.datafile.http.HttpsClientConnectionManagerUtil;
+import org.oran.datafile.model.Counters;
+import org.oran.datafile.model.FileData;
+import org.oran.datafile.model.FilePublishInformation;
+import org.oran.datafile.model.FileReadyMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Collects a file from a PNF.
- *
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
public class FileCollector {
* ========================LICENSE_END===================================
*/
-package org.onap.dcaegen2.collectors.datafile.tasks;
+package org.oran.datafile.tasks;
import java.util.Collections;
import java.util.HashMap;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
+import org.oran.datafile.configuration.AppConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final Logger logger = LoggerFactory.getLogger(KafkaTopicListener.class);
- private final String inputTopic;
- private final String kafkaBoostrapServers;
- private final String kafkaClientId;
private Flux<DataFromTopic> dataFromTopic;
+ private final AppConfig appConfig;
- public KafkaTopicListener(String kafkaBoostrapServers, String clientId, String topic) {
- this.kafkaClientId = clientId;
- this.kafkaBoostrapServers = kafkaBoostrapServers;
- this.inputTopic = topic;
+ public KafkaTopicListener(AppConfig applConfig) {
+ this.appConfig = applConfig;
}
public Flux<DataFromTopic> getFlux() {
}
private Flux<DataFromTopic> startReceiveFromTopic() {
- logger.debug("Listening to kafka topic: {}, client id: {}", this.inputTopic, this.kafkaClientId);
+ logger.debug("Listening to kafka topic: {}, client id: {}", appConfig.getInputTopic(),
+ appConfig.getKafkaClientId());
return KafkaReceiver.create(kafkaInputProperties()) //
.receive() //
- .doOnNext(input -> logger.debug("Received from kafka topic: {} :{}", this.inputTopic, input.value())) //
+ .doOnNext(
+ input -> logger.debug("Received from kafka topic: {} :{}", appConfig.getInputTopic(), input.value())) //
.doOnError(t -> logger.error("KafkaTopicReceiver error: {}", t.getMessage())) //
.doFinally(sig -> logger.error("KafkaTopicReceiver stopped, reason: {}", sig)) //
.doFinally(sig -> this.dataFromTopic = null) //
private ReceiverOptions<String, String> kafkaInputProperties() {
Map<String, Object> consumerProps = new HashMap<>();
- if (this.kafkaBoostrapServers.isEmpty()) {
+ if (appConfig.getKafkaBootStrapServers().isEmpty()) {
logger.error("No kafka boostrap server is setup");
}
- consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaBoostrapServers);
- consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "osc-dmaap-adapter-" + inputTopic);
+ consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, appConfig.getKafkaBootStrapServers());
+ consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "osc-dmaap-adapter-" + appConfig.getInputTopic());
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
- consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, this.kafkaClientId);
+ consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, appConfig.getKafkaClientId());
+ this.appConfig.addKafkaSecurityProps(consumerProps);
return ReceiverOptions.<String, String>create(consumerProps)
- .subscription(Collections.singleton(this.inputTopic));
+ .subscription(Collections.singleton(appConfig.getInputTopic()));
}
}
* ========================LICENSE_START=================================
* O-RAN-SC
* %%
- * Copyright (C) 2020 Nordix Foundation
+ * Copyright (C) 2020-2023 NordixFoundation
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* ========================LICENSE_END===================================
*/
-package org.onap.dcaegen2.collectors.datafile;
+package org.oran.datafile;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.datastore.DataStore;
-import org.onap.dcaegen2.collectors.datafile.datastore.DataStore.Bucket;
-import org.onap.dcaegen2.collectors.datafile.model.Counters;
-import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
-import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
-import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage.MessageMetaData;
-import org.onap.dcaegen2.collectors.datafile.tasks.CollectAndReportFiles;
-import org.onap.dcaegen2.collectors.datafile.tasks.FileCollector;
-import org.onap.dcaegen2.collectors.datafile.tasks.KafkaTopicListener;
-import org.onap.dcaegen2.collectors.datafile.tasks.KafkaTopicListener.DataFromTopic;
+import org.oran.datafile.configuration.AppConfig;
+import org.oran.datafile.datastore.DataStore;
+import org.oran.datafile.datastore.DataStore.Bucket;
+import org.oran.datafile.model.Counters;
+import org.oran.datafile.model.FileData;
+import org.oran.datafile.model.FilePublishInformation;
+import org.oran.datafile.model.FileReadyMessage;
+import org.oran.datafile.model.FileReadyMessage.MessageMetaData;
+import org.oran.datafile.tasks.CollectAndReportFiles;
+import org.oran.datafile.tasks.FileCollector;
+import org.oran.datafile.tasks.KafkaTopicListener;
+import org.oran.datafile.tasks.KafkaTopicListener.DataFromTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
"app.s3.accessKeyId=minio", //
"app.s3.secretAccessKey=miniostorage", //
"app.s3.bucket=ropfiles", //
- "app.s3.locksBucket=locks"})
+ "app.s3.locksBucket=locks", //
+ "app.auth-token-file=src/test/resources/jwtToken.b64", //
+ "app.kafka.use-oath-token=false"})
@SuppressWarnings("squid:S3577") // Not containing any tests since it is a mock.
class MockDatafile {
// Create a listener to the output topic. The KafkaTopicListener happens to be
// suitable for that,
-
- KafkaTopicListener topicListener =
- new KafkaTopicListener(applicationConfig.getKafkaBootStrapServers(), "MockDatafile", outputTopic);
+ AppConfig config = spy(applicationConfig);
+ when(config.getKafkaClientId()).thenReturn("MockDatafile");
+ when(config.getInputTopic()).thenReturn(outputTopic);
+ KafkaTopicListener topicListener = new KafkaTopicListener(config);
topicListener.getFlux() //
.doOnNext(this::set) //
try {
Path from = Path.of("config/application.yaml");
- Path to = Path.of(this.appConfig.collectedFilesPath, fileData.name());
+ Path to = Path.of(appConfig.getCollectedFilesPath(), fileData.name());
Files.createDirectories(to.getParent());
Files.copy(from, to, StandardCopyOption.REPLACE_EXISTING);
} catch (Exception e) {
@BeforeEach
void init() {
if (kafkaReceiver == null) {
- kafkaReceiver = new KafkaReceiver(this.appConfig, this.appConfig.collectedFileTopic);
+ kafkaReceiver = new KafkaReceiver(appConfig, appConfig.getCollectedFileTopic());
}
kafkaReceiver.reset();
deleteAllFiles();
private void deleteAllFiles() {
try {
- FileSystemUtils.deleteRecursively(Path.of(this.appConfig.collectedFilesPath));
+ FileSystemUtils.deleteRecursively(Path.of(appConfig.getCollectedFilesPath()));
} catch (IOException e) {
}
}
void testKafka() throws InterruptedException {
waitForKafkaListener();
- this.scheduledTask.sendDataToStream(this.appConfig.fileReadyEventTopic, "key", "junk").blockLast();
+ this.scheduledTask.sendDataToStream(appConfig.getInputTopic(), "key", "junk").blockLast();
String fileReadyMessage = gson.toJson(fileReadyMessage());
- this.scheduledTask.sendDataToStream(this.appConfig.fileReadyEventTopic, "key", fileReadyMessage).blockLast();
+ this.scheduledTask.sendDataToStream(appConfig.getInputTopic(), "key", fileReadyMessage).blockLast();
await().untilAsserted(() -> assertThat(kafkaReceiver.count).isEqualTo(1));
String rec = kafkaReceiver.lastValue();
Flux.range(1, NO_OF_OBJECTS) //
.map(i -> gson.toJson(fileReadyMessage("testS3Concurrency_" + i))) //
- .flatMap(fileReadyMessage -> scheduledTask.sendDataToStream(appConfig.fileReadyEventTopic, "key",
- fileReadyMessage)) //
+ .flatMap(
+ fileReadyMessage -> scheduledTask.sendDataToStream(appConfig.getInputTopic(), "key", fileReadyMessage)) //
.blockLast(); //
while (kafkaReceiver.count < NO_OF_OBJECTS) {
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2019 Nordix Foundation.
+ * Copyright (C) 2019-2023 Nordix Foundation.
* Copyright (C) 2020 Nokia. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* ============LICENSE_END=========================================================
*/
-package org.onap.dcaegen2.collectors.datafile.controllers;
+package org.oran.datafile.controllers;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.doReturn;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
-import org.onap.dcaegen2.collectors.datafile.model.Counters;
-import org.onap.dcaegen2.collectors.datafile.tasks.CollectAndReportFiles;
+import org.oran.datafile.model.Counters;
+import org.oran.datafile.tasks.CollectAndReportFiles;
import org.springframework.http.HttpHeaders;
import org.springframework.http.ResponseEntity;
import reactor.core.publisher.Mono;
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2023 Nordix Foundation. All rights reserved.
* Copyright (C) 2020 Nokia. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* ============LICENSE_END========================================================================
*/
-package org.onap.dcaegen2.collectors.datafile.ftp;
+package org.oran.datafile.ftp;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
-import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
+import org.oran.datafile.commons.FileServerData;
import org.springframework.http.HttpStatus;
public class FtpesClientTest {
/*-
* ============LICENSE_START======================================================================
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2023 Nordix Foundation. All rights reserved.
* Copyright (C) 2020 Nokia. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* ============LICENSE_END========================================================================
*/
-package org.onap.dcaegen2.collectors.datafile.ftp;
+package org.oran.datafile.ftp;
import static org.assertj.core.api.Assertions.assertThat;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
-import org.onap.dcaegen2.collectors.datafile.configuration.SftpConfig;
+import org.oran.datafile.configuration.SftpConfig;
public class SftpClientSettingsTest {
/*-
* ============LICENSE_START======================================================================
+ * Copyright (C) 2023 Nordix Foundation. All rights reserved.
* Copyright (C) 2020 Nokia. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* ============LICENSE_END========================================================================
*/
-package org.onap.dcaegen2.collectors.datafile.ftp;
+package org.oran.datafile.ftp;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertEquals;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
-import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
-import org.onap.dcaegen2.collectors.datafile.configuration.SftpConfig;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
+import org.oran.datafile.commons.FileServerData;
+import org.oran.datafile.configuration.SftpConfig;
+import org.oran.datafile.exceptions.DatafileTaskException;
+import org.oran.datafile.exceptions.NonRetryableDatafileTaskException;
@ExtendWith(MockitoExtension.class)
public class SftpClientTest {
/*-
* ============LICENSE_START======================================================================
+ * Copyright (C) 2020-2023 Nordix Foundation. All rights reserved.
* Copyright (C) 2020-2021 Nokia. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* the License.
* ============LICENSE_END========================================================================
*/
-package org.onap.dcaegen2.collectors.datafile.http;
+package org.oran.datafile.http;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertEquals;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
-import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
+import org.oran.datafile.commons.FileServerData;
+import org.oran.datafile.exceptions.DatafileTaskException;
+import org.oran.datafile.service.HttpUtils;
import reactor.core.publisher.Flux;
import reactor.netty.http.client.HttpClientConfig;
/*-
* ============LICENSE_START======================================================================
+ * Copyright (C) 2023 Nordix Foundation. All rights reserved.
* Copyright (C) 2021 Nokia. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* the License.
* ============LICENSE_END========================================================================
*/
-package org.onap.dcaegen2.collectors.datafile.http;
+package org.oran.datafile.http;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
-import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
+import org.oran.datafile.commons.FileServerData;
+import org.oran.datafile.exceptions.DatafileTaskException;
+import org.oran.datafile.exceptions.NonRetryableDatafileTaskException;
@ExtendWith(MockitoExtension.class)
class DfcHttpsClientTest {
/*-
* ============LICENSE_START======================================================================
+ * Copyright (C) 2023 Nordix Foundation. All rights reserved.
* Copyright (C) 2020-2021 Nokia. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* the License.
* ============LICENSE_END========================================================================
*/
-package org.onap.dcaegen2.collectors.datafile.http;
+package org.oran.datafile.http;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
/*-
* ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2023 Nordix Foundation. All rights reserved.
* Copyright (C) 2021 Nokia. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* the License.
* ============LICENSE_END========================================================================
*/
-package org.onap.dcaegen2.collectors.datafile.http;
+package org.oran.datafile.http;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.oran.datafile.exceptions.DatafileTaskException;
@ExtendWith(MockitoExtension.class)
public class HttpsClientConnectionManagerUtilTest {
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2019-2023 Nordix Foundation. All rights reserved.
* Copyright (C) 2020 Nokia. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* ============LICENSE_END=========================================================
*/
-package org.onap.dcaegen2.collectors.datafile.scheme;
+package org.oran.datafile.scheme;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.oran.datafile.commons.Scheme;
+import org.oran.datafile.exceptions.DatafileTaskException;
public class SchemeTest {
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2023 Nordix Foundation. All rights reserved.
* Modifications Copyright (C) 2021 Nokia. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* ============LICENSE_END========================================================================
*/
-package org.onap.dcaegen2.collectors.datafile.service;
+package org.oran.datafile.service;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import org.apache.hc.core5.http.NameValuePair;
import org.apache.hc.core5.net.URIBuilder;
import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
+import org.oran.datafile.commons.FileServerData;
class HttpUtilsTest {
/*-
* ============LICENSE_START======================================================================
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2023 Nordix Foundation. All rights reserved.
* Copyright (C) 2020-2022 Nokia. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* ============LICENSE_END========================================================================
*/
-package org.onap.dcaegen2.collectors.datafile.tasks;
+package org.oran.datafile.tasks;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.configuration.CertificateConfig;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.ftp.FtpesClient;
-import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
-import org.onap.dcaegen2.collectors.datafile.http.DfcHttpClient;
-import org.onap.dcaegen2.collectors.datafile.http.DfcHttpsClient;
-import org.onap.dcaegen2.collectors.datafile.model.Counters;
-import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
-import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
+import org.oran.datafile.configuration.AppConfig;
+import org.oran.datafile.configuration.CertificateConfig;
+import org.oran.datafile.exceptions.DatafileTaskException;
+import org.oran.datafile.exceptions.NonRetryableDatafileTaskException;
+import org.oran.datafile.ftp.FtpesClient;
+import org.oran.datafile.ftp.SftpClient;
+import org.oran.datafile.http.DfcHttpClient;
+import org.oran.datafile.http.DfcHttpsClient;
+import org.oran.datafile.model.Counters;
+import org.oran.datafile.model.FileData;
+import org.oran.datafile.model.FilePublishInformation;
+import org.oran.datafile.model.FileReadyMessage;
import reactor.test.StepVerifier;
public class FileCollectorTest {
@BeforeAll
static void setUpConfiguration() {
when(appConfigMock.getCertificateConfiguration()).thenReturn(certificateConfigMock);
- appConfigMock.collectedFilesPath = DATAFILE_TMPDIR;
+ when(appConfigMock.getCollectedFilesPath()).thenReturn(DATAFILE_TMPDIR);
certificateConfigMock.keyPasswordPath = CERTIFICATE_KEY_PASSWORD_PATH;
certificateConfigMock.trustedCa = TRUSTED_CA_PATH;
certificateConfigMock.trustedCaPasswordPath = TRUSTED_CA_PASSWORD_PATH;
/*-
* ============LICENSE_START=======================================================
- * 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2023 Nordix Foundation. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcaegen2.collectors.datafile.utils;
+package org.oran.datafile.utils;
import java.util.ArrayList;
import java.util.Iterator;
/**
* Utility class to produce correctly formatted fileReady event Json messages.
- *
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> on 7/25/18
- *
*/
public class JsonMessage {
private String eventName;
}
/**
- * Can be used to produce a correct test Json message. Tip! Check the formatting with
+ * Can be used to produce a correct test Json message. Tip! Check the formatting
+ * with
* <a href="https://jsonformatter.org/">Json formatter</a>
*
* @param args Not used
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2019 Nordix Foundation.
+ * Copyright (C) 2019-2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcaegen2.collectors.datafile.utils;
+package org.oran.datafile.utils;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
public class LoggingUtils {
/**
- * Returns a ListAppender that contains all logging events. Call this method at the very beginning of the test
+ * Returns a ListAppender that contains all logging events. Call this method at
+ * the very beginning of the test
*/
public static ListAppender<ILoggingEvent> getLogListAppender(Class<?> logClass) {
return getLogListAppender(logClass, false);
}
/**
- * Returns a ListAppender that contains all logging events. Call this method at the very beginning of the test
+ * Returns a ListAppender that contains all logging events. Call this method at
+ * the very beginning of the test
*
* @param logClass class whose appender is wanted.
* @param allLevels true if all log levels should be activated.
--- /dev/null
+eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c