X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pmproducer%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fpmproducer%2Ftasks%2FJobDataDistributor.java;h=130cbd395fd18a6d7a403bbdc7c973c876c57a00;hb=4dd29883b619182fb43ebc4565266831d2a3b79e;hp=d9f6632f4cc6180401b1546a773591817f7b1fc9;hpb=547c200ebd35ebc81a92694fa48653d3ba6dcb27;p=nonrtric%2Fplt%2Franpm.git diff --git a/pmproducer/src/main/java/org/oran/pmproducer/tasks/JobDataDistributor.java b/pmproducer/src/main/java/org/oran/pmproducer/tasks/JobDataDistributor.java index d9f6632..130cbd3 100644 --- a/pmproducer/src/main/java/org/oran/pmproducer/tasks/JobDataDistributor.java +++ b/pmproducer/src/main/java/org/oran/pmproducer/tasks/JobDataDistributor.java @@ -79,6 +79,7 @@ public class JobDataDistributor { this.consumerFaultCounter = 0; } + @SuppressWarnings("java:S1172") public void handleException(Throwable t) { ++this.consumerFaultCounter; } @@ -93,6 +94,7 @@ public class JobDataDistributor { SenderOptions senderOptions = senderOptions(applConfig, jobGroup.getDeliveryInfo()); this.sender = KafkaSender.create(senderOptions); + } public void start(Flux input) { @@ -150,12 +152,15 @@ public class JobDataDistributor { props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + + config.addKafkaSecurityProps(props); + return SenderOptions.create(props); } private void sendLastStoredRecord() { String data = "{}"; - FilteredData output = new FilteredData(this.jobGroup.getType().getId(), null, data.getBytes()); + FilteredData output = new FilteredData("", this.jobGroup.getType().getId(), null, data.getBytes()); sendToClient(output).subscribe(); } @@ -169,7 +174,7 @@ public class JobDataDistributor { gzip.flush(); gzip.close(); byte[] zipped = out.toByteArray(); - return new FilteredData(data.infoTypeId, data.key, zipped, true); + return new FilteredData(data.getSourceName(), data.infoTypeId, data.key, zipped, true); } catch (IOException e) { logger.error("Unexpected exception when zipping: {}", e.getMessage()); return data;