/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018, 2020-2022 Nokia. All rights reserved.
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2023 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
* ============LICENSE_END========================================================================
*/
-package org.onap.dcaegen2.collectors.datafile.configuration;
+package org.oran.datafile.configuration;
-import java.util.Properties;
+import java.util.Map;
import lombok.Getter;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.oran.datafile.oauth2.OAuthKafkaAuthenticateLoginCallbackHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* Holds all configuration for the DFC.
- *
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on
- * 3/23/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
@Component
@EnableConfigurationProperties
public class AppConfig {
- @Value("#{systemEnvironment}")
- Properties systemEnvironment;
-
@Value("${app.kafka.bootstrap-servers:}")
private String kafkaBootStrapServers;
@Value("${app.kafka.collected-file-topic:}")
- public String collectedFileTopic;
+ @Getter
+ private String collectedFileTopic;
@Value("${app.kafka.file-ready-event-topic:}")
- public String fileReadyEventTopic;
+ @Getter
+ private String inputTopic;
@Value("${app.kafka.client-id:undefined}")
- public String kafkaClientId;
+ @Getter
+ private String kafkaClientId;
- @Value("${app.collected-files-path:}")
- public String collectedFilesPath;
+ @Value("${app.collected-files-path}")
+ @Getter
+ private String collectedFilesPath;
@Value("${app.sftp.strict-host-key-checking:false}")
- public boolean strictHostKeyChecking;
+ private boolean strictHostKeyChecking;
@Value("${app.sftp.known-hosts-file-path:}")
- public String knownHostsFilePath;
+ @Getter
+ private String knownHostsFilePath;
@Value("${app.ssl.key-store-password-file}")
private String clientKeyStorePassword = "";
@Getter
private int noOfWorkerThreads;
+ @Value("${app.kafka.ssl.key-store-location}")
+ private String kafkaKeyStoreLocation;
+
+ @Value("${app.kafka.ssl.key-store-type}")
+ private String kafkaKeyStoreType;
+
+ @Value("${app.kafka.ssl.key-store-password}")
+ private String kafkaKeyStorePassword;
+
+ @Value("${app.kafka.ssl.trust-store-type}")
+ private String kafkaTrustStoreType;
+
+ @Value("${app.kafka.ssl.trust-store-location}")
+ private String kafkTrustStoreLocation;
+
+ @Value("${app.kafka.use-oath-token}")
+ private boolean useOathToken;
+
public String getS3LocksBucket() {
return s3LocksBucket.isEmpty() ? s3Bucket : s3LocksBucket;
}
.build();
}
+ public void addKafkaSecurityProps(Map<String, Object> props) {
+
+ if (useOathToken) {
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
+ props.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
+ props.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS,
+ OAuthKafkaAuthenticateLoginCallbackHandler.class.getName());
+ props.put(SaslConfigs.SASL_JAAS_CONFIG,
+ "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub=\"alice\"; ");
+ }
+ if (!kafkaKeyStoreLocation.isEmpty()) {
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name);
+ // SSL
+ props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, kafkaKeyStoreType);
+ props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, kafkaKeyStoreLocation);
+ if (!kafkaKeyStorePassword.isEmpty()) {
+ props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaKeyStorePassword);
+ }
+ if (!kafkTrustStoreLocation.isEmpty()) {
+ props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, kafkaTrustStoreType);
+ props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkTrustStoreLocation);
+ }
+ }
+ }
+
}