From: PatrikBuhr Date: Tue, 11 Oct 2022 08:30:39 +0000 (+0200) Subject: Minor changes X-Git-Tag: 1.2.0~10 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F33%2F9233%2F1;p=nonrtric%2Fplt%2Fdmaapadapter.git Minor changes Added a testcase for many PM subscriptions using Kafka. Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-773 Change-Id: I39899a46afe40374cdef50538f2409ebfac7c5c7 --- diff --git a/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java b/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java index 097fd32..3e65120 100644 --- a/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java +++ b/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java @@ -148,7 +148,7 @@ public class ApplicationConfig { } public boolean isS3Enabled() { - return !s3EndpointOverride.isEmpty(); + return !(s3EndpointOverride.isBlank() || s3Bucket.isBlank()); } // Adapter to parse the json format of the configuration file. diff --git a/src/main/java/org/oran/dmaapadapter/datastore/DataStore.java b/src/main/java/org/oran/dmaapadapter/datastore/DataStore.java index 255b77a..51039b2 100644 --- a/src/main/java/org/oran/dmaapadapter/datastore/DataStore.java +++ b/src/main/java/org/oran/dmaapadapter/datastore/DataStore.java @@ -20,6 +20,8 @@ package org.oran.dmaapadapter.datastore; +import java.nio.file.Path; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -38,4 +40,10 @@ public interface DataStore { public Mono deleteObject(Bucket bucket, String name); + public Mono copyFileTo(Path from, String to); + + public Mono create(DataStore.Bucket bucket); + + public Mono deleteBucket(Bucket bucket); + } diff --git a/src/main/java/org/oran/dmaapadapter/datastore/FileStore.java b/src/main/java/org/oran/dmaapadapter/datastore/FileStore.java index 430b62f..cd2d355 100644 --- a/src/main/java/org/oran/dmaapadapter/datastore/FileStore.java +++ b/src/main/java/org/oran/dmaapadapter/datastore/FileStore.java @@ -105,6 +105,7 @@ public class FileStore implements DataStore { } } + @Override public Mono copyFileTo(Path from, String to) { try { Path toPath = path(to); @@ -131,17 +132,24 @@ public class FileStore implements DataStore { } } + @Override + public Mono create(Bucket bucket) { + return Mono.just("OK"); + } + private Path path(String name) { return Path.of(applicationConfig.getPmFilesPath(), name); } - public void deleteFiles() { + @Override + public Mono deleteBucket(Bucket bucket) { try { FileSystemUtils.deleteRecursively(Path.of(applicationConfig.getPmFilesPath())); } catch (IOException e) { logger.debug("Could not delete directory: {}, reason; {}", applicationConfig.getPmFilesPath(), e.getMessage()); } + return Mono.just("OK"); } } diff --git a/src/main/java/org/oran/dmaapadapter/datastore/S3ObjectStore.java b/src/main/java/org/oran/dmaapadapter/datastore/S3ObjectStore.java index fdbd3e4..bbb84de 100644 --- a/src/main/java/org/oran/dmaapadapter/datastore/S3ObjectStore.java +++ b/src/main/java/org/oran/dmaapadapter/datastore/S3ObjectStore.java @@ -144,11 +144,13 @@ public class S3ObjectStore implements DataStore { .doOnError(t -> logger.error("Failed to store file in S3 {}", t.getMessage())); } - public Mono copyFileToS3(Bucket bucket, Path fromFile, String toFile) { - return copyFileToS3Bucket(bucket(bucket), fromFile, toFile); + @Override + public Mono copyFileTo(Path fromFile, String toFile) { + return copyFileToS3Bucket(bucket(Bucket.FILES), fromFile, toFile); } - public Mono createS3Bucket(Bucket bucket) { + @Override + public Mono create(Bucket bucket) { return createS3Bucket(bucket(bucket)); } @@ -166,6 +168,7 @@ public class S3ObjectStore implements DataStore { .onErrorResume(t -> Mono.just(s3Bucket)); } + @Override public Mono deleteBucket(Bucket bucket) { return listFiles(bucket, "") // .flatMap(key -> deleteObject(bucket, key)) // diff --git a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java index da28e64..c10bbaf 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java @@ -98,8 +98,8 @@ public abstract class JobDataDistributor { if (applConfig.isS3Enabled()) { S3ObjectStore fs = new S3ObjectStore(applConfig); - fs.createS3Bucket(DataStore.Bucket.FILES).subscribe(); - fs.createS3Bucket(DataStore.Bucket.LOCKS).subscribe(); + fs.create(DataStore.Bucket.FILES).subscribe(); + fs.create(DataStore.Bucket.LOCKS).subscribe(); } } diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java index 08c8167..2ccfb3c 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java @@ -63,6 +63,9 @@ public class KafkaJobDataDistributor extends JobDataDistributor { SenderRecord senderRecord = senderRecord(data, job); return this.sender.send(Mono.just(senderRecord)) // + .doOnError(t -> logger.warn("Failed to send to Kafka, job: {}, reason: {}", this.getJob().getId(), + t.getMessage())) // + .onErrorResume(t -> Mono.empty()) // .collectList() // .map(x -> data.value); } diff --git a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java index 393cd27..b7bb255 100644 --- a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java +++ b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java @@ -53,7 +53,10 @@ import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.configuration.WebClientConfig; import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig; import org.oran.dmaapadapter.controllers.ProducerCallbacksController; +import org.oran.dmaapadapter.datastore.DataStore; +import org.oran.dmaapadapter.datastore.DataStore.Bucket; import org.oran.dmaapadapter.datastore.FileStore; +import org.oran.dmaapadapter.datastore.S3ObjectStore; import org.oran.dmaapadapter.exceptions.ServiceException; import org.oran.dmaapadapter.filter.PmReport; import org.oran.dmaapadapter.filter.PmReportFilter; @@ -252,8 +255,14 @@ class ApplicationTest { assertThat(this.consumerController.testResults.receivedBodies).isEmpty(); assertThat(this.consumerController.testResults.receivedHeaders).isEmpty(); - FileStore fileStore = new FileStore(applicationConfig); - fileStore.deleteFiles(); + DataStore fileStore = this.dataStore(); + fileStore.create(DataStore.Bucket.FILES).block(); + fileStore.create(DataStore.Bucket.LOCKS).block(); + } + + private DataStore dataStore() { + return this.applicationConfig.isS3Enabled() ? new S3ObjectStore(applicationConfig) + : new FileStore(applicationConfig); } @AfterEach @@ -266,6 +275,10 @@ class ApplicationTest { this.consumerController.testResults.reset(); this.icsSimulatorController.testResults.reset(); + FileStore fileStore = new FileStore(applicationConfig); + fileStore.deleteBucket(Bucket.FILES); + fileStore.deleteBucket(Bucket.LOCKS); + } private AsyncRestClient restClient(boolean useTrustValidation) { diff --git a/src/test/java/org/oran/dmaapadapter/ConsumerController.java b/src/test/java/org/oran/dmaapadapter/ConsumerController.java index d02e48b..a62ae7d 100644 --- a/src/test/java/org/oran/dmaapadapter/ConsumerController.java +++ b/src/test/java/org/oran/dmaapadapter/ConsumerController.java @@ -85,7 +85,7 @@ public class ConsumerController { content = @Content(schema = @Schema(implementation = VoidResponse.class))) // }) public ResponseEntity postData(@RequestBody String body, @RequestHeader Map headers) { - logger.info("Received by consumer: {}", body); + logger.debug("Received by consumer: {}", body); testResults.receivedBodies.add(body); testResults.receivedHeaders.add(headers); return new ResponseEntity<>(HttpStatus.OK); diff --git a/src/test/java/org/oran/dmaapadapter/IcsSimulatorController.java b/src/test/java/org/oran/dmaapadapter/IcsSimulatorController.java index df30196..e27e95a 100644 --- a/src/test/java/org/oran/dmaapadapter/IcsSimulatorController.java +++ b/src/test/java/org/oran/dmaapadapter/IcsSimulatorController.java @@ -117,7 +117,7 @@ public class IcsSimulatorController { public ResponseEntity putIndividualInfoJob( // @PathVariable("infoJobId") String jobId, // @RequestBody ConsumerJobInfo informationJobObject) { - logger.info("*** added consumer job {}", jobId); + logger.debug("*** added consumer job {}", jobId); testResults.setCreatedJob(informationJobObject); return new ResponseEntity<>(HttpStatus.OK); } @@ -133,7 +133,7 @@ public class IcsSimulatorController { } assertThat(type).isNotNull(); validateJsonObjectAgainstSchema(job.jobDefinition, type.jobDataSchema); - logger.info("ICS Simulator PUT job: {}", body); + logger.debug("ICS Simulator PUT job: {}", body); restClient.post(url, body, MediaType.APPLICATION_JSON).block(); } @@ -158,7 +158,7 @@ public class IcsSimulatorController { public void deleteJob(String jobId, AsyncRestClient restClient) { String url = this.testResults.registrationInfo.jobCallbackUrl + "/" + jobId; - logger.info("ICS Simulator DELETE job: {}", url); + logger.debug("ICS Simulator DELETE job: {}", url); restClient.delete(url).block(); } diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java index 4e616a8..4cb1dc2 100644 --- a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java +++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java @@ -29,6 +29,7 @@ import com.google.gson.JsonParser; import java.nio.file.Path; import java.time.Duration; import java.time.Instant; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -47,6 +48,7 @@ import org.oran.dmaapadapter.configuration.WebClientConfig; import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig; import org.oran.dmaapadapter.controllers.ProducerCallbacksController; import org.oran.dmaapadapter.datastore.DataStore; +import org.oran.dmaapadapter.datastore.FileStore; import org.oran.dmaapadapter.datastore.S3ObjectStore; import org.oran.dmaapadapter.exceptions.ServiceException; import org.oran.dmaapadapter.filter.PmReportFilter; @@ -84,7 +86,8 @@ import reactor.kafka.sender.SenderRecord; "app.configuration-filepath=./src/test/resources/test_application_configuration.json", // "app.pm-files-path=./src/test/resources/", // "app.s3.locksBucket=ropfilelocks", // - "app.s3.bucket=ropfiles"}) // + "app.pm-files-path=/tmp/dmaapadaptor", // + "app.s3.bucket="}) // class IntegrationWithKafka { final String TYPE_ID = "KafkaInformationType"; @@ -216,9 +219,9 @@ class IntegrationWithKafka { kafkaReceiver.reset(); kafkaReceiver2.reset(); - S3ObjectStore fileStore = new S3ObjectStore(applicationConfig); - fileStore.createS3Bucket(DataStore.Bucket.FILES).block(); - fileStore.createS3Bucket(DataStore.Bucket.LOCKS).block(); + DataStore fileStore = this.dataStore(); + fileStore.create(DataStore.Bucket.FILES).block(); + fileStore.create(DataStore.Bucket.LOCKS).block(); } @@ -233,7 +236,7 @@ class IntegrationWithKafka { this.consumerController.testResults.reset(); this.icsSimulatorController.testResults.reset(); - S3ObjectStore fileStore = new S3ObjectStore(applicationConfig); + DataStore fileStore = dataStore(); fileStore.deleteBucket(DataStore.Bucket.FILES).block(); fileStore.deleteBucket(DataStore.Bucket.LOCKS).block(); } @@ -494,7 +497,7 @@ class IntegrationWithKafka { await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2)); waitForKafkaListener(); - final int NO_OF_OBJECTS = 50000; + final int NO_OF_OBJECTS = 5000; Instant startTime = Instant.now(); @@ -504,10 +507,10 @@ class IntegrationWithKafka { .filename(FILE_NAME) // .build(); - S3ObjectStore fileStore = new S3ObjectStore(applicationConfig); + DataStore fileStore = dataStore(); - fileStore.createS3Bucket(DataStore.Bucket.FILES).block(); - fileStore.copyFileToS3(DataStore.Bucket.FILES, Path.of("./src/test/resources/" + FILE_NAME), FILE_NAME).block(); + fileStore.create(DataStore.Bucket.FILES).block(); + fileStore.copyFileTo(Path.of("./src/test/resources/" + FILE_NAME), FILE_NAME).block(); String eventAsString = gson.toJson(event); @@ -524,7 +527,78 @@ class IntegrationWithKafka { logger.info("*** kafkaReceiver2 :" + kafkaReceiver.count); printStatistics(); + } + + @Test + void clear() { + + } + + @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests. + @Test + void kafkaCharacteristics_manyPmJobs() throws Exception { + // Filter PM reports and sent to two jobs over Kafka + + // Register producer, Register types + await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); + assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); + + PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); + filterData.getMeasTypes().add("succImmediateAssignProcs"); + filterData.getMeasObjClass().add("UtranCell"); + + final int NO_OF_JOBS = 150; + ArrayList receivers = new ArrayList<>(); + for (int i = 0; i < NO_OF_JOBS; ++i) { + final String OUTPUT_TOPIC = "manyJobs_" + i; + this.icsSimulatorController.addJob(consumerJobInfoKafka(OUTPUT_TOPIC, filterData), OUTPUT_TOPIC, + restClient()); + KafkaReceiver receiver = new KafkaReceiver(this.applicationConfig, OUTPUT_TOPIC, this.securityContext); + receivers.add(receiver); + } + + await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(NO_OF_JOBS)); + waitForKafkaListener(); + + final int NO_OF_OBJECTS = 500; + + Instant startTime = Instant.now(); + + final String FILE_NAME = "pm_report.json.gz"; + + NewFileEvent event = NewFileEvent.builder() // + .filename(FILE_NAME) // + .build(); + + DataStore fileStore = dataStore(); + + fileStore.create(DataStore.Bucket.FILES).block(); + fileStore.copyFileTo(Path.of("./src/test/resources/" + FILE_NAME), FILE_NAME).block(); + + String eventAsString = gson.toJson(event); + + var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> kafkaSenderRecord(eventAsString, "key", PM_TYPE_ID)); + sendDataToKafka(dataToSend); + + while (receivers.get(0).count != NO_OF_OBJECTS) { + logger.info("sleeping {}", kafkaReceiver.count); + Thread.sleep(1000 * 1); + } + + final long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond(); + logger.info("*** Duration :" + durationSeconds + ", objects/second: " + NO_OF_OBJECTS / durationSeconds); + + for (KafkaReceiver receiver : receivers) { + assertThat(receiver.count).isEqualTo(NO_OF_OBJECTS); + // System.out.println("** " + receiver.OUTPUT_TOPIC + " " + receiver.count); + } + + // printStatistics(); + } + private DataStore dataStore() { + return this.applicationConfig.isS3Enabled() ? new S3ObjectStore(applicationConfig) + : new FileStore(applicationConfig); } @Test @@ -532,16 +606,15 @@ class IntegrationWithKafka { // test final String JOB_ID = "testHistoricalData"; - S3ObjectStore fileStore = new S3ObjectStore(applicationConfig); + DataStore fileStore = dataStore(); - fileStore.createS3Bucket(DataStore.Bucket.FILES).block(); - fileStore.createS3Bucket(DataStore.Bucket.LOCKS).block(); + fileStore.create(DataStore.Bucket.FILES).block(); + fileStore.create(DataStore.Bucket.LOCKS).block(); - fileStore.copyFileToS3(DataStore.Bucket.FILES, Path.of("./src/test/resources/pm_report.json"), + fileStore.copyFileTo(Path.of("./src/test/resources/pm_report.json"), "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json").block(); - fileStore.copyFileToS3(DataStore.Bucket.FILES, Path.of("./src/test/resources/pm_report.json"), - "OTHER_SOURCENAME/test.json").block(); + fileStore.copyFileTo(Path.of("./src/test/resources/pm_report.json"), "OTHER_SOURCENAME/test.json").block(); await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());