Added support for using oauth token for Kafka
[nonrtric/plt/ranpm.git] / datafilecollector / src / main / java / org / oran / datafile / configuration / AppConfig.java
@@ -1,7 +1,7 @@
 /*-
  * ============LICENSE_START======================================================================
  * Copyright (C) 2018, 2020-2022 Nokia. All rights reserved.
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2023 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
  * in compliance with the License. You may obtain a copy of the License at
  * ============LICENSE_END========================================================================
  */
 
-package org.onap.dcaegen2.collectors.datafile.configuration;
+package org.oran.datafile.configuration;
 
-import java.util.Properties;
+import java.util.Map;
 
 import lombok.Getter;
 
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.oran.datafile.oauth2.OAuthKafkaAuthenticateLoginCallbackHandler;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.stereotype.Component;
 
 /**
  * Holds all configuration for the DFC.
- *
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on
- *         3/23/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
  */
 
 @Component
 @EnableConfigurationProperties
 public class AppConfig {
 
-    @Value("#{systemEnvironment}")
-    Properties systemEnvironment;
-
     @Value("${app.kafka.bootstrap-servers:}")
     private String kafkaBootStrapServers;
 
     @Value("${app.kafka.collected-file-topic:}")
-    public String collectedFileTopic;
+    @Getter
+    private String collectedFileTopic;
 
     @Value("${app.kafka.file-ready-event-topic:}")
-    public String fileReadyEventTopic;
+    @Getter
+    private String inputTopic;
 
     @Value("${app.kafka.client-id:undefined}")
-    public String kafkaClientId;
+    @Getter
+    private String kafkaClientId;
 
-    @Value("${app.collected-files-path:}")
-    public String collectedFilesPath;
+    @Value("${app.collected-files-path}")
+    @Getter
+    private String collectedFilesPath;
 
     @Value("${app.sftp.strict-host-key-checking:false}")
-    public boolean strictHostKeyChecking;
+    private boolean strictHostKeyChecking;
 
     @Value("${app.sftp.known-hosts-file-path:}")
-    public String knownHostsFilePath;
+    @Getter
+    private String knownHostsFilePath;
 
     @Value("${app.ssl.key-store-password-file}")
     private String clientKeyStorePassword = "";
@@ -96,6 +99,24 @@ public class AppConfig {
     @Getter
     private int noOfWorkerThreads;
 
+    @Value("${app.kafka.ssl.key-store-location}")
+    private String kafkaKeyStoreLocation;
+
+    @Value("${app.kafka.ssl.key-store-type}")
+    private String kafkaKeyStoreType;
+
+    @Value("${app.kafka.ssl.key-store-password}")
+    private String kafkaKeyStorePassword;
+
+    @Value("${app.kafka.ssl.trust-store-type}")
+    private String kafkaTrustStoreType;
+
+    @Value("${app.kafka.ssl.trust-store-location}")
+    private String kafkTrustStoreLocation;
+
+    @Value("${app.kafka.use-oath-token}")
+    private boolean useOathToken;
+
     public String getS3LocksBucket() {
         return s3LocksBucket.isEmpty() ? s3Bucket : s3LocksBucket;
     }
@@ -124,4 +145,29 @@ public class AppConfig {
             .build();
     }
 
+    public void addKafkaSecurityProps(Map<String, Object> props) {
+
+        if (useOathToken) {
+            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
+            props.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
+            props.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS,
+                OAuthKafkaAuthenticateLoginCallbackHandler.class.getName());
+            props.put(SaslConfigs.SASL_JAAS_CONFIG,
+                "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub=\"alice\"; ");
+        }
+        if (!kafkaKeyStoreLocation.isEmpty()) {
+            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name);
+            // SSL
+            props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, kafkaKeyStoreType);
+            props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, kafkaKeyStoreLocation);
+            if (!kafkaKeyStorePassword.isEmpty()) {
+                props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaKeyStorePassword);
+            }
+            if (!kafkTrustStoreLocation.isEmpty()) {
+                props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, kafkaTrustStoreType);
+                props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkTrustStoreLocation);
+            }
+        }
+    }
+
 }