Added support for using oauth token for Kafka
[nonrtric/plt/ranpm.git] / pmproducer / src / main / java / org / oran / pmproducer / configuration / ApplicationConfig.java
index 18fcd8f..a70243b 100644 (file)
@@ -26,12 +26,18 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Map;
 
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
 
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.oran.pmproducer.configuration.WebClientConfig.HttpProxyConfig;
+import org.oran.pmproducer.oauth2.OAuthKafkaAuthenticateLoginCallbackHandler;
 import org.oran.pmproducer.repository.InfoType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,10 +75,10 @@ public class ApplicationConfig {
     @Value("${app.webclient.trust-store}")
     private String sslTrustStore = "";
 
-    @Value("${app.webclient.http.proxy-host:}")
+    @Value("${app.webclient.http.proxy-host}")
     private String httpProxyHost = "";
 
-    @Value("${app.webclient.http.proxy-port:0}")
+    @Value("${app.webclient.http.proxy-port}")
     private int httpProxyPort = 0;
 
     @Getter
@@ -97,7 +103,7 @@ public class ApplicationConfig {
     private int kafkaMaxPollRecords;
 
     @Getter
-    @Value("${app.pm-files-path:}")
+    @Value("${app.pm-files-path}")
     private String pmFilesPath;
 
     @Getter
@@ -122,9 +128,27 @@ public class ApplicationConfig {
 
     @Getter
     @Setter
-    @Value("${app.zip-output:}")
+    @Value("${app.zip-output}")
     private boolean zipOutput;
 
+    @Value("${app.kafka.ssl.key-store-type}")
+    private String kafkaKeyStoreType;
+
+    @Value("${app.kafka.ssl.key-store-location}")
+    private String kafkaKeyStoreLocation;
+
+    @Value("${app.kafka.ssl.key-store-password}")
+    private String kafkaKeyStorePassword;
+
+    @Value("${app.kafka.ssl.trust-store-type}")
+    private String kafkaTrustStoreType;
+
+    @Value("${app.kafka.ssl.trust-store-location}")
+    private String kafkTrustStoreLocation;
+
+    @Value("${app.kafka.use-oath-token}")
+    private boolean useOathToken;
+
     private WebClientConfig webClientConfig = null;
 
     public WebClientConfig getWebClientConfig() {
@@ -167,7 +191,31 @@ public class ApplicationConfig {
             logger.error("Could not load configuration file {}", getLocalConfigurationFilePath());
             return Collections.emptyList();
         }
+    }
+
+    public void addKafkaSecurityProps(Map<String, Object> props) {
 
+        if (useOathToken) {
+            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
+            props.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
+            props.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS,
+                    OAuthKafkaAuthenticateLoginCallbackHandler.class.getName());
+            props.put(SaslConfigs.SASL_JAAS_CONFIG,
+                    "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub=\"alice\"; ");
+        }
+        if (!kafkaKeyStoreLocation.isEmpty()) {
+            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name);
+            // SSL
+            props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, kafkaKeyStoreType);
+            props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, kafkaKeyStoreLocation);
+            if (!kafkaKeyStorePassword.isEmpty()) {
+                props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaKeyStorePassword);
+            }
+            if (!kafkTrustStoreLocation.isEmpty()) {
+                props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, kafkaTrustStoreType);
+                props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkTrustStoreLocation);
+            }
+        }
     }
 
 }