X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pmproducer%2Fsrc%2Ftest%2Fjava%2Forg%2Foran%2Fpmproducer%2FIntegrationWithKafka.java;h=04b501b939c38161f912c2a4c71d6000f1561f86;hb=5902c2dbd9980f771e56b2d582f57b163c9b742e;hp=54104858c6eda6acb43a0ef6d9d6c325201e47c4;hpb=547c200ebd35ebc81a92694fa48653d3ba6dcb27;p=nonrtric%2Fplt%2Franpm.git diff --git a/pmproducer/src/test/java/org/oran/pmproducer/IntegrationWithKafka.java b/pmproducer/src/test/java/org/oran/pmproducer/IntegrationWithKafka.java index 5410485..04b501b 100644 --- a/pmproducer/src/test/java/org/oran/pmproducer/IntegrationWithKafka.java +++ b/pmproducer/src/test/java/org/oran/pmproducer/IntegrationWithKafka.java @@ -44,7 +44,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.oran.pmproducer.clients.AsyncRestClient; import org.oran.pmproducer.clients.AsyncRestClientFactory; -import org.oran.pmproducer.clients.SecurityContext; import org.oran.pmproducer.configuration.ApplicationConfig; import org.oran.pmproducer.configuration.WebClientConfig; import org.oran.pmproducer.configuration.WebClientConfig.HttpProxyConfig; @@ -52,6 +51,7 @@ import org.oran.pmproducer.controllers.ProducerCallbacksController; import org.oran.pmproducer.controllers.ProducerCallbacksController.StatisticsCollection; import org.oran.pmproducer.datastore.DataStore; import org.oran.pmproducer.filter.PmReportFilter; +import org.oran.pmproducer.oauth2.SecurityContext; import org.oran.pmproducer.r1.ConsumerJobInfo; import org.oran.pmproducer.repository.InfoType; import org.oran.pmproducer.repository.InfoTypes; @@ -86,9 +86,14 @@ import reactor.kafka.sender.SenderRecord; "app.configuration-filepath=./src/test/resources/test_application_configuration.json", // "app.pm-files-path=./src/test/resources/", // "app.s3.locksBucket=ropfilelocks", // - "app.pm-files-path=/tmp/dmaapadaptor", // - "app.s3.bucket=dmaaptest" // -}) // + "app.pm-files-path=/tmp/pmproducer", // + "app.s3.bucket=pmproducertest", // + "app.auth-token-file=src/test/resources/jwtToken.b64", // + "app.kafka.use-oath-token=false"}) // +/** + * Tests interwork with Kafka and Minio + * Requires that Kafka and ICS is started. + */ class IntegrationWithKafka { final static String PM_TYPE_ID = "PmDataOverKafka"; @@ -124,11 +129,6 @@ class IntegrationWithKafka { return thisProcessUrl(); } - @Override - public String getDmaapBaseUrl() { - return thisProcessUrl(); - } - @Override public String getSelfUrl() { return thisProcessUrl(); @@ -216,6 +216,7 @@ class IntegrationWithKafka { if (logger.isDebugEnabled()) { logger.debug("*** received data on topic: {}", OUTPUT_TOPIC); logger.debug("*** received typeId: {}", receivedKafkaOutput.getTypeIdFromHeaders()); + logger.debug("*** received sourceName: {}", receivedKafkaOutput.getSourceNameFromHeaders()); } } @@ -326,6 +327,10 @@ class IntegrationWithKafka { props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + + // Security + this.applicationConfig.addKafkaSecurityProps(props); + return SenderOptions.create(props); } @@ -456,6 +461,9 @@ class IntegrationWithKafka { String msgString = kafkaReceiver.receivedKafkaOutput.valueAsString(); assertThat(msgString).contains("pmCounterNumber0"); assertThat(msgString).doesNotContain("pmCounterNumber1"); + assertThat(kafkaReceiver.receivedKafkaOutput.getTypeIdFromHeaders()).isEqualTo(PM_TYPE_ID); + assertThat(kafkaReceiver.receivedKafkaOutput.getSourceNameFromHeaders()).isEqualTo("HTTPST2-0"); // This is from + // the file printCharacteristicsResult("kafkaCharacteristics_pmFilter_s3", startTime, NO_OF_OBJECTS); logger.info("*** kafkaReceiver2 :" + kafkaReceiver.count); @@ -611,7 +619,7 @@ class IntegrationWithKafka { @Test void testHistoricalData() throws Exception { - // test + // test that it works to get already fetched data final String JOB_ID = "testHistoricalData"; DataStore fileStore = dataStore();