Documentation updates
[nonrtric/plt/ranpm.git] / pmproducer / src / main / java / org / oran / pmproducer / tasks / JobDataDistributor.java
index d9f6632..b224945 100644 (file)
@@ -93,6 +93,7 @@ public class JobDataDistributor {
 
         SenderOptions<byte[], byte[]> senderOptions = senderOptions(applConfig, jobGroup.getDeliveryInfo());
         this.sender = KafkaSender.create(senderOptions);
+
     }
 
     public void start(Flux<TopicListener.DataFromTopic> input) {
@@ -150,12 +151,15 @@ public class JobDataDistributor {
         props.put(ProducerConfig.ACKS_CONFIG, "all");
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+
+        config.addKafkaSecurityProps(props);
+
         return SenderOptions.create(props);
     }
 
     private void sendLastStoredRecord() {
         String data = "{}";
-        FilteredData output = new FilteredData(this.jobGroup.getType().getId(), null, data.getBytes());
+        FilteredData output = new FilteredData("", this.jobGroup.getType().getId(), null, data.getBytes());
 
         sendToClient(output).subscribe();
     }
@@ -169,7 +173,7 @@ public class JobDataDistributor {
                 gzip.flush();
                 gzip.close();
                 byte[] zipped = out.toByteArray();
-                return new FilteredData(data.infoTypeId, data.key, zipped, true);
+                return new FilteredData(data.getSourceName(), data.infoTypeId, data.key, zipped, true);
             } catch (IOException e) {
                 logger.error("Unexpected exception when zipping: {}", e.getMessage());
                 return data;