Added support for using oauth token for Kafka
[nonrtric/plt/ranpm.git] / pmproducer / src / main / java / org / oran / pmproducer / configuration / ApplicationConfig.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2023 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oran.pmproducer.configuration;
22
23 import java.lang.invoke.MethodHandles;
24 import java.nio.charset.Charset;
25 import java.nio.file.Files;
26 import java.nio.file.Path;
27 import java.util.Collection;
28 import java.util.Collections;
29 import java.util.Map;
30
31 import lombok.Getter;
32 import lombok.Setter;
33 import lombok.ToString;
34
35 import org.apache.kafka.clients.CommonClientConfigs;
36 import org.apache.kafka.common.config.SaslConfigs;
37 import org.apache.kafka.common.config.SslConfigs;
38 import org.apache.kafka.common.security.auth.SecurityProtocol;
39 import org.oran.pmproducer.configuration.WebClientConfig.HttpProxyConfig;
40 import org.oran.pmproducer.oauth2.OAuthKafkaAuthenticateLoginCallbackHandler;
41 import org.oran.pmproducer.repository.InfoType;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44 import org.springframework.beans.factory.annotation.Value;
45 import org.springframework.boot.context.properties.EnableConfigurationProperties;
46
47 @EnableConfigurationProperties
48 @ToString
49 public class ApplicationConfig {
50
51     private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
52
53     @Getter
54     @Value("${app.configuration-filepath}")
55     private String localConfigurationFilePath;
56
57     @Value("${server.ssl.key-store-type}")
58     private String sslKeyStoreType = "";
59
60     @Value("${server.ssl.key-store-password}")
61     private String sslKeyStorePassword = "";
62
63     @Value("${server.ssl.key-store}")
64     private String sslKeyStore = "";
65
66     @Value("${server.ssl.key-password}")
67     private String sslKeyPassword = "";
68
69     @Value("${app.webclient.trust-store-used}")
70     private boolean sslTrustStoreUsed = false;
71
72     @Value("${app.webclient.trust-store-password}")
73     private String sslTrustStorePassword = "";
74
75     @Value("${app.webclient.trust-store}")
76     private String sslTrustStore = "";
77
78     @Value("${app.webclient.http.proxy-host}")
79     private String httpProxyHost = "";
80
81     @Value("${app.webclient.http.proxy-port}")
82     private int httpProxyPort = 0;
83
84     @Getter
85     @Setter
86     @Value("${server.port}")
87     private int localServerHttpPort;
88
89     @Getter
90     @Value("${app.ics-base-url}")
91     private String icsBaseUrl;
92
93     @Getter
94     @Value("${app.pm-producer-base-url}")
95     private String selfUrl;
96
97     @Getter
98     @Value("${app.kafka.bootstrap-servers:}")
99     private String kafkaBootStrapServers;
100
101     @Getter
102     @Value("${app.kafka.max-poll-records:300}")
103     private int kafkaMaxPollRecords;
104
105     @Getter
106     @Value("${app.pm-files-path}")
107     private String pmFilesPath;
108
109     @Getter
110     @Value("${app.s3.endpointOverride:}")
111     private String s3EndpointOverride;
112
113     @Getter
114     @Value("${app.s3.accessKeyId:}")
115     private String s3AccessKeyId;
116
117     @Getter
118     @Value("${app.s3.secretAccessKey:}")
119     private String s3SecretAccessKey;
120
121     @Getter
122     @Value("${app.s3.locksBucket:}")
123     private String s3LocksBucket;
124
125     @Getter
126     @Value("${app.s3.bucket:}")
127     private String s3Bucket;
128
129     @Getter
130     @Setter
131     @Value("${app.zip-output}")
132     private boolean zipOutput;
133
134     @Value("${app.kafka.ssl.key-store-type}")
135     private String kafkaKeyStoreType;
136
137     @Value("${app.kafka.ssl.key-store-location}")
138     private String kafkaKeyStoreLocation;
139
140     @Value("${app.kafka.ssl.key-store-password}")
141     private String kafkaKeyStorePassword;
142
143     @Value("${app.kafka.ssl.trust-store-type}")
144     private String kafkaTrustStoreType;
145
146     @Value("${app.kafka.ssl.trust-store-location}")
147     private String kafkTrustStoreLocation;
148
149     @Value("${app.kafka.use-oath-token}")
150     private boolean useOathToken;
151
152     private WebClientConfig webClientConfig = null;
153
154     public WebClientConfig getWebClientConfig() {
155         if (this.webClientConfig == null) {
156             HttpProxyConfig httpProxyConfig = HttpProxyConfig.builder() //
157                     .httpProxyHost(this.httpProxyHost) //
158                     .httpProxyPort(this.httpProxyPort) //
159                     .build();
160
161             this.webClientConfig = WebClientConfig.builder() //
162                     .keyStoreType(this.sslKeyStoreType) //
163                     .keyStorePassword(this.sslKeyStorePassword) //
164                     .keyStore(this.sslKeyStore) //
165                     .keyPassword(this.sslKeyPassword) //
166                     .isTrustStoreUsed(this.sslTrustStoreUsed) //
167                     .trustStore(this.sslTrustStore) //
168                     .trustStorePassword(this.sslTrustStorePassword) //
169                     .httpProxyConfig(httpProxyConfig) //
170                     .build();
171         }
172         return this.webClientConfig;
173     }
174
175     public boolean isS3Enabled() {
176         return !(s3EndpointOverride.isBlank() || s3Bucket.isBlank());
177     }
178
179     // Adapter to parse the json format of the configuration file.
180     static class ConfigFile {
181         Collection<InfoType> types;
182     }
183
184     public Collection<InfoType> getTypes() {
185         com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
186         try {
187             String configJson = Files.readString(Path.of(getLocalConfigurationFilePath()), Charset.defaultCharset());
188             ConfigFile configData = gson.fromJson(configJson, ConfigFile.class);
189             return configData.types;
190         } catch (Exception e) {
191             logger.error("Could not load configuration file {}", getLocalConfigurationFilePath());
192             return Collections.emptyList();
193         }
194     }
195
196     public void addKafkaSecurityProps(Map<String, Object> props) {
197
198         if (useOathToken) {
199             props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
200             props.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
201             props.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS,
202                     OAuthKafkaAuthenticateLoginCallbackHandler.class.getName());
203             props.put(SaslConfigs.SASL_JAAS_CONFIG,
204                     "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub=\"alice\"; ");
205         }
206         if (!kafkaKeyStoreLocation.isEmpty()) {
207             props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name);
208             // SSL
209             props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, kafkaKeyStoreType);
210             props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, kafkaKeyStoreLocation);
211             if (!kafkaKeyStorePassword.isEmpty()) {
212                 props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaKeyStorePassword);
213             }
214             if (!kafkTrustStoreLocation.isEmpty()) {
215                 props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, kafkaTrustStoreType);
216                 props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkTrustStoreLocation);
217             }
218         }
219     }
220
221 }