import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
+import java.util.Map;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.oran.pmproducer.configuration.WebClientConfig.HttpProxyConfig;
+import org.oran.pmproducer.oauth2.OAuthKafkaAuthenticateLoginCallbackHandler;
import org.oran.pmproducer.repository.InfoType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Value("${app.webclient.trust-store}")
private String sslTrustStore = "";
- @Value("${app.webclient.http.proxy-host:}")
+ @Value("${app.webclient.http.proxy-host}")
private String httpProxyHost = "";
- @Value("${app.webclient.http.proxy-port:0}")
+ @Value("${app.webclient.http.proxy-port}")
private int httpProxyPort = 0;
@Getter
private int kafkaMaxPollRecords;
@Getter
- @Value("${app.pm-files-path:}")
+ @Value("${app.pm-files-path}")
private String pmFilesPath;
@Getter
@Getter
@Setter
- @Value("${app.zip-output:}")
+ @Value("${app.zip-output}")
private boolean zipOutput;
+ @Value("${app.kafka.ssl.key-store-type}")
+ private String kafkaKeyStoreType;
+
+ @Value("${app.kafka.ssl.key-store-location}")
+ private String kafkaKeyStoreLocation;
+
+ @Value("${app.kafka.ssl.key-store-password}")
+ private String kafkaKeyStorePassword;
+
+ @Value("${app.kafka.ssl.trust-store-type}")
+ private String kafkaTrustStoreType;
+
+ @Value("${app.kafka.ssl.trust-store-location}")
+ private String kafkTrustStoreLocation;
+
+ @Value("${app.kafka.use-oath-token}")
+ private boolean useOathToken;
+
private WebClientConfig webClientConfig = null;
public WebClientConfig getWebClientConfig() {
logger.error("Could not load configuration file {}", getLocalConfigurationFilePath());
return Collections.emptyList();
}
+ }
+
+ public void addKafkaSecurityProps(Map<String, Object> props) {
+ if (useOathToken) {
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
+ props.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
+ props.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS,
+ OAuthKafkaAuthenticateLoginCallbackHandler.class.getName());
+ props.put(SaslConfigs.SASL_JAAS_CONFIG,
+ "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub=\"alice\"; ");
+ }
+ if (!kafkaKeyStoreLocation.isEmpty()) {
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name);
+ // SSL
+ props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, kafkaKeyStoreType);
+ props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, kafkaKeyStoreLocation);
+ if (!kafkaKeyStorePassword.isEmpty()) {
+ props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaKeyStorePassword);
+ }
+ if (!kafkTrustStoreLocation.isEmpty()) {
+ props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, kafkaTrustStoreType);
+ props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkTrustStoreLocation);
+ }
+ }
}
}