Added support for using oauth token for Kafka
[nonrtric/plt/ranpm.git] / influxlogger / src / main / java / org / oran / pmlog / configuration / ApplicationConfig.java
index 84c5083..fc613c5 100644 (file)
@@ -23,10 +23,16 @@ package org.oran.pmlog.configuration;
 import java.lang.invoke.MethodHandles;
 import java.nio.charset.Charset;
 import java.nio.file.Files;
+import java.util.Map;
 
 import lombok.Getter;
 import lombok.ToString;
 
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.oran.pmlog.oauth2.OAuthKafkaAuthenticateLoginCallbackHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Value;
@@ -61,10 +67,10 @@ public class ApplicationConfig {
     @Value("${app.webclient.trust-store}")
     private String sslTrustStore = "";
 
-    @Value("${app.webclient.http.proxy-host:}")
+    @Value("${app.webclient.http.proxy-host}")
     private String httpProxyHost = "";
 
-    @Value("${app.webclient.http.proxy-port:0}")
+    @Value("${app.webclient.http.proxy-port}")
     private int httpProxyPort = 0;
 
     @Getter
@@ -102,7 +108,7 @@ public class ApplicationConfig {
     private String icsBaseUrl;
 
     @Getter
-    @Value("${app.consumer-job-id:shouldHaveBeenDefinedInYaml}")
+    @Value("${app.consumer-job-id}")
     private String consumerJobId;
 
     @Getter
@@ -125,6 +131,24 @@ public class ApplicationConfig {
     @Value("${app.influx.org}")
     private String influxOrg;
 
+    @Value("${app.kafka.ssl.key-store-type}")
+    private String kafkaKeyStoreType;
+
+    @Value("${app.kafka.ssl.key-store-location}")
+    private String kafkaKeyStoreLocation;
+
+    @Value("${app.kafka.ssl.key-store-password}")
+    private String kafkaKeyStorePassword;
+
+    @Value("${app.kafka.ssl.trust-store-type}")
+    private String kafkaTrustStoreType;
+
+    @Value("${app.kafka.ssl.trust-store-location}")
+    private String kafkTrustStoreLocation;
+
+    @Value("${app.kafka.use-oath-token}")
+    private boolean useOathToken;
+
     private WebClientConfig webClientConfig = null;
 
     public WebClientConfig getWebClientConfig() {
@@ -148,6 +172,32 @@ public class ApplicationConfig {
         return this.webClientConfig;
     }
 
+    public void addKafkaSecurityProps(Map<String, Object> props) {
+
+        if (useOathToken) {
+            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
+            props.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
+            props.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS,
+                    OAuthKafkaAuthenticateLoginCallbackHandler.class.getName());
+            props.put(SaslConfigs.SASL_JAAS_CONFIG,
+                    "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub=\"alice\"; ");
+        }
+        if (!kafkaKeyStoreLocation.isEmpty()) {
+            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name);
+            // SSL
+            props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, kafkaKeyStoreType);
+            props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, kafkaKeyStoreLocation);
+            if (!kafkaKeyStorePassword.isEmpty()) {
+                props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaKeyStorePassword);
+            }
+            if (!kafkTrustStoreLocation.isEmpty()) {
+                props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, kafkaTrustStoreType);
+                props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkTrustStoreLocation);
+            }
+        }
+
+    }
+
     public String getConsumerJobInfo() {
 
         try {