X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pmproducer%2Fsrc%2Ftest%2Fjava%2Forg%2Foran%2Fpmproducer%2FIntegrationWithKafka.java;h=04b501b939c38161f912c2a4c71d6000f1561f86;hb=5902c2dbd9980f771e56b2d582f57b163c9b742e;hp=70922d8ed410e94fc2088f15f432f4de4a9df71c;hpb=54c8fecebbb5e19010e56eddf3aba8e127e0abc3;p=nonrtric%2Fplt%2Franpm.git diff --git a/pmproducer/src/test/java/org/oran/pmproducer/IntegrationWithKafka.java b/pmproducer/src/test/java/org/oran/pmproducer/IntegrationWithKafka.java index 70922d8..04b501b 100644 --- a/pmproducer/src/test/java/org/oran/pmproducer/IntegrationWithKafka.java +++ b/pmproducer/src/test/java/org/oran/pmproducer/IntegrationWithKafka.java @@ -86,10 +86,14 @@ import reactor.kafka.sender.SenderRecord; "app.configuration-filepath=./src/test/resources/test_application_configuration.json", // "app.pm-files-path=./src/test/resources/", // "app.s3.locksBucket=ropfilelocks", // - "app.pm-files-path=/tmp/dmaapadaptor", // - "app.s3.bucket=dmaaptest", // + "app.pm-files-path=/tmp/pmproducer", // + "app.s3.bucket=pmproducertest", // "app.auth-token-file=src/test/resources/jwtToken.b64", // "app.kafka.use-oath-token=false"}) // +/** + * Tests interwork with Kafka and Minio + * Requires that Kafka and ICS is started. + */ class IntegrationWithKafka { final static String PM_TYPE_ID = "PmDataOverKafka"; @@ -212,6 +216,7 @@ class IntegrationWithKafka { if (logger.isDebugEnabled()) { logger.debug("*** received data on topic: {}", OUTPUT_TOPIC); logger.debug("*** received typeId: {}", receivedKafkaOutput.getTypeIdFromHeaders()); + logger.debug("*** received sourceName: {}", receivedKafkaOutput.getSourceNameFromHeaders()); } } @@ -456,6 +461,9 @@ class IntegrationWithKafka { String msgString = kafkaReceiver.receivedKafkaOutput.valueAsString(); assertThat(msgString).contains("pmCounterNumber0"); assertThat(msgString).doesNotContain("pmCounterNumber1"); + assertThat(kafkaReceiver.receivedKafkaOutput.getTypeIdFromHeaders()).isEqualTo(PM_TYPE_ID); + assertThat(kafkaReceiver.receivedKafkaOutput.getSourceNameFromHeaders()).isEqualTo("HTTPST2-0"); // This is from + // the file printCharacteristicsResult("kafkaCharacteristics_pmFilter_s3", startTime, NO_OF_OBJECTS); logger.info("*** kafkaReceiver2 :" + kafkaReceiver.count); @@ -611,7 +619,7 @@ class IntegrationWithKafka { @Test void testHistoricalData() throws Exception { - // test + // test that it works to get already fetched data final String JOB_ID = "testHistoricalData"; DataStore fileStore = dataStore();