Added support for using oauth token for Kafka
[nonrtric/plt/ranpm.git] / pmproducer / src / main / java / org / oran / pmproducer / tasks / JobDataDistributor.java
index d9f6632..f2d7b53 100644 (file)
@@ -93,6 +93,7 @@ public class JobDataDistributor {
 
         SenderOptions<byte[], byte[]> senderOptions = senderOptions(applConfig, jobGroup.getDeliveryInfo());
         this.sender = KafkaSender.create(senderOptions);
+
     }
 
     public void start(Flux<TopicListener.DataFromTopic> input) {
@@ -150,6 +151,9 @@ public class JobDataDistributor {
         props.put(ProducerConfig.ACKS_CONFIG, "all");
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+
+        config.addKafkaSecurityProps(props);
+
         return SenderOptions.create(props);
     }