Adding source name to kafka headers
[nonrtric/plt/ranpm.git] / pmproducer / src / test / java / org / oran / pmproducer / IntegrationWithKafka.java
index 70922d8..990e7b2 100644 (file)
@@ -212,6 +212,7 @@ class IntegrationWithKafka {
             if (logger.isDebugEnabled()) {
                 logger.debug("*** received data on topic: {}", OUTPUT_TOPIC);
                 logger.debug("*** received typeId: {}", receivedKafkaOutput.getTypeIdFromHeaders());
+                logger.debug("*** received sourceName: {}", receivedKafkaOutput.getSourceNameFromHeaders());
             }
         }
 
@@ -456,6 +457,9 @@ class IntegrationWithKafka {
         String msgString = kafkaReceiver.receivedKafkaOutput.valueAsString();
         assertThat(msgString).contains("pmCounterNumber0");
         assertThat(msgString).doesNotContain("pmCounterNumber1");
+        assertThat(kafkaReceiver.receivedKafkaOutput.getTypeIdFromHeaders()).isEqualTo(PM_TYPE_ID);
+        assertThat(kafkaReceiver.receivedKafkaOutput.getSourceNameFromHeaders()).isEqualTo("HTTPST2-0"); // This is from
+                                                                                                         // the file
 
         printCharacteristicsResult("kafkaCharacteristics_pmFilter_s3", startTime, NO_OF_OBJECTS);
         logger.info("***  kafkaReceiver2 :" + kafkaReceiver.count);