if (logger.isDebugEnabled()) {
logger.debug("*** received data on topic: {}", OUTPUT_TOPIC);
logger.debug("*** received typeId: {}", receivedKafkaOutput.getTypeIdFromHeaders());
+ logger.debug("*** received sourceName: {}", receivedKafkaOutput.getSourceNameFromHeaders());
}
}
String msgString = kafkaReceiver.receivedKafkaOutput.valueAsString();
assertThat(msgString).contains("pmCounterNumber0");
assertThat(msgString).doesNotContain("pmCounterNumber1");
+ assertThat(kafkaReceiver.receivedKafkaOutput.getTypeIdFromHeaders()).isEqualTo(PM_TYPE_ID);
+ assertThat(kafkaReceiver.receivedKafkaOutput.getSourceNameFromHeaders()).isEqualTo("HTTPST2-0"); // This is from
+ // the file
printCharacteristicsResult("kafkaCharacteristics_pmFilter_s3", startTime, NO_OF_OBJECTS);
logger.info("*** kafkaReceiver2 :" + kafkaReceiver.count);