Adding source name to kafka headers
[nonrtric/plt/ranpm.git] / pmproducer / src / main / java / org / oran / pmproducer / filter / FilteredData.java
index a2348ca..babc9a0 100644 (file)
@@ -38,21 +38,25 @@ public class FilteredData {
     @Getter
     private final boolean isZipped;
 
-    private static final FilteredData emptyData = new FilteredData(null, null, null);
+    @Getter
+    private final String sourceName;
+
+    private static final FilteredData emptyData = new FilteredData(null, null, null, null);
 
     public boolean isEmpty() {
         return (key == null || key.length == 0) && (value == null || value.length == 0);
     }
 
-    public FilteredData(String type, byte[] key, byte[] value) {
-        this(type, key, value, false);
+    public FilteredData(String sourceName, String typeId, byte[] key, byte[] value) {
+        this(sourceName, typeId, key, value, false);
     }
 
-    public FilteredData(String type, byte[] key, byte[] value, boolean isZipped) {
+    public FilteredData(String nodeName, String typeId, byte[] key, byte[] value, boolean isZipped) {
         this.key = key;
         this.value = value;
         this.isZipped = isZipped;
-        this.infoTypeId = type;
+        this.infoTypeId = typeId;
+        this.sourceName = nodeName;
     }
 
     public String getValueAString() {
@@ -69,6 +73,10 @@ public class FilteredData {
             result.add(new RecordHeader(DataFromTopic.ZIPPED_PROPERTY, null));
         }
         result.add(new RecordHeader(DataFromTopic.TYPE_ID_PROPERTY, infoTypeId.getBytes()));
+        if (this.sourceName != null && !this.sourceName.isEmpty()) {
+            result.add(new RecordHeader(DataFromTopic.SOURCE_NAME_PROPERTY, this.sourceName.getBytes()));
+        }
+
         return result;
     }