From: PatrikBuhr Date: Thu, 6 Apr 2023 11:03:35 +0000 (+0200) Subject: Adding source name to kafka headers X-Git-Tag: 1.0.0~26 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=db5cb9a3759fe227ce8654bd0be1c85080e83e3f;p=nonrtric%2Fplt%2Franpm.git Adding source name to kafka headers The name of the PM reporting node is added to the Kafka headers. Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-859 Change-Id: I05bd520b953036fe347bed91a638a47228e896b4 --- diff --git a/pmproducer/src/main/java/org/oran/pmproducer/filter/FilteredData.java b/pmproducer/src/main/java/org/oran/pmproducer/filter/FilteredData.java index a2348ca..babc9a0 100644 --- a/pmproducer/src/main/java/org/oran/pmproducer/filter/FilteredData.java +++ b/pmproducer/src/main/java/org/oran/pmproducer/filter/FilteredData.java @@ -38,21 +38,25 @@ public class FilteredData { @Getter private final boolean isZipped; - private static final FilteredData emptyData = new FilteredData(null, null, null); + @Getter + private final String sourceName; + + private static final FilteredData emptyData = new FilteredData(null, null, null, null); public boolean isEmpty() { return (key == null || key.length == 0) && (value == null || value.length == 0); } - public FilteredData(String type, byte[] key, byte[] value) { - this(type, key, value, false); + public FilteredData(String sourceName, String typeId, byte[] key, byte[] value) { + this(sourceName, typeId, key, value, false); } - public FilteredData(String type, byte[] key, byte[] value, boolean isZipped) { + public FilteredData(String nodeName, String typeId, byte[] key, byte[] value, boolean isZipped) { this.key = key; this.value = value; this.isZipped = isZipped; - this.infoTypeId = type; + this.infoTypeId = typeId; + this.sourceName = nodeName; } public String getValueAString() { @@ -69,6 +73,10 @@ public class FilteredData { result.add(new RecordHeader(DataFromTopic.ZIPPED_PROPERTY, null)); } result.add(new RecordHeader(DataFromTopic.TYPE_ID_PROPERTY, infoTypeId.getBytes())); + if (this.sourceName != null && !this.sourceName.isEmpty()) { + result.add(new RecordHeader(DataFromTopic.SOURCE_NAME_PROPERTY, this.sourceName.getBytes())); + } + return result; } diff --git a/pmproducer/src/main/java/org/oran/pmproducer/filter/PmReportFilter.java b/pmproducer/src/main/java/org/oran/pmproducer/filter/PmReportFilter.java index 26bdf58..00f90f6 100644 --- a/pmproducer/src/main/java/org/oran/pmproducer/filter/PmReportFilter.java +++ b/pmproducer/src/main/java/org/oran/pmproducer/filter/PmReportFilter.java @@ -193,7 +193,8 @@ public class PmReportFilter { if (reportFiltered == null) { return FilteredData.empty(); } - return new FilteredData(data.infoTypeId, data.key, gson.toJson(reportFiltered).getBytes()); + return new FilteredData(reportFiltered.event.getCommonEventHeader().getSourceName(), data.infoTypeId, + data.key, gson.toJson(reportFiltered).getBytes()); } catch (Exception e) { logger.warn("Could not parse PM data. {}, reason: {}", data, e.getMessage()); return FilteredData.empty(); diff --git a/pmproducer/src/main/java/org/oran/pmproducer/repository/Job.java b/pmproducer/src/main/java/org/oran/pmproducer/repository/Job.java index e3c0e8a..7f1e4a1 100644 --- a/pmproducer/src/main/java/org/oran/pmproducer/repository/Job.java +++ b/pmproducer/src/main/java/org/oran/pmproducer/repository/Job.java @@ -153,14 +153,9 @@ public class Job { .typeId(type.getId()) // .clientId(type.getKafkaClientId(appConfig)) // .build(); - } public FilteredData filter(DataFromTopic data) { - if (filter == null) { - logger.debug("No filter used"); - return new FilteredData(data.infoTypeId, data.key, data.value); - } return filter.filter(data); } } diff --git a/pmproducer/src/main/java/org/oran/pmproducer/tasks/JobDataDistributor.java b/pmproducer/src/main/java/org/oran/pmproducer/tasks/JobDataDistributor.java index f2d7b53..b224945 100644 --- a/pmproducer/src/main/java/org/oran/pmproducer/tasks/JobDataDistributor.java +++ b/pmproducer/src/main/java/org/oran/pmproducer/tasks/JobDataDistributor.java @@ -159,7 +159,7 @@ public class JobDataDistributor { private void sendLastStoredRecord() { String data = "{}"; - FilteredData output = new FilteredData(this.jobGroup.getType().getId(), null, data.getBytes()); + FilteredData output = new FilteredData("", this.jobGroup.getType().getId(), null, data.getBytes()); sendToClient(output).subscribe(); } @@ -173,7 +173,7 @@ public class JobDataDistributor { gzip.flush(); gzip.close(); byte[] zipped = out.toByteArray(); - return new FilteredData(data.infoTypeId, data.key, zipped, true); + return new FilteredData(data.getSourceName(), data.infoTypeId, data.key, zipped, true); } catch (IOException e) { logger.error("Unexpected exception when zipping: {}", e.getMessage()); return data; diff --git a/pmproducer/src/main/java/org/oran/pmproducer/tasks/TopicListener.java b/pmproducer/src/main/java/org/oran/pmproducer/tasks/TopicListener.java index 3dd2475..c85c52e 100644 --- a/pmproducer/src/main/java/org/oran/pmproducer/tasks/TopicListener.java +++ b/pmproducer/src/main/java/org/oran/pmproducer/tasks/TopicListener.java @@ -83,6 +83,7 @@ public class TopicListener { public static final String ZIPPED_PROPERTY = "gzip"; public static final String TYPE_ID_PROPERTY = "type-id"; + public static final String SOURCE_NAME_PROPERTY = "source-name"; public boolean isZipped() { if (headers == null) { @@ -97,16 +98,25 @@ public class TopicListener { } public String getTypeIdFromHeaders() { + return this.getStringProperty(TYPE_ID_PROPERTY); + } + + public String getSourceNameFromHeaders() { + return this.getStringProperty(SOURCE_NAME_PROPERTY); + } + + private String getStringProperty(String propertyName) { if (headers == null) { return ""; } for (Header h : headers) { - if (h.key().equals(TYPE_ID_PROPERTY)) { + if (h.key().equals(propertyName)) { return new String(h.value()); } } return ""; } + } private static final Logger logger = LoggerFactory.getLogger(TopicListener.class); diff --git a/pmproducer/src/test/java/org/oran/pmproducer/IntegrationWithKafka.java b/pmproducer/src/test/java/org/oran/pmproducer/IntegrationWithKafka.java index 70922d8..990e7b2 100644 --- a/pmproducer/src/test/java/org/oran/pmproducer/IntegrationWithKafka.java +++ b/pmproducer/src/test/java/org/oran/pmproducer/IntegrationWithKafka.java @@ -212,6 +212,7 @@ class IntegrationWithKafka { if (logger.isDebugEnabled()) { logger.debug("*** received data on topic: {}", OUTPUT_TOPIC); logger.debug("*** received typeId: {}", receivedKafkaOutput.getTypeIdFromHeaders()); + logger.debug("*** received sourceName: {}", receivedKafkaOutput.getSourceNameFromHeaders()); } } @@ -456,6 +457,9 @@ class IntegrationWithKafka { String msgString = kafkaReceiver.receivedKafkaOutput.valueAsString(); assertThat(msgString).contains("pmCounterNumber0"); assertThat(msgString).doesNotContain("pmCounterNumber1"); + assertThat(kafkaReceiver.receivedKafkaOutput.getTypeIdFromHeaders()).isEqualTo(PM_TYPE_ID); + assertThat(kafkaReceiver.receivedKafkaOutput.getSourceNameFromHeaders()).isEqualTo("HTTPST2-0"); // This is from + // the file printCharacteristicsResult("kafkaCharacteristics_pmFilter_s3", startTime, NO_OF_OBJECTS); logger.info("*** kafkaReceiver2 :" + kafkaReceiver.count);