Add security configuration to kafka 34/13634/1
authoraravind.est <aravindhan.a@est.tech>
Thu, 17 Oct 2024 17:04:43 +0000 (18:04 +0100)
committeraravind.est <aravindhan.a@est.tech>
Thu, 17 Oct 2024 17:04:43 +0000 (18:04 +0100)
Security configurations enabled of the kafka admin client and consumer

Issue-ID: SMO-154
Change-Id: If1117693ac6ba3f308874c82a62efb7e7a38b281
Signed-off-by: aravind.est <aravindhan.a@est.tech>
teiv/src/main/java/org/oran/smo/teiv/CoreApplication.java
teiv/src/main/java/org/oran/smo/teiv/config/KafkaSecurityConfig.java [new file with mode: 0644]
teiv/src/main/java/org/oran/smo/teiv/service/kafka/KafkaFactory.java
teiv/src/main/resources/application.yaml

index 12b64b9..ff91345 100644 (file)
@@ -26,6 +26,7 @@ import io.micrometer.core.instrument.MeterRegistry;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.autoconfigure.jackson.Jackson2ObjectMapperBuilderCustomizer;
+import org.springframework.boot.context.properties.ConfigurationPropertiesScan;
 import org.springframework.boot.web.client.RestTemplateBuilder;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.EnableAspectJAutoProxy;
@@ -47,6 +48,7 @@ import org.springframework.cache.annotation.EnableCaching;
 @EnableCaching
 @EnableScheduling
 @EnableAspectJAutoProxy
+@ConfigurationPropertiesScan
 public class CoreApplication {
 
     /**
diff --git a/teiv/src/main/java/org/oran/smo/teiv/config/KafkaSecurityConfig.java b/teiv/src/main/java/org/oran/smo/teiv/config/KafkaSecurityConfig.java
new file mode 100644 (file)
index 0000000..286378f
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ *  ============LICENSE_START=======================================================
+ *  Copyright (C) 2024 Ericsson
+ *  Modifications Copyright (C) 2024 OpenInfra Foundation Europe
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+package org.oran.smo.teiv.config;
+
+import java.util.Map;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+@Data
+@Slf4j
+@ConfigurationProperties("kafka.security")
+public class KafkaSecurityConfig {
+    private boolean enabled = false;
+
+    private String protocol;
+
+    private Map<String, String> properties;
+}
index 6c49d7a..0f681f2 100644 (file)
@@ -31,6 +31,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.oran.smo.teiv.config.KafkaAdminConfig;
 import org.oran.smo.teiv.config.KafkaConfig;
+import org.oran.smo.teiv.config.KafkaSecurityConfig;
 import org.springframework.context.annotation.Bean;
 import org.springframework.kafka.annotation.EnableKafka;
 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
@@ -50,6 +51,7 @@ public class KafkaFactory {
 
     private final KafkaConfig kafkaConfig;
     private final KafkaAdminConfig kafkaAdminConfig;
+    private final KafkaSecurityConfig kafkaSecurityConfig;
 
     @Bean
     public KafkaAdmin kafkaAdmin() {
@@ -60,6 +62,10 @@ public class KafkaFactory {
         adminConfig.put(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG, kafkaAdminConfig.getReconnectBackoffMs());
         adminConfig.put(AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, kafkaAdminConfig.getReconnectBackoffMaxMs());
         adminConfig.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, kafkaAdminConfig.getRequestTimeoutMs());
+        if (kafkaSecurityConfig.isEnabled()) {
+            adminConfig.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, kafkaSecurityConfig.getProtocol());
+            adminConfig.putAll(kafkaSecurityConfig.getProperties());
+        }
         return new KafkaAdmin(adminConfig);
     }
 
@@ -104,6 +110,10 @@ public class KafkaFactory {
         config.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, kafkaAdminConfig.getReconnectBackoffMs());
         config.put(ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, kafkaAdminConfig.getReconnectBackoffMaxMs());
         config.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, kafkaAdminConfig.getRequestTimeoutMs());
+        if (kafkaSecurityConfig.isEnabled()) {
+            config.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, kafkaSecurityConfig.getProtocol());
+            config.putAll(kafkaSecurityConfig.getProperties());
+        }
         return config;
     }
 }
index 8bacf50..f75a41a 100644 (file)
@@ -115,6 +115,12 @@ kafka:
     reconnect-backoff-ms: 50
     reconnect-backoff-max-ms: 30000
     request-timeout-ms: 30000
+  security:
+    enabled: false
+    protocol: SASL_PLAINTEXT
+    properties:
+      "sasl.mechanism": SCRAM-SHA-512
+      "sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"\" password=\"\";"
   availability:
     retry-attempts: 2147483647
     retry-interval-ms: 1000