2 * ========================LICENSE_START=================================
5 * Copyright (C) 2023 Nordix Foundation
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ========================LICENSE_END===================================
21 package org.oran.pmlog.configuration;
23 import java.lang.invoke.MethodHandles;
24 import java.nio.charset.Charset;
25 import java.nio.file.Files;
29 import lombok.ToString;
31 import org.apache.kafka.clients.CommonClientConfigs;
32 import org.apache.kafka.common.config.SaslConfigs;
33 import org.apache.kafka.common.config.SslConfigs;
34 import org.apache.kafka.common.security.auth.SecurityProtocol;
35 import org.oran.pmlog.oauth2.OAuthKafkaAuthenticateLoginCallbackHandler;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38 import org.springframework.beans.factory.annotation.Value;
39 import org.springframework.boot.context.properties.EnableConfigurationProperties;
41 @EnableConfigurationProperties
43 public class ApplicationConfig {
45 private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
46 private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
47 private static final String JOB_DEFINITION_PATH = "./config/jobDefinition.json";
49 @Value("${server.ssl.key-store-type}")
50 private String sslKeyStoreType = "";
52 @Value("${server.ssl.key-store-password}")
53 private String sslKeyStorePassword = "";
55 @Value("${server.ssl.key-store}")
56 private String sslKeyStore = "";
58 @Value("${server.ssl.key-password}")
59 private String sslKeyPassword = "";
61 @Value("${app.webclient.trust-store-used}")
62 private boolean sslTrustStoreUsed = false;
64 @Value("${app.webclient.trust-store-password}")
65 private String sslTrustStorePassword = "";
67 @Value("${app.webclient.trust-store}")
68 private String sslTrustStore = "";
70 @Value("${app.webclient.http.proxy-host}")
71 private String httpProxyHost = "";
73 @Value("${app.webclient.http.proxy-port}")
74 private int httpProxyPort = 0;
77 @Value("${server.port}")
78 private int localServerHttpsPort;
81 @Value("${app.kafka.max-poll-records:300}")
82 private int kafkaMaxPollRecords;
85 @Value("${app.kafka.group-id}")
86 private String kafkaGroupId;
89 @Value("${app.kafka.client-id}")
90 private String kafkaClientId;
92 @Value("${app.kafka.bootstrap-servers}")
93 private String kafkaBootstrapServers;
95 @Value("${app.kafka.input-topic}")
96 private String kafkaInputTopic;
99 @Value("${app.influx.url}")
100 private String influxUrl;
103 @Value("${app.influx.access-token}")
104 private String influxAccessToken;
107 @Value("${app.ics-base-url}")
108 private String icsBaseUrl;
111 @Value("${app.consumer-job-id}")
112 private String consumerJobId;
115 @Value("${app.influx.user}")
116 private String influxUser;
119 @Value("${app.influx.password}")
120 private String influxPassword;
123 @Value("${app.influx.database}")
124 private String influxDatabase;
127 @Value("${app.influx.bucket}")
128 private String influxBucket;
131 @Value("${app.influx.org}")
132 private String influxOrg;
134 @Value("${app.kafka.ssl.key-store-type}")
135 private String kafkaKeyStoreType;
137 @Value("${app.kafka.ssl.key-store-location}")
138 private String kafkaKeyStoreLocation;
140 @Value("${app.kafka.ssl.key-store-password}")
141 private String kafkaKeyStorePassword;
143 @Value("${app.kafka.ssl.trust-store-type}")
144 private String kafkaTrustStoreType;
146 @Value("${app.kafka.ssl.trust-store-location}")
147 private String kafkTrustStoreLocation;
149 @Value("${app.kafka.use-oath-token}")
150 private boolean useOathToken;
152 private WebClientConfig webClientConfig = null;
154 public WebClientConfig getWebClientConfig() {
155 if (this.webClientConfig == null) {
156 WebClientConfig.HttpProxyConfig httpProxyConfig = WebClientConfig.HttpProxyConfig.builder() //
157 .httpProxyHost(this.httpProxyHost) //
158 .httpProxyPort(this.httpProxyPort) //
161 this.webClientConfig = WebClientConfig.builder() //
162 .keyStoreType(this.sslKeyStoreType) //
163 .keyStorePassword(this.sslKeyStorePassword) //
164 .keyStore(this.sslKeyStore) //
165 .keyPassword(this.sslKeyPassword) //
166 .isTrustStoreUsed(this.sslTrustStoreUsed) //
167 .trustStore(this.sslTrustStore) //
168 .trustStorePassword(this.sslTrustStorePassword) //
169 .httpProxyConfig(httpProxyConfig) //
172 return this.webClientConfig;
175 public void addKafkaSecurityProps(Map<String, Object> props) {
178 props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
179 props.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
180 props.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS,
181 OAuthKafkaAuthenticateLoginCallbackHandler.class.getName());
182 props.put(SaslConfigs.SASL_JAAS_CONFIG,
183 "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub=\"alice\"; ");
185 if (!kafkaKeyStoreLocation.isEmpty()) {
186 props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name);
188 props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, kafkaKeyStoreType);
189 props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, kafkaKeyStoreLocation);
190 if (!kafkaKeyStorePassword.isEmpty()) {
191 props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaKeyStorePassword);
193 if (!kafkTrustStoreLocation.isEmpty()) {
194 props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, kafkaTrustStoreType);
195 props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkTrustStoreLocation);
201 public String getConsumerJobInfo() {
204 return Files.readString(java.nio.file.Path.of(JOB_DEFINITION_PATH), Charset.defaultCharset());
205 } catch (Exception e) {
206 logger.error("Could not load configuration file: {}, reason: {}", JOB_DEFINITION_PATH, e.getMessage());
211 private ConsumerJobInfo.KafkaDeliveryInfo getKafkaDeliveryInfoFromAplicationYaml() {
212 return ConsumerJobInfo.KafkaDeliveryInfo.builder() //
213 .bootStrapServers(this.kafkaBootstrapServers) //
214 .topic(kafkaInputTopic) //
218 private ConsumerJobInfo.KafkaDeliveryInfo getKafkaDeliveryInfo() {
220 ConsumerJobInfo infoFromFile = gson.fromJson(getConsumerJobInfo(), ConsumerJobInfo.class);
221 if (infoFromFile != null && infoFromFile.jobDefinition != null
222 && infoFromFile.jobDefinition.getDeliveryInfo() != null) {
223 return infoFromFile.jobDefinition.getDeliveryInfo();
226 } catch (Exception e) {
227 logger.warn("Could not parse file: {}, reason: {}, falling back to parameters in Application.yaml",
228 JOB_DEFINITION_PATH, e.getMessage());
230 return getKafkaDeliveryInfoFromAplicationYaml();
234 public String getKafkaInputTopic() {
235 return getKafkaDeliveryInfo().getTopic();
238 public String getKafkaBootStrapServers() {
239 return getKafkaDeliveryInfo().getBootStrapServers();