2 * ============LICENSE_START======================================================================
3 * Copyright (C) 2018, 2020-2022 Nokia. All rights reserved.
4 * Copyright (C) 2018-2023 Nordix Foundation. All rights reserved.
5 * ===============================================================================================
6 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
7 * in compliance with the License. You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software distributed under the License
12 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
13 * or implied. See the License for the specific language governing permissions and limitations under
15 * ============LICENSE_END========================================================================
18 package org.oran.datafile.configuration;
25 import org.apache.kafka.clients.CommonClientConfigs;
26 import org.apache.kafka.common.config.SaslConfigs;
27 import org.apache.kafka.common.config.SslConfigs;
28 import org.apache.kafka.common.security.auth.SecurityProtocol;
29 import org.oran.datafile.oauth2.OAuthKafkaAuthenticateLoginCallbackHandler;
30 import org.springframework.beans.factory.annotation.Value;
31 import org.springframework.boot.context.properties.EnableConfigurationProperties;
32 import org.springframework.stereotype.Component;
35 * Holds all configuration for the DFC.
39 @EnableConfigurationProperties
40 public class AppConfig {
// Kafka connection settings. A placeholder written as "${key:}" falls back to an empty string
// when the property is not defined; client-id falls back to the literal "undefined".
42 @Value("${app.kafka.bootstrap-servers:}")
43 private String kafkaBootStrapServers;
45 @Value("${app.kafka.collected-file-topic:}")
47 private String collectedFileTopic;
49 @Value("${app.kafka.file-ready-event-topic:}")
51 private String inputTopic;
53 @Value("${app.kafka.client-id:undefined}")
55 private String kafkaClientId;
// No default in the placeholder: the property has to be defined.
57 @Value("${app.collected-files-path}")
60 private String collectedFilesPath;
// Defaults to false when the property is not set; handed to SftpConfig by getSftpConfiguration().
// NOTE(review): if SftpConfig uses this flag to switch SSH host-key verification on (as the name
// suggests), the default leaves the identity of SFTP servers unverified - confirm, and prefer
// setting it to true together with a known-hosts file in deployment configuration.
62 @Value("${app.sftp.strict-host-key-checking:false}")
63 private boolean strictHostKeyChecking;
// Empty by default; passed to SftpConfig alongside strictHostKeyChecking.
65 @Value("${app.sftp.known-hosts-file-path:}")
67 private String knownHostsFilePath;
// Unlike the sibling app.ssl.* keys, this placeholder has no default. The key ends in
// "-password-file" and getCertificateConfiguration() passes the value to keyPasswordPath(...),
// so it presumably holds the path of a password file rather than the password - confirm.
69 @Value("${app.ssl.key-store-password-file}")
70 private String clientKeyStorePassword = "";
72 @Value("${app.ssl.key-store:}")
73 private String clientKeyStore = "";
75 @Value("${app.ssl.trust-store:}")
76 private String clientTrustStore = "";
// Passed to trustedCaPasswordPath(...) in getCertificateConfiguration(); presumably a file path.
78 @Value("${app.ssl.trust-store-password-file:}")
79 private String clientTrustStorePassword;
// S3 settings. isS3Enabled() reports true only when both endpointOverride and bucket are non-empty.
83 @Value("${app.s3.endpointOverride:}")
84 private String s3EndpointOverride;
// NOTE(review): the access key id and the secret below are read as ordinary application
// properties; supply them from a secret store or the environment, keep them out of
// version-controlled configuration, and make sure they are never logged.
88 @Value("${app.s3.accessKeyId:}")
89 private String s3AccessKeyId;
93 @Value("${app.s3.secretAccessKey:}")
94 private String s3SecretAccessKey;
98 @Value("${app.s3.bucket:}")
99 private String s3Bucket;
// When empty, getS3LocksBucket() returns s3Bucket instead.
101 @Value("${app.s3.locksBucket:}")
103 private String s3LocksBucket;
// The property key is spelled "treads"; configuration has to use the same spelling. Default 200.
105 @Value("${app.number-of-worker-treads:200}")
107 private int noOfWorkerThreads;
// Kafka TLS settings; none of these placeholders has a default.
// A non-empty key-store location makes addKafkaSecurityProps set the security protocol to
// SASL_SSL. NOTE(review): the other protocol value set in that method is SASL_PLAINTEXT, so with
// an empty location the OAUTHBEARER token would presumably travel unencrypted - confirm.
109 @Value("${app.kafka.ssl.key-store-location}")
110 private String kafkaKeyStoreLocation;
112 @Value("${app.kafka.ssl.key-store-type}")
113 private String kafkaKeyStoreType;
// The password itself, not a file path. addKafkaSecurityProps stores it under
// SslConfigs.SSL_KEY_PASSWORD_CONFIG when it is non-empty.
// NOTE(review): that key is the private-key password; Kafka takes the key-store password under
// the separate SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG - confirm which one this property is
// meant to supply.
115 @Value("${app.kafka.ssl.key-store-password}")
116 private String kafkaKeyStorePassword;
118 @Value("${app.kafka.ssl.trust-store-type}")
119 private String kafkaTrustStoreType;
// Field name is spelled "kafk..." (sic); the trust store is applied only when this is non-empty.
121 @Value("${app.kafka.ssl.trust-store-location}")
122 private String kafkTrustStoreLocation;
// Property key is spelled "oath". NOTE(review): presumably gates the OAUTHBEARER settings in
// addKafkaSecurityProps - verify.
124 @Value("${app.kafka.use-oath-token}")
125 private boolean useOathToken;
/**
 * Returns the name of the bucket used for locks.
 *
 * @return {@code s3LocksBucket} when it is non-empty, otherwise {@code s3Bucket}
 */
public String getS3LocksBucket() {
    if (s3LocksBucket.isEmpty()) {
        // No dedicated locks bucket configured: fall back to the data bucket.
        return s3Bucket;
    }
    return s3LocksBucket;
}
131 public boolean isS3Enabled() {
132 return !s3EndpointOverride.isEmpty() && !s3Bucket.isEmpty();
/**
 * Returns the configured Kafka bootstrap servers.
 *
 * @return the value injected from {@code app.kafka.bootstrap-servers}; empty when the property
 *         is not set
 */
public String getKafkaBootStrapServers() {
    return this.kafkaBootStrapServers;
}
/**
 * Assembles a {@code CertificateConfig} from the injected {@code app.ssl.*} fields, holding this
 * object's monitor while doing so.
 *
 * <p>The two "password" fields are handed to builder setters named {@code ...PasswordPath}, so
 * they presumably carry paths to password files rather than the passwords themselves - confirm
 * against {@code CertificateConfig}.
 *
 * @return the certificate configuration built from the trust store, key store and their
 *         password-path values
 */
139 public synchronized CertificateConfig getCertificateConfiguration() {
140 return CertificateConfig.builder() //
141 .trustedCa(this.clientTrustStore) //
142 .trustedCaPasswordPath(this.clientTrustStorePassword) //
143 .keyCert(this.clientKeyStore) //
144 .keyPasswordPath(this.clientKeyStorePassword) //
/**
 * Assembles an {@code SftpConfig} from the known-hosts file path and the strict host-key
 * checking flag, holding this object's monitor while doing so.
 *
 * <p>NOTE(review): {@code app.sftp.strict-host-key-checking} defaults to {@code false} and
 * {@code app.sftp.known-hosts-file-path} to an empty string, so an unconfigured deployment
 * passes both through unchanged. If {@code SftpConfig} consumers skip host-key verification in
 * that case, SFTP connections are exposed to server impersonation - confirm how the flag is used
 * and enable it in production configuration.
 *
 * @return the SFTP configuration built from the injected {@code app.sftp.*} values
 */
148 public synchronized SftpConfig getSftpConfiguration() {
149 return SftpConfig.builder() //
150 .knownHostsFilePath(this.knownHostsFilePath) //
151 .strictHostKeyChecking(this.strictHostKeyChecking) //
155 public void addKafkaSecurityProps(Map<String, Object> props) {
158 props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
159 props.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
160 props.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS,
161 OAuthKafkaAuthenticateLoginCallbackHandler.class.getName());
162 props.put(SaslConfigs.SASL_JAAS_CONFIG,
163 "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub=\"alice\"; ");
165 if (!kafkaKeyStoreLocation.isEmpty()) {
166 props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name);
168 props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, kafkaKeyStoreType);
169 props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, kafkaKeyStoreLocation);
170 if (!kafkaKeyStorePassword.isEmpty()) {
171 props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaKeyStorePassword);
173 if (!kafkTrustStoreLocation.isEmpty()) {
174 props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, kafkaTrustStoreType);
175 props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkTrustStoreLocation);