Minor changes 33/9233/1
author: PatrikBuhr <patrik.buhr@est.tech>
Tue, 11 Oct 2022 08:30:39 +0000 (10:30 +0200)
committer: PatrikBuhr <patrik.buhr@est.tech>
Tue, 11 Oct 2022 08:30:39 +0000 (10:30 +0200)
Added a testcase for many PM subscriptions using Kafka.

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: I39899a46afe40374cdef50538f2409ebfac7c5c7

src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java
src/main/java/org/oran/dmaapadapter/datastore/DataStore.java
src/main/java/org/oran/dmaapadapter/datastore/FileStore.java
src/main/java/org/oran/dmaapadapter/datastore/S3ObjectStore.java
src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java
src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java
src/test/java/org/oran/dmaapadapter/ApplicationTest.java
src/test/java/org/oran/dmaapadapter/ConsumerController.java
src/test/java/org/oran/dmaapadapter/IcsSimulatorController.java
src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java

index 097fd32..3e65120 100644 (file)
@@ -148,7 +148,7 @@ public class ApplicationConfig {
     }
 
     public boolean isS3Enabled() {
-        return !s3EndpointOverride.isEmpty();
+        return !(s3EndpointOverride.isBlank() || s3Bucket.isBlank());
     }
 
     // Adapter to parse the json format of the configuration file.
index 255b77a..51039b2 100644 (file)
@@ -20,6 +20,8 @@
 
 package org.oran.dmaapadapter.datastore;
 
+import java.nio.file.Path;
+
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
@@ -38,4 +40,10 @@ public interface DataStore {
 
     public Mono<Boolean> deleteObject(Bucket bucket, String name);
 
+    public Mono<String> copyFileTo(Path from, String to);
+
+    public Mono<String> create(DataStore.Bucket bucket);
+
+    public Mono<String> deleteBucket(Bucket bucket);
+
 }
index 430b62f..cd2d355 100644 (file)
@@ -105,6 +105,7 @@ public class FileStore implements DataStore {
         }
     }
 
+    @Override
     public Mono<String> copyFileTo(Path from, String to) {
         try {
             Path toPath = path(to);
@@ -131,17 +132,24 @@ public class FileStore implements DataStore {
         }
     }
 
+    @Override
+    public Mono<String> create(Bucket bucket) {
+        return Mono.just("OK");
+    }
+
     private Path path(String name) {
         return Path.of(applicationConfig.getPmFilesPath(), name);
     }
 
-    public void deleteFiles() {
+    @Override
+    public Mono<String> deleteBucket(Bucket bucket) {
         try {
             FileSystemUtils.deleteRecursively(Path.of(applicationConfig.getPmFilesPath()));
         } catch (IOException e) {
             logger.debug("Could not delete directory: {}, reason; {}", applicationConfig.getPmFilesPath(),
                     e.getMessage());
         }
+        return Mono.just("OK");
     }
 
 }
index fdbd3e4..bbb84de 100644 (file)
@@ -144,11 +144,13 @@ public class S3ObjectStore implements DataStore {
                 .doOnError(t -> logger.error("Failed to store file in S3 {}", t.getMessage()));
     }
 
-    public Mono<String> copyFileToS3(Bucket bucket, Path fromFile, String toFile) {
-        return copyFileToS3Bucket(bucket(bucket), fromFile, toFile);
+    @Override
+    public Mono<String> copyFileTo(Path fromFile, String toFile) {
+        return copyFileToS3Bucket(bucket(Bucket.FILES), fromFile, toFile);
     }
 
-    public Mono<String> createS3Bucket(Bucket bucket) {
+    @Override
+    public Mono<String> create(Bucket bucket) {
         return createS3Bucket(bucket(bucket));
     }
 
@@ -166,6 +168,7 @@ public class S3ObjectStore implements DataStore {
                 .onErrorResume(t -> Mono.just(s3Bucket));
     }
 
+    @Override
     public Mono<String> deleteBucket(Bucket bucket) {
         return listFiles(bucket, "") //
                 .flatMap(key -> deleteObject(bucket, key)) //
index da28e64..c10bbaf 100644 (file)
@@ -98,8 +98,8 @@ public abstract class JobDataDistributor {
 
         if (applConfig.isS3Enabled()) {
             S3ObjectStore fs = new S3ObjectStore(applConfig);
-            fs.createS3Bucket(DataStore.Bucket.FILES).subscribe();
-            fs.createS3Bucket(DataStore.Bucket.LOCKS).subscribe();
+            fs.create(DataStore.Bucket.FILES).subscribe();
+            fs.create(DataStore.Bucket.LOCKS).subscribe();
         }
     }
 
index 08c8167..2ccfb3c 100644 (file)
@@ -63,6 +63,9 @@ public class KafkaJobDataDistributor extends JobDataDistributor {
         SenderRecord<String, String, Integer> senderRecord = senderRecord(data, job);
 
         return this.sender.send(Mono.just(senderRecord)) //
+                .doOnError(t -> logger.warn("Failed to send to Kafka, job: {}, reason: {}", this.getJob().getId(),
+                        t.getMessage())) //
+                .onErrorResume(t -> Mono.empty()) //
                 .collectList() //
                 .map(x -> data.value);
     }
index 393cd27..b7bb255 100644 (file)
@@ -53,7 +53,10 @@ import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.configuration.WebClientConfig;
 import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig;
 import org.oran.dmaapadapter.controllers.ProducerCallbacksController;
+import org.oran.dmaapadapter.datastore.DataStore;
+import org.oran.dmaapadapter.datastore.DataStore.Bucket;
 import org.oran.dmaapadapter.datastore.FileStore;
+import org.oran.dmaapadapter.datastore.S3ObjectStore;
 import org.oran.dmaapadapter.exceptions.ServiceException;
 import org.oran.dmaapadapter.filter.PmReport;
 import org.oran.dmaapadapter.filter.PmReportFilter;
@@ -252,8 +255,14 @@ class ApplicationTest {
         assertThat(this.consumerController.testResults.receivedBodies).isEmpty();
         assertThat(this.consumerController.testResults.receivedHeaders).isEmpty();
 
-        FileStore fileStore = new FileStore(applicationConfig);
-        fileStore.deleteFiles();
+        DataStore fileStore = this.dataStore();
+        fileStore.create(DataStore.Bucket.FILES).block();
+        fileStore.create(DataStore.Bucket.LOCKS).block();
+    }
+
+    private DataStore dataStore() {
+        return this.applicationConfig.isS3Enabled() ? new S3ObjectStore(applicationConfig)
+                : new FileStore(applicationConfig);
     }
 
     @AfterEach
@@ -266,6 +275,10 @@ class ApplicationTest {
         this.consumerController.testResults.reset();
         this.icsSimulatorController.testResults.reset();
 
+        FileStore fileStore = new FileStore(applicationConfig);
+        fileStore.deleteBucket(Bucket.FILES).block();
+        fileStore.deleteBucket(Bucket.LOCKS).block();
+
     }
 
     private AsyncRestClient restClient(boolean useTrustValidation) {
index d02e48b..a62ae7d 100644 (file)
@@ -85,7 +85,7 @@ public class ConsumerController {
                     content = @Content(schema = @Schema(implementation = VoidResponse.class))) //
     })
     public ResponseEntity<Object> postData(@RequestBody String body, @RequestHeader Map<String, String> headers) {
-        logger.info("Received by consumer: {}", body);
+        logger.debug("Received by consumer: {}", body);
         testResults.receivedBodies.add(body);
         testResults.receivedHeaders.add(headers);
         return new ResponseEntity<>(HttpStatus.OK);
index df30196..e27e95a 100644 (file)
@@ -117,7 +117,7 @@ public class IcsSimulatorController {
     public ResponseEntity<Object> putIndividualInfoJob( //
             @PathVariable("infoJobId") String jobId, //
             @RequestBody ConsumerJobInfo informationJobObject) {
-        logger.info("*** added consumer job {}", jobId);
+        logger.debug("*** added consumer job {}", jobId);
         testResults.setCreatedJob(informationJobObject);
         return new ResponseEntity<>(HttpStatus.OK);
     }
@@ -133,7 +133,7 @@ public class IcsSimulatorController {
         }
         assertThat(type).isNotNull();
         validateJsonObjectAgainstSchema(job.jobDefinition, type.jobDataSchema);
-        logger.info("ICS Simulator PUT job: {}", body);
+        logger.debug("ICS Simulator PUT job: {}", body);
         restClient.post(url, body, MediaType.APPLICATION_JSON).block();
     }
 
@@ -158,7 +158,7 @@ public class IcsSimulatorController {
 
     public void deleteJob(String jobId, AsyncRestClient restClient) {
         String url = this.testResults.registrationInfo.jobCallbackUrl + "/" + jobId;
-        logger.info("ICS Simulator DELETE job: {}", url);
+        logger.debug("ICS Simulator DELETE job: {}", url);
         restClient.delete(url).block();
 
     }
index 4e616a8..4cb1dc2 100644 (file)
@@ -29,6 +29,7 @@ import com.google.gson.JsonParser;
 import java.nio.file.Path;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -47,6 +48,7 @@ import org.oran.dmaapadapter.configuration.WebClientConfig;
 import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig;
 import org.oran.dmaapadapter.controllers.ProducerCallbacksController;
 import org.oran.dmaapadapter.datastore.DataStore;
+import org.oran.dmaapadapter.datastore.FileStore;
 import org.oran.dmaapadapter.datastore.S3ObjectStore;
 import org.oran.dmaapadapter.exceptions.ServiceException;
 import org.oran.dmaapadapter.filter.PmReportFilter;
@@ -84,7 +86,7 @@ import reactor.kafka.sender.SenderRecord;
         "app.configuration-filepath=./src/test/resources/test_application_configuration.json", //
-        "app.pm-files-path=./src/test/resources/", //
         "app.s3.locksBucket=ropfilelocks", //
-        "app.s3.bucket=ropfiles"}) //
+        "app.pm-files-path=/tmp/dmaapadaptor", //
+        "app.s3.bucket="}) //
 class IntegrationWithKafka {
 
     final String TYPE_ID = "KafkaInformationType";
@@ -216,9 +219,9 @@ class IntegrationWithKafka {
         kafkaReceiver.reset();
         kafkaReceiver2.reset();
 
-        S3ObjectStore fileStore = new S3ObjectStore(applicationConfig);
-        fileStore.createS3Bucket(DataStore.Bucket.FILES).block();
-        fileStore.createS3Bucket(DataStore.Bucket.LOCKS).block();
+        DataStore fileStore = this.dataStore();
+        fileStore.create(DataStore.Bucket.FILES).block();
+        fileStore.create(DataStore.Bucket.LOCKS).block();
 
     }
 
@@ -233,7 +236,7 @@ class IntegrationWithKafka {
         this.consumerController.testResults.reset();
         this.icsSimulatorController.testResults.reset();
 
-        S3ObjectStore fileStore = new S3ObjectStore(applicationConfig);
+        DataStore fileStore = dataStore();
         fileStore.deleteBucket(DataStore.Bucket.FILES).block();
         fileStore.deleteBucket(DataStore.Bucket.LOCKS).block();
     }
@@ -494,7 +497,7 @@ class IntegrationWithKafka {
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
         waitForKafkaListener();
 
-        final int NO_OF_OBJECTS = 50000;
+        final int NO_OF_OBJECTS = 5000;
 
         Instant startTime = Instant.now();
 
@@ -504,10 +507,10 @@ class IntegrationWithKafka {
                 .filename(FILE_NAME) //
                 .build();
 
-        S3ObjectStore fileStore = new S3ObjectStore(applicationConfig);
+        DataStore fileStore = dataStore();
 
-        fileStore.createS3Bucket(DataStore.Bucket.FILES).block();
-        fileStore.copyFileToS3(DataStore.Bucket.FILES, Path.of("./src/test/resources/" + FILE_NAME), FILE_NAME).block();
+        fileStore.create(DataStore.Bucket.FILES).block();
+        fileStore.copyFileTo(Path.of("./src/test/resources/" + FILE_NAME), FILE_NAME).block();
 
         String eventAsString = gson.toJson(event);
 
@@ -524,7 +527,78 @@
-        logger.info("***  kafkaReceiver2 :" + kafkaReceiver.count);
+        logger.info("***  kafkaReceiver2 :" + kafkaReceiver2.count);
 
         printStatistics();
+    }
+
+    @Test
+    void clear() {
+
+    }
+
+    @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
+    @Test
+    void kafkaCharacteristics_manyPmJobs() throws Exception {
+        // Filter PM reports and sent to two jobs over Kafka
+
+        // Register producer, Register types
+        await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+        assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+
+        PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
+        filterData.getMeasTypes().add("succImmediateAssignProcs");
+        filterData.getMeasObjClass().add("UtranCell");
+
+        final int NO_OF_JOBS = 150;
+        ArrayList<KafkaReceiver> receivers = new ArrayList<>();
+        for (int i = 0; i < NO_OF_JOBS; ++i) {
+            final String OUTPUT_TOPIC = "manyJobs_" + i;
+            this.icsSimulatorController.addJob(consumerJobInfoKafka(OUTPUT_TOPIC, filterData), OUTPUT_TOPIC,
+                    restClient());
+            KafkaReceiver receiver = new KafkaReceiver(this.applicationConfig, OUTPUT_TOPIC, this.securityContext);
+            receivers.add(receiver);
+        }
+
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(NO_OF_JOBS));
+        waitForKafkaListener();
+
+        final int NO_OF_OBJECTS = 500;
+
+        Instant startTime = Instant.now();
+
+        final String FILE_NAME = "pm_report.json.gz";
+
+        NewFileEvent event = NewFileEvent.builder() //
+                .filename(FILE_NAME) //
+                .build();
+
+        DataStore fileStore = dataStore();
+
+        fileStore.create(DataStore.Bucket.FILES).block();
+        fileStore.copyFileTo(Path.of("./src/test/resources/" + FILE_NAME), FILE_NAME).block();
+
+        String eventAsString = gson.toJson(event);
+
+        var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> kafkaSenderRecord(eventAsString, "key", PM_TYPE_ID));
+        sendDataToKafka(dataToSend);
+
+        while (receivers.get(0).count != NO_OF_OBJECTS) {
+            logger.info("sleeping {}", receivers.get(0).count);
+            Thread.sleep(1000 * 1);
+        }
+
+        final long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond();
+        logger.info("*** Duration :" + durationSeconds + ", objects/second: " + NO_OF_OBJECTS / durationSeconds);
+
+        for (KafkaReceiver receiver : receivers) {
+            assertThat(receiver.count).isEqualTo(NO_OF_OBJECTS);
+            // System.out.println("** " + receiver.OUTPUT_TOPIC + " " + receiver.count);
+        }
+
+        // printStatistics();
+    }
 
+    private DataStore dataStore() {
+        return this.applicationConfig.isS3Enabled() ? new S3ObjectStore(applicationConfig)
+                : new FileStore(applicationConfig);
     }
 
     @Test
@@ -532,16 +606,15 @@ class IntegrationWithKafka {
         // test
         final String JOB_ID = "testHistoricalData";
 
-        S3ObjectStore fileStore = new S3ObjectStore(applicationConfig);
+        DataStore fileStore = dataStore();
 
-        fileStore.createS3Bucket(DataStore.Bucket.FILES).block();
-        fileStore.createS3Bucket(DataStore.Bucket.LOCKS).block();
+        fileStore.create(DataStore.Bucket.FILES).block();
+        fileStore.create(DataStore.Bucket.LOCKS).block();
 
-        fileStore.copyFileToS3(DataStore.Bucket.FILES, Path.of("./src/test/resources/pm_report.json"),
+        fileStore.copyFileTo(Path.of("./src/test/resources/pm_report.json"),
                 "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json").block();
 
-        fileStore.copyFileToS3(DataStore.Bucket.FILES, Path.of("./src/test/resources/pm_report.json"),
-                "OTHER_SOURCENAME/test.json").block();
+        fileStore.copyFileTo(Path.of("./src/test/resources/pm_report.json"), "OTHER_SOURCENAME/test.json").block();
 
         await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());