The name of the PM reporting node is added to the Kafka headers.
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-859
Change-Id: I05bd520b953036fe347bed91a638a47228e896b4
@Getter
private final boolean isZipped;
- private static final FilteredData emptyData = new FilteredData(null, null, null);
+ @Getter
+ private final String sourceName;
+
+ private static final FilteredData emptyData = new FilteredData(null, null, null, null);
public boolean isEmpty() {
return (key == null || key.length == 0) && (value == null || value.length == 0);
}
- public FilteredData(String type, byte[] key, byte[] value) {
- this(type, key, value, false);
+ public FilteredData(String sourceName, String typeId, byte[] key, byte[] value) {
+ this(sourceName, typeId, key, value, false);
}
- public FilteredData(String type, byte[] key, byte[] value, boolean isZipped) {
+ public FilteredData(String nodeName, String typeId, byte[] key, byte[] value, boolean isZipped) {
this.key = key;
this.value = value;
this.isZipped = isZipped;
- this.infoTypeId = type;
+ this.infoTypeId = typeId;
+ this.sourceName = nodeName;
}
public String getValueAString() {
result.add(new RecordHeader(DataFromTopic.ZIPPED_PROPERTY, null));
}
result.add(new RecordHeader(DataFromTopic.TYPE_ID_PROPERTY, infoTypeId.getBytes()));
+ if (this.sourceName != null && !this.sourceName.isEmpty()) {
+ result.add(new RecordHeader(DataFromTopic.SOURCE_NAME_PROPERTY, this.sourceName.getBytes()));
+ }
+
return result;
}
if (reportFiltered == null) {
return FilteredData.empty();
}
- return new FilteredData(data.infoTypeId, data.key, gson.toJson(reportFiltered).getBytes());
+ return new FilteredData(reportFiltered.event.getCommonEventHeader().getSourceName(), data.infoTypeId,
+ data.key, gson.toJson(reportFiltered).getBytes());
} catch (Exception e) {
logger.warn("Could not parse PM data. {}, reason: {}", data, e.getMessage());
return FilteredData.empty();
.typeId(type.getId()) //
.clientId(type.getKafkaClientId(appConfig)) //
.build();
-
}
public FilteredData filter(DataFromTopic data) {
- if (filter == null) {
- logger.debug("No filter used");
- return new FilteredData(data.infoTypeId, data.key, data.value);
- }
return filter.filter(data);
}
}
private void sendLastStoredRecord() {
String data = "{}";
- FilteredData output = new FilteredData(this.jobGroup.getType().getId(), null, data.getBytes());
+ FilteredData output = new FilteredData("", this.jobGroup.getType().getId(), null, data.getBytes());
sendToClient(output).subscribe();
}
gzip.flush();
gzip.close();
byte[] zipped = out.toByteArray();
- return new FilteredData(data.infoTypeId, data.key, zipped, true);
+ return new FilteredData(data.getSourceName(), data.infoTypeId, data.key, zipped, true);
} catch (IOException e) {
logger.error("Unexpected exception when zipping: {}", e.getMessage());
return data;
public static final String ZIPPED_PROPERTY = "gzip";
public static final String TYPE_ID_PROPERTY = "type-id";
+ public static final String SOURCE_NAME_PROPERTY = "source-name";
public boolean isZipped() {
if (headers == null) {
}
public String getTypeIdFromHeaders() {
+ return this.getStringProperty(TYPE_ID_PROPERTY);
+ }
+
+ public String getSourceNameFromHeaders() {
+ return this.getStringProperty(SOURCE_NAME_PROPERTY);
+ }
+
+ private String getStringProperty(String propertyName) {
if (headers == null) {
return "";
}
for (Header h : headers) {
- if (h.key().equals(TYPE_ID_PROPERTY)) {
+ if (h.key().equals(propertyName)) {
return new String(h.value());
}
}
return "";
}
+
}
private static final Logger logger = LoggerFactory.getLogger(TopicListener.class);
if (logger.isDebugEnabled()) {
logger.debug("*** received data on topic: {}", OUTPUT_TOPIC);
logger.debug("*** received typeId: {}", receivedKafkaOutput.getTypeIdFromHeaders());
+ logger.debug("*** received sourceName: {}", receivedKafkaOutput.getSourceNameFromHeaders());
}
}
String msgString = kafkaReceiver.receivedKafkaOutput.valueAsString();
assertThat(msgString).contains("pmCounterNumber0");
assertThat(msgString).doesNotContain("pmCounterNumber1");
+ assertThat(kafkaReceiver.receivedKafkaOutput.getTypeIdFromHeaders()).isEqualTo(PM_TYPE_ID);
+ assertThat(kafkaReceiver.receivedKafkaOutput.getSourceNameFromHeaders()).isEqualTo("HTTPST2-0"); // This is from
+ // the file
printCharacteristicsResult("kafkaCharacteristics_pmFilter_s3", startTime, NO_OF_OBJECTS);
logger.info("*** kafkaReceiver2 :" + kafkaReceiver.count);