From: PatrikBuhr Date: Tue, 22 Nov 2022 07:57:57 +0000 (+0100) Subject: Added performance test of shared topic X-Git-Tag: 1.2.0^0 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F65%2F9765%2F1;p=nonrtric%2Fplt%2Fdmaapadapter.git Added performance test of shared topic Improved printout in characteristics result. Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-773 Change-Id: Ib61f06c877e45511831343915d40c24e5247c338 --- diff --git a/api/api.json b/api/api.json index 4c95af9..3ec5a4d 100644 --- a/api/api.json +++ b/api/api.json @@ -97,25 +97,25 @@ ], "properties": { "noOfSentObjects": { - "format": "int32", + "format": "int64", "type": "integer" }, "jobId": {"type": "string"}, "outputTopic": {"type": "string"}, "noOfSentBytes": { - "format": "int32", + "format": "int64", "type": "integer" }, "clientId": {"type": "string"}, "groupId": {"type": "string"}, "noOfReceivedBytes": { - "format": "int32", + "format": "int64", "type": "integer" }, "typeId": {"type": "string"}, "inputTopic": {"type": "string"}, "noOfReceivedObjects": { - "format": "int32", + "format": "int64", "type": "integer" } } diff --git a/api/api.yaml b/api/api.yaml index a73e3e5..b2eaf15 100644 --- a/api/api.yaml +++ b/api/api.yaml @@ -530,28 +530,28 @@ components: properties: noOfSentObjects: type: integer - format: int32 + format: int64 jobId: type: string outputTopic: type: string noOfSentBytes: type: integer - format: int32 + format: int64 clientId: type: string groupId: type: string noOfReceivedBytes: type: integer - format: int32 + format: int64 typeId: type: string inputTopic: type: string noOfReceivedObjects: type: integer - format: int32 + format: int64 description: Statistics information for one job statistics_info: type: object diff --git a/src/main/java/org/oran/dmaapadapter/repository/Job.java b/src/main/java/org/oran/dmaapadapter/repository/Job.java index e256232..8603507 100644 --- a/src/main/java/org/oran/dmaapadapter/repository/Job.java +++ b/src/main/java/org/oran/dmaapadapter/repository/Job.java @@ -71,19 +71,19 @@ public class Job { @JsonProperty(value = "noOfReceivedObjects", required = true) @Builder.Default - int noOfReceivedObjects = 0; + long noOfReceivedObjects = 0; @JsonProperty(value = "noOfReceivedBytes", required = true) @Builder.Default - int noOfReceivedBytes = 0; + long noOfReceivedBytes = 0; @JsonProperty(value = "noOfSentObjects", required = true) @Builder.Default - int noOfSentObjects = 0; + long noOfSentObjects = 0; @JsonProperty(value = "noOfSentBytes", required = true) @Builder.Default - int noOfSentBytes = 0; + long noOfSentBytes = 0; public void received(byte[] bytes) { noOfReceivedBytes += bytes.length; diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java index 52bcb44..1a20c68 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java @@ -27,6 +27,8 @@ import java.util.HashMap; import java.util.Map; import java.util.zip.GZIPInputStream; +import lombok.Setter; + import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.oran.dmaapadapter.configuration.ApplicationConfig; @@ -53,10 +55,14 @@ public class KafkaTopicListener implements TopicListener { private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create(); private final DataStore dataStore; + @Setter + private String kafkaGroupId; + public KafkaTopicListener(ApplicationConfig applConfig, InfoType type) { this.applicationConfig = applConfig; this.type = type; this.dataStore = DataStore.create(applConfig); + this.kafkaGroupId = this.type.getKafkaGroupId(); } @Override @@ -80,7 +86,7 @@ public class KafkaTopicListener implements TopicListener { sig -> logger.error("KafkaTopicListener stopped, type: {}, reason: {}", this.type.getId(), sig)) // .filter(t -> t.value().length > 0 || t.key().length > 0) // .map(input -> new DataFromTopic(input.key(), input.value(), DataFromTopic.findZipped(input.headers()))) // - .flatMap(data -> getDataFromFileIfNewPmFileEvent(data, type, dataStore), 100) // + .flatMap(data -> getDataFromFileIfNewPmFileEvent(data, type, dataStore)) // .publish() // .autoConnect(1); } @@ -90,12 +96,15 @@ public class KafkaTopicListener implements TopicListener { if (this.applicationConfig.getKafkaBootStrapServers().isEmpty()) { logger.error("No kafka boostrap server is setup"); } + + consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers()); - consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, this.type.getKafkaGroupId()); + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId); consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); - consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId); + + consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId + "_" + kafkaGroupId); return ReceiverOptions.create(consumerProps) .subscription(Collections.singleton(this.type.getKafkaInputTopic())); diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java index b6a0fad..7e806e2 100644 --- a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java +++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java @@ -37,6 +37,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import lombok.Builder; + import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.ByteArraySerializer; @@ -175,7 +177,8 @@ class IntegrationWithKafka { int count = 0; - public KafkaReceiver(ApplicationConfig applicationConfig, String outputTopic, SecurityContext securityContext) { + public KafkaReceiver(ApplicationConfig applicationConfig, String outputTopic, SecurityContext securityContext, + String groupId) { this.applicationConfig = applicationConfig; this.OUTPUT_TOPIC = outputTopic; @@ -188,6 +191,9 @@ class IntegrationWithKafka { .build(); KafkaTopicListener topicListener = new KafkaTopicListener(applicationConfig, type); + if (groupId != null) { + topicListener.setKafkaGroupId(groupId); + } topicListener.getFlux() // .map(this::unzip) // @@ -197,7 +203,10 @@ class IntegrationWithKafka { } private TopicListener.DataFromTopic unzip(TopicListener.DataFromTopic receivedKafkaOutput) { - assertThat(this.applicationConfig.isZipOutput()).isEqualTo(receivedKafkaOutput.isZipped); + if (this.applicationConfig.isZipOutput() != receivedKafkaOutput.isZipped) { + logger.error("********* ERROR received zipped: {}, exp zipped: {}", receivedKafkaOutput.isZipped, + this.applicationConfig.isZipOutput()); + } if (!receivedKafkaOutput.isZipped) { return receivedKafkaOutput; } @@ -238,8 +247,8 @@ class IntegrationWithKafka { this.applicationConfig.setZipOutput(false); if (kafkaReceiver == null) { - kafkaReceiver = new KafkaReceiver(this.applicationConfig, "ouputTopic", this.securityContext); - kafkaReceiver2 = new KafkaReceiver(this.applicationConfig, "ouputTopic2", this.securityContext); + kafkaReceiver = new KafkaReceiver(this.applicationConfig, "ouputTopic", this.securityContext, null); + kafkaReceiver2 = new KafkaReceiver(this.applicationConfig, "ouputTopic2", this.securityContext, null); } kafkaReceiver.reset(); kafkaReceiver2.reset(); @@ -399,25 +408,62 @@ class IntegrationWithKafka { Thread.sleep(4000); } - private void printStatistics() { + private StatisticsCollection getStatistics() { String targetUri = baseUrl() + ProducerCallbacksController.STATISTICS_URL; String statsResp = restClient().get(targetUri).block(); StatisticsCollection stats = gson.fromJson(statsResp, StatisticsCollection.class); - int noOfSentBytes = 0; - int noOfSentObjs = 0; + return stats; + } + + @Builder + static class CharacteristicsResult { + long noOfFilesPerSecond; + long noOfSentBytes; + long noOfSentGigaBytes; + long noOfSentObjects; + long inputFileSize; + long noOfReceivedFiles; + long noOfReceivedBytes; + long noOfSubscribers; + long sizeOfSentObj; + boolean zipOutput; + } + + private CharacteristicsResult getCharacteristicsResult(Instant startTime) { + final long durationMs = Instant.now().toEpochMilli() - startTime.toEpochMilli(); + StatisticsCollection stats = getStatistics(); + long noOfSentBytes = 0; + long noOfSentObjs = 0; for (Statistics s : stats.jobStatistics) { noOfSentBytes += s.getNoOfSentBytes(); noOfSentObjs += s.getNoOfSentObjects(); } - logger.error(" Stats noOfSentBytes (total): {}, noOfSentObjects (total): {}, noOfTopics: {}", noOfSentBytes, - noOfSentObjs, stats.jobStatistics.size()); + + Statistics oneJobsStats = stats.jobStatistics.iterator().next(); + + return CharacteristicsResult.builder() // + .noOfSentBytes(noOfSentBytes) // + .noOfSentObjects(noOfSentObjs) // + .noOfSentGigaBytes(noOfSentBytes / (1024 * 1024)) // + .noOfSubscribers(stats.jobStatistics.size()) // + .zipOutput(this.applicationConfig.isZipOutput()) // + .noOfFilesPerSecond((oneJobsStats.getNoOfReceivedObjects() * 1000) / durationMs) // + .noOfReceivedBytes(oneJobsStats.getNoOfReceivedBytes()) // + .inputFileSize(oneJobsStats.getNoOfReceivedBytes() / oneJobsStats.getNoOfReceivedObjects()) // + .noOfReceivedFiles(oneJobsStats.getNoOfReceivedObjects()) // + .sizeOfSentObj(oneJobsStats.getNoOfSentBytes() / oneJobsStats.getNoOfSentObjects()) // + .build(); } private void printCharacteristicsResult(String str, Instant startTime, int noOfIterations) { final long durationMs = Instant.now().toEpochMilli() - startTime.toEpochMilli(); logger.error("*** {} Duration ({} ms), objects/second: {}", str, durationMs, (noOfIterations * 1000) / durationMs); - printStatistics(); + + System.out.println("--------------"); + System.out.println(gson.toJson(getCharacteristicsResult(startTime))); + System.out.println("--------------"); + } @Test @@ -473,6 +519,7 @@ class IntegrationWithKafka { await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); waitForKafkaListener(); + Instant startTime = Instant.now(); String sendString = "testData " + Instant.now(); String sendKey = "key " + Instant.now(); var dataToSend = Flux.just(kafkaSenderRecord(sendString, sendKey, KAFKA_TYPE_ID)); @@ -481,7 +528,7 @@ class IntegrationWithKafka { await().untilAsserted(() -> assertThat(kafkaReceiver.lastValue()).isEqualTo(sendString)); assertThat(kafkaReceiver.lastKey()).isEqualTo(sendKey); - printStatistics(); + System.out.println(gson.toJson(getCharacteristicsResult(startTime))); } @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests. @@ -568,30 +615,34 @@ class IntegrationWithKafka { void kafkaCharacteristics_manyPmJobs() throws Exception { // Filter PM reports and sent to many jobs over Kafka + this.applicationConfig.setZipOutput(false); + // Register producer, Register types await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); - filterData.getMeasTypes().add("pmCounterNumber0"); - filterData.getMeasTypes().add("pmCounterNumber1"); + final int NO_OF_COUNTERS = 5; + for (int i = 0; i < NO_OF_COUNTERS; ++i) { + filterData.getMeasTypes().add("pmCounterNumber" + i); + } filterData.getMeasObjClass().add("NRCellCU"); - this.applicationConfig.setZipOutput(false); - final int NO_OF_JOBS = 100; + final int NO_OF_JOBS = 150; + ArrayList receivers = new ArrayList<>(); for (int i = 0; i < NO_OF_JOBS; ++i) { final String outputTopic = "manyJobs_" + i; this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData), outputTopic, restClient()); - KafkaReceiver receiver = new KafkaReceiver(this.applicationConfig, outputTopic, this.securityContext); + KafkaReceiver receiver = new KafkaReceiver(this.applicationConfig, outputTopic, this.securityContext, null); receivers.add(receiver); } await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(NO_OF_JOBS)); waitForKafkaListener(); - final int NO_OF_OBJECTS = 10000; + final int NO_OF_OBJECTS = 1000; Instant startTime = Instant.now(); @@ -625,6 +676,76 @@ class IntegrationWithKafka { } } + @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests. + @Test + void kafkaCharacteristics_onePmJobs_sharedTopic() throws Exception { + // Filter PM reports and sent to many jobs over Kafka + + this.applicationConfig.setZipOutput(true); + + // Register producer, Register types + await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); + assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); + + PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); + final int NO_OF_COUNTERS = 0; + for (int i = 0; i < NO_OF_COUNTERS; ++i) { + filterData.getMeasTypes().add("pmCounterNumber" + i); + } + filterData.getMeasObjClass().add("NRCellCU"); + + final String outputTopic = "kafkaCharacteristics_onePmJobs_manyReceivers"; + this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData), outputTopic, restClient()); + + final int NO_OF_RECEIVERS = 150; + ArrayList receivers = new ArrayList<>(); + for (int i = 0; i < NO_OF_RECEIVERS; ++i) { + KafkaReceiver receiver = + new KafkaReceiver(this.applicationConfig, outputTopic, this.securityContext, "group_" + i); + receivers.add(receiver); + } + + await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); + waitForKafkaListener(); + + final int NO_OF_OBJECTS = 1000; + + Instant startTime = Instant.now(); + + final String FILE_NAME = "A20000626.2315+0200-2330+0200_HTTPS-6-73.json.gz"; + + DataStore fileStore = dataStore(); + + fileStore.deleteBucket(DataStore.Bucket.FILES).block(); + fileStore.create(DataStore.Bucket.FILES).block(); + fileStore.copyFileTo(Path.of("./src/test/resources/" + FILE_NAME), FILE_NAME).block(); + + String eventAsString = newFileEvent(FILE_NAME); + var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> kafkaSenderRecord(eventAsString, "key", PM_TYPE_ID)); + sendDataToKafka(dataToSend); + + logger.info("sleeping {}", kafkaReceiver.count); + for (KafkaReceiver receiver : receivers) { + while (receiver.count < NO_OF_OBJECTS) { + if (kafkaReceiver.count > 0) { + logger.info("sleeping {}", receiver.count); + } + + Thread.sleep(1000 * 1); + } + } + + printCharacteristicsResult("kafkaCharacteristics_manyPmJobs", startTime, NO_OF_OBJECTS); + + for (KafkaReceiver receiver : receivers) { + if (receiver.count != NO_OF_OBJECTS) { + System.out.println("** Unexpected no of objects: " + receiver.OUTPUT_TOPIC + " " + receiver.count); + } + } + + Thread.sleep(1000 * 5); + } + private String newFileEvent(String fileName) { NewFileEvent event = NewFileEvent.builder() // .filename(fileName) //