import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jackson.Jackson2ObjectMapperBuilderCustomizer;
+import org.springframework.boot.context.properties.ConfigurationPropertiesScan;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
@EnableCaching
@EnableScheduling
@EnableAspectJAutoProxy
+@ConfigurationPropertiesScan
public class CoreApplication {
/**
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Ericsson
+ * Modifications Copyright (C) 2024 OpenInfra Foundation Europe
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+package org.oran.smo.teiv.config;
+
+import java.util.Map;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+@Data
+@Slf4j
+@ConfigurationProperties("kafka.security")
+public class KafkaSecurityConfig {
+ private boolean enabled = false;
+
+ private String protocol;
+
+ private Map<String, String> properties;
+}
import org.apache.kafka.common.serialization.StringDeserializer;
import org.oran.smo.teiv.config.KafkaAdminConfig;
import org.oran.smo.teiv.config.KafkaConfig;
+import org.oran.smo.teiv.config.KafkaSecurityConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
private final KafkaConfig kafkaConfig;
private final KafkaAdminConfig kafkaAdminConfig;
+ private final KafkaSecurityConfig kafkaSecurityConfig;
@Bean
public KafkaAdmin kafkaAdmin() {
adminConfig.put(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG, kafkaAdminConfig.getReconnectBackoffMs());
adminConfig.put(AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, kafkaAdminConfig.getReconnectBackoffMaxMs());
adminConfig.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, kafkaAdminConfig.getRequestTimeoutMs());
+ if (kafkaSecurityConfig.isEnabled()) {
+ adminConfig.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, kafkaSecurityConfig.getProtocol());
+ adminConfig.putAll(kafkaSecurityConfig.getProperties());
+ }
return new KafkaAdmin(adminConfig);
}
config.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, kafkaAdminConfig.getReconnectBackoffMs());
config.put(ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, kafkaAdminConfig.getReconnectBackoffMaxMs());
config.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, kafkaAdminConfig.getRequestTimeoutMs());
+ if (kafkaSecurityConfig.isEnabled()) {
+ config.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, kafkaSecurityConfig.getProtocol());
+ config.putAll(kafkaSecurityConfig.getProperties());
+ }
return config;
}
}
reconnect-backoff-ms: 50
reconnect-backoff-max-ms: 30000
request-timeout-ms: 30000
+ security:
+ enabled: false
+ protocol: SASL_PLAINTEXT
+ properties:
+ "sasl.mechanism": SCRAM-SHA-512
+ "sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"\" password=\"\";"
availability:
retry-attempts: 2147483647
retry-interval-ms: 1000