X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pmproducer%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fpmproducer%2Fconfiguration%2FApplicationConfig.java;h=a70243bf647d7cc18c958fb8f14faa26acc0f7ac;hb=54c8fecebbb5e19010e56eddf3aba8e127e0abc3;hp=2048b4261b323406866d6489593e4b6da12782f7;hpb=6dfbff6834c3a9da2d8f06b15eb94048cbad2d88;p=nonrtric%2Fplt%2Franpm.git diff --git a/pmproducer/src/main/java/org/oran/pmproducer/configuration/ApplicationConfig.java b/pmproducer/src/main/java/org/oran/pmproducer/configuration/ApplicationConfig.java index 2048b42..a70243b 100644 --- a/pmproducer/src/main/java/org/oran/pmproducer/configuration/ApplicationConfig.java +++ b/pmproducer/src/main/java/org/oran/pmproducer/configuration/ApplicationConfig.java @@ -26,12 +26,18 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Collection; import java.util.Collections; +import java.util.Map; import lombok.Getter; import lombok.Setter; import lombok.ToString; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.security.auth.SecurityProtocol; import org.oran.pmproducer.configuration.WebClientConfig.HttpProxyConfig; +import org.oran.pmproducer.oauth2.OAuthKafkaAuthenticateLoginCallbackHandler; import org.oran.pmproducer.repository.InfoType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,10 +75,10 @@ public class ApplicationConfig { @Value("${app.webclient.trust-store}") private String sslTrustStore = ""; - @Value("${app.webclient.http.proxy-host:}") + @Value("${app.webclient.http.proxy-host}") private String httpProxyHost = ""; - @Value("${app.webclient.http.proxy-port:0}") + @Value("${app.webclient.http.proxy-port}") private int httpProxyPort = 0; @Getter @@ -85,13 +91,9 @@ public class ApplicationConfig { private String icsBaseUrl; @Getter - @Value("${app.pm_producer-base-url}") + @Value("${app.pm-producer-base-url}") private String selfUrl; - @Getter - @Value("${app.dmaap-base-url}") - private String dmaapBaseUrl; - @Getter @Value("${app.kafka.bootstrap-servers:}") private String kafkaBootStrapServers; @@ -101,7 +103,7 @@ public class ApplicationConfig { private int kafkaMaxPollRecords; @Getter - @Value("${app.pm-files-path:}") + @Value("${app.pm-files-path}") private String pmFilesPath; @Getter @@ -126,9 +128,27 @@ public class ApplicationConfig { @Getter @Setter - @Value("${app.zip-output:}") + @Value("${app.zip-output}") private boolean zipOutput; + @Value("${app.kafka.ssl.key-store-type}") + private String kafkaKeyStoreType; + + @Value("${app.kafka.ssl.key-store-location}") + private String kafkaKeyStoreLocation; + + @Value("${app.kafka.ssl.key-store-password}") + private String kafkaKeyStorePassword; + + @Value("${app.kafka.ssl.trust-store-type}") + private String kafkaTrustStoreType; + + @Value("${app.kafka.ssl.trust-store-location}") + private String kafkTrustStoreLocation; + + @Value("${app.kafka.use-oath-token}") + private boolean useOathToken; + private WebClientConfig webClientConfig = null; public WebClientConfig getWebClientConfig() { @@ -171,7 +191,31 @@ public class ApplicationConfig { logger.error("Could not load configuration file {}", getLocalConfigurationFilePath()); return Collections.emptyList(); } + } + public void addKafkaSecurityProps(Map props) { + + if (useOathToken) { + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name); + props.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER"); + props.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, + OAuthKafkaAuthenticateLoginCallbackHandler.class.getName()); + props.put(SaslConfigs.SASL_JAAS_CONFIG, + "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub=\"alice\"; "); + } + if (!kafkaKeyStoreLocation.isEmpty()) { + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name); + // SSL + props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, kafkaKeyStoreType); + props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, kafkaKeyStoreLocation); + if (!kafkaKeyStorePassword.isEmpty()) { + props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaKeyStorePassword); + } + if (!kafkTrustStoreLocation.isEmpty()) { + props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, kafkaTrustStoreType); + props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkTrustStoreLocation); + } + } } }