X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pmproducer%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fpmproducer%2Ftasks%2FJobDataDistributor.java;fp=pmproducer%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fpmproducer%2Ftasks%2FJobDataDistributor.java;h=f2d7b5314d362c9d190d5b352da7dcb962ad6bdc;hb=298969556b0f84de745a67e994a590d8b2a3de13;hp=d9f6632f4cc6180401b1546a773591817f7b1fc9;hpb=ebd1c0b01fb80f1313678c777d4b1cb46800c23d;p=nonrtric%2Fplt%2Franpm.git diff --git a/pmproducer/src/main/java/org/oran/pmproducer/tasks/JobDataDistributor.java b/pmproducer/src/main/java/org/oran/pmproducer/tasks/JobDataDistributor.java index d9f6632..f2d7b53 100644 --- a/pmproducer/src/main/java/org/oran/pmproducer/tasks/JobDataDistributor.java +++ b/pmproducer/src/main/java/org/oran/pmproducer/tasks/JobDataDistributor.java @@ -93,6 +93,7 @@ public class JobDataDistributor { SenderOptions senderOptions = senderOptions(applConfig, jobGroup.getDeliveryInfo()); this.sender = KafkaSender.create(senderOptions); + } public void start(Flux input) { @@ -150,6 +151,9 @@ public class JobDataDistributor { props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + + config.addKafkaSecurityProps(props); + return SenderOptions.create(props); }