Added support for using oauth token for Kafka
[nonrtric/plt/ranpm.git] / pmproducer / src / main / java / org / oran / pmproducer / tasks / TopicListener.java
index 351fcc6..3dd2475 100644 (file)
@@ -155,21 +155,22 @@ public class TopicListener {
     }
 
     private ReceiverOptions<byte[], byte[]> kafkaInputProperties(String clientId) {
-        Map<String, Object> consumerProps = new HashMap<>();
+        Map<String, Object> props = new HashMap<>();
         if (this.applicationConfig.getKafkaBootStrapServers().isEmpty()) {
             logger.error("No kafka boostrap server is setup");
         }
 
-        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers());
-        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
-        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
-        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
-        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers());
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId + "_" + kafkaGroupId);
 
-        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId + "_" + kafkaGroupId);
+        this.applicationConfig.addKafkaSecurityProps(props);
 
-        return ReceiverOptions.<byte[], byte[]>create(consumerProps)
+        return ReceiverOptions.<byte[], byte[]>create(props)
                 .subscription(Collections.singleton(this.type.getKafkaInputTopic()));
     }