2 * ============LICENSE_START======================================================================
3 * Copyright (C) 2018, 2020-2022 Nokia. All rights reserved.
4 * Copyright (C) 2018-2023 Nordix Foundation. All rights reserved.
5 * ===============================================================================================
6 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
7 * in compliance with the License. You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software distributed under the License
12 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
13 * or implied. See the License for the specific language governing permissions and limitations under
15 * ============LICENSE_END========================================================================
18 package org.oran.datafile.configuration;
24 import org.apache.kafka.clients.CommonClientConfigs;
25 import org.apache.kafka.common.config.SaslConfigs;
26 import org.apache.kafka.common.config.SslConfigs;
27 import org.apache.kafka.common.security.auth.SecurityProtocol;
28 import org.oran.datafile.oauth2.OAuthKafkaAuthenticateLoginCallbackHandler;
29 import org.springframework.beans.factory.annotation.Value;
30 import org.springframework.boot.context.properties.EnableConfigurationProperties;
31 import org.springframework.stereotype.Component;
34 * Holds all configuration for the DFC.
38 @EnableConfigurationProperties
39 public class AppConfig {
41 @Value("${app.kafka.bootstrap-servers:}")
42 private String kafkaBootStrapServers;
44 @Value("${app.kafka.collected-file-topic:}")
46 private String collectedFileTopic;
48 @Value("${app.kafka.file-ready-event-topic:}")
50 private String inputTopic;
52 @Value("${app.kafka.client-id:undefined}")
54 private String kafkaClientId;
56 @Value("${app.collected-files-path}")
58 private String collectedFilesPath;
60 @Value("${app.sftp.strict-host-key-checking:false}")
61 private boolean strictHostKeyChecking;
63 @Value("${app.sftp.known-hosts-file-path:}")
65 private String knownHostsFilePath;
67 @Value("${app.ssl.key-store-password-file}")
68 private String clientKeyStorePassword = "";
70 @Value("${app.ssl.key-store:}")
71 private String clientKeyStore = "";
73 @Value("${app.ssl.trust-store:}")
74 private String clientTrustStore = "";
76 @Value("${app.ssl.trust-store-password-file:}")
77 private String clientTrustStorePassword;
80 @Value("${app.s3.endpointOverride:}")
81 private String s3EndpointOverride;
84 @Value("${app.s3.accessKeyId:}")
85 private String s3AccessKeyId;
88 @Value("${app.s3.secretAccessKey:}")
89 private String s3SecretAccessKey;
92 @Value("${app.s3.bucket:}")
93 private String s3Bucket;
95 @Value("${app.s3.locksBucket:}")
96 private String s3LocksBucket;
98 @Value("${app.number-of-worker-treads:200}")
100 private int noOfWorkerThreads;
102 @Value("${app.kafka.ssl.key-store-location}")
103 private String kafkaKeyStoreLocation;
105 @Value("${app.kafka.ssl.key-store-type}")
106 private String kafkaKeyStoreType;
108 @Value("${app.kafka.ssl.key-store-password}")
109 private String kafkaKeyStorePassword;
111 @Value("${app.kafka.ssl.trust-store-type}")
112 private String kafkaTrustStoreType;
114 @Value("${app.kafka.ssl.trust-store-location}")
115 private String kafkTrustStoreLocation;
117 @Value("${app.kafka.use-oath-token}")
118 private boolean useOathToken;
120 public String getS3LocksBucket() {
121 return s3LocksBucket.isEmpty() ? s3Bucket : s3LocksBucket;
124 public boolean isS3Enabled() {
125 return !s3EndpointOverride.isEmpty() && !s3Bucket.isEmpty();
128 public String getKafkaBootStrapServers() {
129 return kafkaBootStrapServers;
132 public synchronized CertificateConfig getCertificateConfiguration() {
133 return CertificateConfig.builder() //
134 .trustedCa(this.clientTrustStore) //
135 .trustedCaPasswordPath(this.clientTrustStorePassword) //
136 .keyCert(this.clientKeyStore) //
137 .keyPasswordPath(this.clientKeyStorePassword) //
141 public synchronized SftpConfig getSftpConfiguration() {
142 return SftpConfig.builder() //
143 .knownHostsFilePath(this.knownHostsFilePath) //
144 .strictHostKeyChecking(this.strictHostKeyChecking) //
148 public void addKafkaSecurityProps(Map<String, Object> props) {
151 props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
152 props.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
153 props.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS,
154 OAuthKafkaAuthenticateLoginCallbackHandler.class.getName());
155 props.put(SaslConfigs.SASL_JAAS_CONFIG,
156 "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub=\"alice\"; ");
158 if (!kafkaKeyStoreLocation.isEmpty()) {
159 props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name);
161 props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, kafkaKeyStoreType);
162 props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, kafkaKeyStoreLocation);
163 if (!kafkaKeyStorePassword.isEmpty()) {
164 props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaKeyStorePassword);
166 if (!kafkTrustStoreLocation.isEmpty()) {
167 props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, kafkaTrustStoreType);
168 props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkTrustStoreLocation);