+ public void addKafkaSecurityProps(Map<String, Object> props) {
+
+ if (useOathToken) {
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
+ props.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
+ props.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS,
+ OAuthKafkaAuthenticateLoginCallbackHandler.class.getName());
+ props.put(SaslConfigs.SASL_JAAS_CONFIG,
+ "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub=\"alice\"; ");
+ }
+ if (!kafkaKeyStoreLocation.isEmpty()) {
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name);
+ // SSL
+ props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, kafkaKeyStoreType);
+ props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, kafkaKeyStoreLocation);
+ if (!kafkaKeyStorePassword.isEmpty()) {
+ props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaKeyStorePassword);
+ }
+ if (!kafkTrustStoreLocation.isEmpty()) {
+ props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, kafkaTrustStoreType);
+ props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkTrustStoreLocation);
+ }
+ }
+
+ }
+