X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pmproducer%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fpmproducer%2Ftasks%2FJobDataDistributor.java;h=b224945c08719b5c71dbabcecc98e7e6198ec653;hb=fbccee5729fb23f3424046c1d122d29f0fec545a;hp=d9f6632f4cc6180401b1546a773591817f7b1fc9;hpb=6dfbff6834c3a9da2d8f06b15eb94048cbad2d88;p=nonrtric%2Fplt%2Franpm.git diff --git a/pmproducer/src/main/java/org/oran/pmproducer/tasks/JobDataDistributor.java b/pmproducer/src/main/java/org/oran/pmproducer/tasks/JobDataDistributor.java index d9f6632..b224945 100644 --- a/pmproducer/src/main/java/org/oran/pmproducer/tasks/JobDataDistributor.java +++ b/pmproducer/src/main/java/org/oran/pmproducer/tasks/JobDataDistributor.java @@ -93,6 +93,7 @@ public class JobDataDistributor { SenderOptions senderOptions = senderOptions(applConfig, jobGroup.getDeliveryInfo()); this.sender = KafkaSender.create(senderOptions); + } public void start(Flux input) { @@ -150,12 +151,15 @@ public class JobDataDistributor { props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + + config.addKafkaSecurityProps(props); + return SenderOptions.create(props); } private void sendLastStoredRecord() { String data = "{}"; - FilteredData output = new FilteredData(this.jobGroup.getType().getId(), null, data.getBytes()); + FilteredData output = new FilteredData("", this.jobGroup.getType().getId(), null, data.getBytes()); sendToClient(output).subscribe(); } @@ -169,7 +173,7 @@ public class JobDataDistributor { gzip.flush(); gzip.close(); byte[] zipped = out.toByteArray(); - return new FilteredData(data.infoTypeId, data.key, zipped, true); + return new FilteredData(data.getSourceName(), data.infoTypeId, data.key, zipped, true); } catch (IOException e) { logger.error("Unexpected exception when zipping: {}", e.getMessage()); return data;