Merge "Remove unused code"
[nonrtric/plt/ranpm.git] / pmproducer / src / test / java / org / oran / pmproducer / IntegrationWithKafka.java
index 39363f8..04b501b 100644 (file)
@@ -44,7 +44,6 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.oran.pmproducer.clients.AsyncRestClient;
 import org.oran.pmproducer.clients.AsyncRestClientFactory;
-import org.oran.pmproducer.clients.SecurityContext;
 import org.oran.pmproducer.configuration.ApplicationConfig;
 import org.oran.pmproducer.configuration.WebClientConfig;
 import org.oran.pmproducer.configuration.WebClientConfig.HttpProxyConfig;
@@ -52,6 +51,7 @@ import org.oran.pmproducer.controllers.ProducerCallbacksController;
 import org.oran.pmproducer.controllers.ProducerCallbacksController.StatisticsCollection;
 import org.oran.pmproducer.datastore.DataStore;
 import org.oran.pmproducer.filter.PmReportFilter;
+import org.oran.pmproducer.oauth2.SecurityContext;
 import org.oran.pmproducer.r1.ConsumerJobInfo;
 import org.oran.pmproducer.repository.InfoType;
 import org.oran.pmproducer.repository.InfoTypes;
@@ -86,9 +86,14 @@ import reactor.kafka.sender.SenderRecord;
         "app.configuration-filepath=./src/test/resources/test_application_configuration.json", //
         "app.pm-files-path=./src/test/resources/", //
         "app.s3.locksBucket=ropfilelocks", //
-        "app.pm-files-path=/tmp/dmaapadaptor", //
-        "app.s3.bucket=dmaaptest" //
-}) //
+        "app.pm-files-path=/tmp/pmproducer", //
+        "app.s3.bucket=pmproducertest", //
+        "app.auth-token-file=src/test/resources/jwtToken.b64", //
+        "app.kafka.use-oath-token=false"}) //
+/**
+ * Tests interworking with Kafka and Minio.
+ * Requires that Kafka and ICS are started.
+ */
 class IntegrationWithKafka {
 
     final static String PM_TYPE_ID = "PmDataOverKafka";
@@ -211,6 +216,7 @@ class IntegrationWithKafka {
             if (logger.isDebugEnabled()) {
                 logger.debug("*** received data on topic: {}", OUTPUT_TOPIC);
                 logger.debug("*** received typeId: {}", receivedKafkaOutput.getTypeIdFromHeaders());
+                logger.debug("*** received sourceName: {}", receivedKafkaOutput.getSourceNameFromHeaders());
             }
         }
 
@@ -321,6 +327,10 @@ class IntegrationWithKafka {
         props.put(ProducerConfig.ACKS_CONFIG, "all");
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+
+        // Security
+        this.applicationConfig.addKafkaSecurityProps(props);
+
         return SenderOptions.create(props);
     }
 
@@ -451,6 +461,9 @@ class IntegrationWithKafka {
         String msgString = kafkaReceiver.receivedKafkaOutput.valueAsString();
         assertThat(msgString).contains("pmCounterNumber0");
         assertThat(msgString).doesNotContain("pmCounterNumber1");
+        assertThat(kafkaReceiver.receivedKafkaOutput.getTypeIdFromHeaders()).isEqualTo(PM_TYPE_ID);
+        assertThat(kafkaReceiver.receivedKafkaOutput.getSourceNameFromHeaders()).isEqualTo("HTTPST2-0"); // This is from
+                                                                                                         // the file
 
         printCharacteristicsResult("kafkaCharacteristics_pmFilter_s3", startTime, NO_OF_OBJECTS);
         logger.info("***  kafkaReceiver2 :" + kafkaReceiver.count);
@@ -606,7 +619,7 @@ class IntegrationWithKafka {
 
     @Test
     void testHistoricalData() throws Exception {
-        // test
+        // verify that already fetched (historical) data can be retrieved
         final String JOB_ID = "testHistoricalData";
 
         DataStore fileStore = dataStore();