X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pmproducer%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fpmproducer%2Ftasks%2FTopicListener.java;fp=pmproducer%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fpmproducer%2Ftasks%2FTopicListener.java;h=3dd2475595cc2885cec48865b392a0a0d8d28829;hb=298969556b0f84de745a67e994a590d8b2a3de13;hp=351fcc6d2163aa64d168b6dc7b943d7943758d4d;hpb=ebd1c0b01fb80f1313678c777d4b1cb46800c23d;p=nonrtric%2Fplt%2Franpm.git diff --git a/pmproducer/src/main/java/org/oran/pmproducer/tasks/TopicListener.java b/pmproducer/src/main/java/org/oran/pmproducer/tasks/TopicListener.java index 351fcc6..3dd2475 100644 --- a/pmproducer/src/main/java/org/oran/pmproducer/tasks/TopicListener.java +++ b/pmproducer/src/main/java/org/oran/pmproducer/tasks/TopicListener.java @@ -155,21 +155,22 @@ public class TopicListener { } private ReceiverOptions kafkaInputProperties(String clientId) { - Map consumerProps = new HashMap<>(); + Map props = new HashMap<>(); if (this.applicationConfig.getKafkaBootStrapServers().isEmpty()) { logger.error("No kafka boostrap server is setup"); } - consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); - consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers()); - consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId); - consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); - consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); - consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId + "_" + kafkaGroupId); - consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId + "_" + kafkaGroupId); + this.applicationConfig.addKafkaSecurityProps(props); - return ReceiverOptions.create(consumerProps) + return ReceiverOptions.create(props) .subscription(Collections.singleton(this.type.getKafkaInputTopic())); }