Added support for using oauth token for Kafka
[nonrtric/plt/ranpm.git] / datafilecollector / src / main / java / org / oran / datafile / configuration / AppConfig.java
1 /*-
2  * ============LICENSE_START======================================================================
3  * Copyright (C) 2018, 2020-2022 Nokia. All rights reserved.
4  * Copyright (C) 2018-2023 Nordix Foundation. All rights reserved.
5  * ===============================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
7  * in compliance with the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software distributed under the License
12  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
13  * or implied. See the License for the specific language governing permissions and limitations under
14  * the License.
15  * ============LICENSE_END========================================================================
16  */
17
18 package org.oran.datafile.configuration;
19
20 import java.util.Map;
21
22 import lombok.Getter;
23
24 import org.apache.kafka.clients.CommonClientConfigs;
25 import org.apache.kafka.common.config.SaslConfigs;
26 import org.apache.kafka.common.config.SslConfigs;
27 import org.apache.kafka.common.security.auth.SecurityProtocol;
28 import org.oran.datafile.oauth2.OAuthKafkaAuthenticateLoginCallbackHandler;
29 import org.springframework.beans.factory.annotation.Value;
30 import org.springframework.boot.context.properties.EnableConfigurationProperties;
31 import org.springframework.stereotype.Component;
32
33 /**
34  * Holds all configuration for the DFC.
35  */
36
37 @Component
38 @EnableConfigurationProperties
39 public class AppConfig {
40
41     @Value("${app.kafka.bootstrap-servers:}")
42     private String kafkaBootStrapServers;
43
44     @Value("${app.kafka.collected-file-topic:}")
45     @Getter
46     private String collectedFileTopic;
47
48     @Value("${app.kafka.file-ready-event-topic:}")
49     @Getter
50     private String inputTopic;
51
52     @Value("${app.kafka.client-id:undefined}")
53     @Getter
54     private String kafkaClientId;
55
56     @Value("${app.collected-files-path}")
57     @Getter
58     private String collectedFilesPath;
59
60     @Value("${app.sftp.strict-host-key-checking:false}")
61     private boolean strictHostKeyChecking;
62
63     @Value("${app.sftp.known-hosts-file-path:}")
64     @Getter
65     private String knownHostsFilePath;
66
67     @Value("${app.ssl.key-store-password-file}")
68     private String clientKeyStorePassword = "";
69
70     @Value("${app.ssl.key-store:}")
71     private String clientKeyStore = "";
72
73     @Value("${app.ssl.trust-store:}")
74     private String clientTrustStore = "";
75
76     @Value("${app.ssl.trust-store-password-file:}")
77     private String clientTrustStorePassword;
78
79     @Getter
80     @Value("${app.s3.endpointOverride:}")
81     private String s3EndpointOverride;
82
83     @Getter
84     @Value("${app.s3.accessKeyId:}")
85     private String s3AccessKeyId;
86
87     @Getter
88     @Value("${app.s3.secretAccessKey:}")
89     private String s3SecretAccessKey;
90
91     @Getter
92     @Value("${app.s3.bucket:}")
93     private String s3Bucket;
94
95     @Value("${app.s3.locksBucket:}")
96     private String s3LocksBucket;
97
98     @Value("${app.number-of-worker-treads:200}")
99     @Getter
100     private int noOfWorkerThreads;
101
102     @Value("${app.kafka.ssl.key-store-location}")
103     private String kafkaKeyStoreLocation;
104
105     @Value("${app.kafka.ssl.key-store-type}")
106     private String kafkaKeyStoreType;
107
108     @Value("${app.kafka.ssl.key-store-password}")
109     private String kafkaKeyStorePassword;
110
111     @Value("${app.kafka.ssl.trust-store-type}")
112     private String kafkaTrustStoreType;
113
114     @Value("${app.kafka.ssl.trust-store-location}")
115     private String kafkTrustStoreLocation;
116
117     @Value("${app.kafka.use-oath-token}")
118     private boolean useOathToken;
119
120     public String getS3LocksBucket() {
121         return s3LocksBucket.isEmpty() ? s3Bucket : s3LocksBucket;
122     }
123
124     public boolean isS3Enabled() {
125         return !s3EndpointOverride.isEmpty() && !s3Bucket.isEmpty();
126     }
127
128     public String getKafkaBootStrapServers() {
129         return kafkaBootStrapServers;
130     }
131
132     public synchronized CertificateConfig getCertificateConfiguration() {
133         return CertificateConfig.builder() //
134             .trustedCa(this.clientTrustStore) //
135             .trustedCaPasswordPath(this.clientTrustStorePassword) //
136             .keyCert(this.clientKeyStore) //
137             .keyPasswordPath(this.clientKeyStorePassword) //
138             .build();
139     }
140
141     public synchronized SftpConfig getSftpConfiguration() {
142         return SftpConfig.builder() //
143             .knownHostsFilePath(this.knownHostsFilePath) //
144             .strictHostKeyChecking(this.strictHostKeyChecking) //
145             .build();
146     }
147
148     public void addKafkaSecurityProps(Map<String, Object> props) {
149
150         if (useOathToken) {
151             props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
152             props.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
153             props.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS,
154                 OAuthKafkaAuthenticateLoginCallbackHandler.class.getName());
155             props.put(SaslConfigs.SASL_JAAS_CONFIG,
156                 "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub=\"alice\"; ");
157         }
158         if (!kafkaKeyStoreLocation.isEmpty()) {
159             props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name);
160             // SSL
161             props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, kafkaKeyStoreType);
162             props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, kafkaKeyStoreLocation);
163             if (!kafkaKeyStorePassword.isEmpty()) {
164                 props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaKeyStorePassword);
165             }
166             if (!kafkTrustStoreLocation.isEmpty()) {
167                 props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, kafkaTrustStoreType);
168                 props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkTrustStoreLocation);
169             }
170         }
171     }
172
173 }