Adding source name to kafka headers (Gerrit patch set 58/10858/2)
author: PatrikBuhr <patrik.buhr@est.tech>
Thu, 6 Apr 2023 11:03:35 +0000 (13:03 +0200)
committer: PatrikBuhr <patrik.buhr@est.tech>
Thu, 6 Apr 2023 11:08:43 +0000 (13:08 +0200)
The name of the PM reporting node is added to the Kafka headers.

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-859
Change-Id: I05bd520b953036fe347bed91a638a47228e896b4

pmproducer/src/main/java/org/oran/pmproducer/filter/FilteredData.java
pmproducer/src/main/java/org/oran/pmproducer/filter/PmReportFilter.java
pmproducer/src/main/java/org/oran/pmproducer/repository/Job.java
pmproducer/src/main/java/org/oran/pmproducer/tasks/JobDataDistributor.java
pmproducer/src/main/java/org/oran/pmproducer/tasks/TopicListener.java
pmproducer/src/test/java/org/oran/pmproducer/IntegrationWithKafka.java

index a2348ca..babc9a0 100644 (file)
@@ -38,21 +38,25 @@ public class FilteredData {
     @Getter
     private final boolean isZipped;
 
-    private static final FilteredData emptyData = new FilteredData(null, null, null);
+    @Getter
+    private final String sourceName;
+
+    private static final FilteredData emptyData = new FilteredData(null, null, null, null);
 
     public boolean isEmpty() {
         return (key == null || key.length == 0) && (value == null || value.length == 0);
     }
 
-    public FilteredData(String type, byte[] key, byte[] value) {
-        this(type, key, value, false);
+    public FilteredData(String sourceName, String typeId, byte[] key, byte[] value) {
+        this(sourceName, typeId, key, value, false);
     }
 
-    public FilteredData(String type, byte[] key, byte[] value, boolean isZipped) {
+    public FilteredData(String nodeName, String typeId, byte[] key, byte[] value, boolean isZipped) {
         this.key = key;
         this.value = value;
         this.isZipped = isZipped;
-        this.infoTypeId = type;
+        this.infoTypeId = typeId;
+        this.sourceName = nodeName;
     }
 
     public String getValueAString() {
@@ -69,6 +73,10 @@ public class FilteredData {
             result.add(new RecordHeader(DataFromTopic.ZIPPED_PROPERTY, null));
         }
         result.add(new RecordHeader(DataFromTopic.TYPE_ID_PROPERTY, infoTypeId.getBytes()));
+        if (this.sourceName != null && !this.sourceName.isEmpty()) {
+            result.add(new RecordHeader(DataFromTopic.SOURCE_NAME_PROPERTY, this.sourceName.getBytes()));
+        }
+
         return result;
     }
 
index 26bdf58..00f90f6 100644 (file)
@@ -193,7 +193,8 @@ public class PmReportFilter {
             if (reportFiltered == null) {
                 return FilteredData.empty();
             }
-            return new FilteredData(data.infoTypeId, data.key, gson.toJson(reportFiltered).getBytes());
+            return new FilteredData(reportFiltered.event.getCommonEventHeader().getSourceName(), data.infoTypeId,
+                    data.key, gson.toJson(reportFiltered).getBytes());
         } catch (Exception e) {
             logger.warn("Could not parse PM data. {}, reason: {}", data, e.getMessage());
             return FilteredData.empty();
index e3c0e8a..7f1e4a1 100644 (file)
@@ -153,14 +153,9 @@ public class Job {
                 .typeId(type.getId()) //
                 .clientId(type.getKafkaClientId(appConfig)) //
                 .build();
-
     }
 
     public FilteredData filter(DataFromTopic data) {
-        if (filter == null) {
-            logger.debug("No filter used");
-            return new FilteredData(data.infoTypeId, data.key, data.value);
-        }
         return filter.filter(data);
     }
 }
index f2d7b53..b224945 100644 (file)
@@ -159,7 +159,7 @@ public class JobDataDistributor {
 
     private void sendLastStoredRecord() {
         String data = "{}";
-        FilteredData output = new FilteredData(this.jobGroup.getType().getId(), null, data.getBytes());
+        FilteredData output = new FilteredData("", this.jobGroup.getType().getId(), null, data.getBytes());
 
         sendToClient(output).subscribe();
     }
@@ -173,7 +173,7 @@ public class JobDataDistributor {
                 gzip.flush();
                 gzip.close();
                 byte[] zipped = out.toByteArray();
-                return new FilteredData(data.infoTypeId, data.key, zipped, true);
+                return new FilteredData(data.getSourceName(), data.infoTypeId, data.key, zipped, true);
             } catch (IOException e) {
                 logger.error("Unexpected exception when zipping: {}", e.getMessage());
                 return data;
index 3dd2475..c85c52e 100644 (file)
@@ -83,6 +83,7 @@ public class TopicListener {
 
         public static final String ZIPPED_PROPERTY = "gzip";
         public static final String TYPE_ID_PROPERTY = "type-id";
+        public static final String SOURCE_NAME_PROPERTY = "source-name";
 
         public boolean isZipped() {
             if (headers == null) {
@@ -97,16 +98,25 @@ public class TopicListener {
         }
 
         public String getTypeIdFromHeaders() {
+            return this.getStringProperty(TYPE_ID_PROPERTY);
+        }
+
+        public String getSourceNameFromHeaders() {
+            return this.getStringProperty(SOURCE_NAME_PROPERTY);
+        }
+
+        private String getStringProperty(String propertyName) {
             if (headers == null) {
                 return "";
             }
             for (Header h : headers) {
-                if (h.key().equals(TYPE_ID_PROPERTY)) {
+                if (h.key().equals(propertyName)) {
                     return new String(h.value());
                 }
             }
             return "";
         }
+
     }
 
     private static final Logger logger = LoggerFactory.getLogger(TopicListener.class);
index 70922d8..990e7b2 100644 (file)
@@ -212,6 +212,7 @@ class IntegrationWithKafka {
             if (logger.isDebugEnabled()) {
                 logger.debug("*** received data on topic: {}", OUTPUT_TOPIC);
                 logger.debug("*** received typeId: {}", receivedKafkaOutput.getTypeIdFromHeaders());
+                logger.debug("*** received sourceName: {}", receivedKafkaOutput.getSourceNameFromHeaders());
             }
         }
 
@@ -456,6 +457,9 @@ class IntegrationWithKafka {
         String msgString = kafkaReceiver.receivedKafkaOutput.valueAsString();
         assertThat(msgString).contains("pmCounterNumber0");
         assertThat(msgString).doesNotContain("pmCounterNumber1");
+        assertThat(kafkaReceiver.receivedKafkaOutput.getTypeIdFromHeaders()).isEqualTo(PM_TYPE_ID);
+        assertThat(kafkaReceiver.receivedKafkaOutput.getSourceNameFromHeaders()).isEqualTo("HTTPST2-0"); // This is from
+                                                                                                         // the file
 
         printCharacteristicsResult("kafkaCharacteristics_pmFilter_s3", startTime, NO_OF_OBJECTS);
         logger.info("***  kafkaReceiver2 :" + kafkaReceiver.count);