X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=influxlogger%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fpmlog%2Fconfiguration%2FApplicationConfig.java;fp=influxlogger%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fpmlog%2Fconfiguration%2FApplicationConfig.java;h=fc613c54de0e235e2077eb896cd8767e42727620;hb=298969556b0f84de745a67e994a590d8b2a3de13;hp=84c50836225c65daa1380ba0ab9d778172d9abe2;hpb=ebd1c0b01fb80f1313678c777d4b1cb46800c23d;p=nonrtric%2Fplt%2Franpm.git diff --git a/influxlogger/src/main/java/org/oran/pmlog/configuration/ApplicationConfig.java b/influxlogger/src/main/java/org/oran/pmlog/configuration/ApplicationConfig.java index 84c5083..fc613c5 100644 --- a/influxlogger/src/main/java/org/oran/pmlog/configuration/ApplicationConfig.java +++ b/influxlogger/src/main/java/org/oran/pmlog/configuration/ApplicationConfig.java @@ -23,10 +23,16 @@ package org.oran.pmlog.configuration; import java.lang.invoke.MethodHandles; import java.nio.charset.Charset; import java.nio.file.Files; +import java.util.Map; import lombok.Getter; import lombok.ToString; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.oran.pmlog.oauth2.OAuthKafkaAuthenticateLoginCallbackHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; @@ -61,10 +67,10 @@ public class ApplicationConfig { @Value("${app.webclient.trust-store}") private String sslTrustStore = ""; - @Value("${app.webclient.http.proxy-host:}") + @Value("${app.webclient.http.proxy-host}") private String httpProxyHost = ""; - @Value("${app.webclient.http.proxy-port:0}") + @Value("${app.webclient.http.proxy-port}") private int httpProxyPort = 0; @Getter @@ -102,7 +108,7 @@ public class ApplicationConfig { private String icsBaseUrl; @Getter - @Value("${app.consumer-job-id:shouldHaveBeenDefinedInYaml}") + @Value("${app.consumer-job-id}") private String consumerJobId; @Getter @@ -125,6 +131,24 @@ public class ApplicationConfig { @Value("${app.influx.org}") private String influxOrg; + @Value("${app.kafka.ssl.key-store-type}") + private String kafkaKeyStoreType; + + @Value("${app.kafka.ssl.key-store-location}") + private String kafkaKeyStoreLocation; + + @Value("${app.kafka.ssl.key-store-password}") + private String kafkaKeyStorePassword; + + @Value("${app.kafka.ssl.trust-store-type}") + private String kafkaTrustStoreType; + + @Value("${app.kafka.ssl.trust-store-location}") + private String kafkTrustStoreLocation; + + @Value("${app.kafka.use-oath-token}") + private boolean useOathToken; + private WebClientConfig webClientConfig = null; public WebClientConfig getWebClientConfig() { @@ -148,6 +172,32 @@ public class ApplicationConfig { return this.webClientConfig; } + public void addKafkaSecurityProps(Map props) { + + if (useOathToken) { + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name); + props.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER"); + props.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, + OAuthKafkaAuthenticateLoginCallbackHandler.class.getName()); + props.put(SaslConfigs.SASL_JAAS_CONFIG, + "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub=\"alice\"; "); + } + if (!kafkaKeyStoreLocation.isEmpty()) { + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name); + // SSL + props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, kafkaKeyStoreType); + props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, kafkaKeyStoreLocation); + if (!kafkaKeyStorePassword.isEmpty()) { + props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaKeyStorePassword); + } + if (!kafkTrustStoreLocation.isEmpty()) { + props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, kafkaTrustStoreType); + props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkTrustStoreLocation); + } + } + + } + public String getConsumerJobInfo() { try {