Added performance test of shared topic  (change-ref 65/9765/1, tag 1.2.0)
author     PatrikBuhr <patrik.buhr@est.tech>   Tue, 22 Nov 2022 07:57:57 +0000 (08:57 +0100)
committer  PatrikBuhr <patrik.buhr@est.tech>   Wed, 23 Nov 2022 14:54:05 +0000 (15:54 +0100)
Improved printout in characteristics result.

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: Ib61f06c877e45511831343915d40c24e5247c338

api/api.json
api/api.yaml
src/main/java/org/oran/dmaapadapter/repository/Job.java
src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
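
Background for the diffs below: the behaviour this commit tests is standard Kafka consumer-group fan-out. Every consumer group tracks its own offsets, so N listeners with N distinct group ids each receive a full copy of the shared output topic, while listeners sharing a single group id would instead split the records between them. A minimal, self-contained sketch of that semantics (reactor-kafka, hypothetical names and broker address, not part of this commit):

    import java.util.Collections;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    import reactor.core.Disposable;
    import reactor.kafka.receiver.KafkaReceiver;
    import reactor.kafka.receiver.ReceiverOptions;

    public class SharedTopicSketch {

        // One subscription per group id: each group receives every record on the topic.
        static Disposable subscribe(String topic, String groupId) {
            Map<String, Object> props = Map.of( //
                    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092", //
                    ConsumerConfig.GROUP_ID_CONFIG, groupId, //
                    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class, //
                    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

            ReceiverOptions<byte[], byte[]> options = ReceiverOptions.<byte[], byte[]>create(props) //
                    .subscription(Collections.singleton(topic));

            return KafkaReceiver.create(options).receive() //
                    .subscribe(rec -> System.out.println(groupId + " got " + rec.value().length + " bytes"));
        }

        public static void main(String[] args) {
            for (int i = 0; i < 3; ++i) {
                subscribe("sharedTopic", "group_" + i); // three independent full copies
            }
        }
    }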

api/api.json
index 4c95af9..3ec5a4d 100644
             ],
             "properties": {
                 "noOfSentObjects": {
-                    "format": "int32",
+                    "format": "int64",
                     "type": "integer"
                 },
                 "jobId": {"type": "string"},
                 "outputTopic": {"type": "string"},
                 "noOfSentBytes": {
-                    "format": "int32",
+                    "format": "int64",
                     "type": "integer"
                 },
                 "clientId": {"type": "string"},
                 "groupId": {"type": "string"},
                 "noOfReceivedBytes": {
-                    "format": "int32",
+                    "format": "int64",
                     "type": "integer"
                 },
                 "typeId": {"type": "string"},
                 "inputTopic": {"type": "string"},
                 "noOfReceivedObjects": {
-                    "format": "int32",
+                    "format": "int64",
                     "type": "integer"
                 }
             }
api/api.yaml
index a73e3e5..b2eaf15 100644
@@ -530,28 +530,28 @@ components:
       properties:
         noOfSentObjects:
           type: integer
-          format: int32
+          format: int64
         jobId:
           type: string
         outputTopic:
           type: string
         noOfSentBytes:
           type: integer
-          format: int32
+          format: int64
         clientId:
           type: string
         groupId:
           type: string
         noOfReceivedBytes:
           type: integer
-          format: int32
+          format: int64
         typeId:
           type: string
         inputTopic:
           type: string
         noOfReceivedObjects:
           type: integer
-          format: int32
+          format: int64
       description: Statistics information for one job
     statistics_info:
       type: object
src/main/java/org/oran/dmaapadapter/repository/Job.java
index e256232..8603507 100644
@@ -71,19 +71,19 @@ public class Job {
 
         @JsonProperty(value = "noOfReceivedObjects", required = true)
         @Builder.Default
-        int noOfReceivedObjects = 0;
+        long noOfReceivedObjects = 0;
 
         @JsonProperty(value = "noOfReceivedBytes", required = true)
         @Builder.Default
-        int noOfReceivedBytes = 0;
+        long noOfReceivedBytes = 0;
 
         @JsonProperty(value = "noOfSentObjects", required = true)
         @Builder.Default
-        int noOfSentObjects = 0;
+        long noOfSentObjects = 0;
 
         @JsonProperty(value = "noOfSentBytes", required = true)
         @Builder.Default
-        int noOfSentBytes = 0;
+        long noOfSentBytes = 0;
 
         public void received(byte[] bytes) {
             noOfReceivedBytes += bytes.length;
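
Why the widening to long matters: at the volumes the characteristics tests push, a Java int (max 2_147_483_647) overflows after roughly 2 GB of counted bytes and the statistics would silently go negative. A small worked example with hypothetical numbers:

    public class CounterOverflow {
        public static void main(String[] args) {
            // e.g. 150 subscribers * 1000 files * ~100 kB each is on the order of
            // 15 * 10^9 sent bytes, far past Integer.MAX_VALUE.
            int sentAsInt = Integer.MAX_VALUE;
            sentAsInt += 100_000;                  // wraps to a large negative value
            long sentAsLong = Integer.MAX_VALUE;
            sentAsLong += 100_000;                 // keeps counting correctly
            System.out.println(sentAsInt + " vs " + sentAsLong);
        }
    }
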
src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
index 52bcb44..1a20c68 100644
@@ -27,6 +27,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.zip.GZIPInputStream;
 
+import lombok.Setter;
+
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
@@ -53,10 +55,14 @@ public class KafkaTopicListener implements TopicListener {
     private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
     private final DataStore dataStore;
 
+    @Setter
+    private String kafkaGroupId;
+
     public KafkaTopicListener(ApplicationConfig applConfig, InfoType type) {
         this.applicationConfig = applConfig;
         this.type = type;
         this.dataStore = DataStore.create(applConfig);
+        this.kafkaGroupId = this.type.getKafkaGroupId();
     }
 
     @Override
@@ -80,7 +86,7 @@ public class KafkaTopicListener implements TopicListener {
                         sig -> logger.error("KafkaTopicListener stopped, type: {}, reason: {}", this.type.getId(), sig)) //
                 .filter(t -> t.value().length > 0 || t.key().length > 0) //
                 .map(input -> new DataFromTopic(input.key(), input.value(), DataFromTopic.findZipped(input.headers()))) //
-                .flatMap(data -> getDataFromFileIfNewPmFileEvent(data, type, dataStore), 100) //
+                .flatMap(data -> getDataFromFileIfNewPmFileEvent(data, type, dataStore)) //
                 .publish() //
                 .autoConnect(1);
     }
@@ -90,12 +96,15 @@ public class KafkaTopicListener implements TopicListener {
         if (this.applicationConfig.getKafkaBootStrapServers().isEmpty()) {
             logger.error("No kafka boostrap server is setup");
         }
+
+        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
         consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers());
-        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, this.type.getKafkaGroupId());
+        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
         consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
+
+        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId + "_" + kafkaGroupId);
 
         return ReceiverOptions.<byte[], byte[]>create(consumerProps)
                 .subscription(Collections.singleton(this.type.getKafkaInputTopic()));
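
With the group id now injectable via the @Setter, a test can run several listeners of the same type against one input topic; a sketch of the intended use (mirrors the test change below, using only names from this patch):

    KafkaTopicListener listener = new KafkaTopicListener(applicationConfig, type);
    listener.setKafkaGroupId("group_42"); // override the type's default group id
    listener.getFlux().subscribe(data -> logger.info("received data from shared topic"));

Suffixing the client id with the group id serves the same scenario: several consumers in one JVM with identical client ids would otherwise collide, e.g. with duplicate JMX MBean registrations.
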
src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
index b6a0fad..7e806e2 100644
@@ -37,6 +37,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import lombok.Builder;
+
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -175,7 +177,8 @@ class IntegrationWithKafka {
 
         int count = 0;
 
-        public KafkaReceiver(ApplicationConfig applicationConfig, String outputTopic, SecurityContext securityContext) {
+        public KafkaReceiver(ApplicationConfig applicationConfig, String outputTopic, SecurityContext securityContext,
+                String groupId) {
             this.applicationConfig = applicationConfig;
             this.OUTPUT_TOPIC = outputTopic;
 
@@ -188,6 +191,9 @@ class IntegrationWithKafka {
                     .build();
 
             KafkaTopicListener topicListener = new KafkaTopicListener(applicationConfig, type);
+            if (groupId != null) {
+                topicListener.setKafkaGroupId(groupId);
+            }
 
             topicListener.getFlux() //
                     .map(this::unzip) //
@@ -197,7 +203,10 @@ class IntegrationWithKafka {
         }
 
         private TopicListener.DataFromTopic unzip(TopicListener.DataFromTopic receivedKafkaOutput) {
-            assertThat(this.applicationConfig.isZipOutput()).isEqualTo(receivedKafkaOutput.isZipped);
+            if (this.applicationConfig.isZipOutput() != receivedKafkaOutput.isZipped) {
+                logger.error("********* ERROR received zipped: {}, exp zipped: {}", receivedKafkaOutput.isZipped,
+                        this.applicationConfig.isZipOutput());
+            }
             if (!receivedKafkaOutput.isZipped) {
                 return receivedKafkaOutput;
             }
@@ -238,8 +247,8 @@ class IntegrationWithKafka {
         this.applicationConfig.setZipOutput(false);
 
         if (kafkaReceiver == null) {
-            kafkaReceiver = new KafkaReceiver(this.applicationConfig, "ouputTopic", this.securityContext);
-            kafkaReceiver2 = new KafkaReceiver(this.applicationConfig, "ouputTopic2", this.securityContext);
+            kafkaReceiver = new KafkaReceiver(this.applicationConfig, "ouputTopic", this.securityContext, null);
+            kafkaReceiver2 = new KafkaReceiver(this.applicationConfig, "ouputTopic2", this.securityContext, null);
         }
         kafkaReceiver.reset();
         kafkaReceiver2.reset();
@@ -399,25 +408,62 @@ class IntegrationWithKafka {
         Thread.sleep(4000);
     }
 
-    private void printStatistics() {
+    private StatisticsCollection getStatistics() {
         String targetUri = baseUrl() + ProducerCallbacksController.STATISTICS_URL;
         String statsResp = restClient().get(targetUri).block();
         StatisticsCollection stats = gson.fromJson(statsResp, StatisticsCollection.class);
-        int noOfSentBytes = 0;
-        int noOfSentObjs = 0;
+        return stats;
+    }
+
+    @Builder
+    static class CharacteristicsResult {
+        long noOfFilesPerSecond;
+        long noOfSentBytes;
+        long noOfSentGigaBytes;
+        long noOfSentObjects;
+        long inputFileSize;
+        long noOfReceivedFiles;
+        long noOfReceivedBytes;
+        long noOfSubscribers;
+        long sizeOfSentObj;
+        boolean zipOutput;
+    }
+
+    private CharacteristicsResult getCharacteristicsResult(Instant startTime) {
+        final long durationMs = Instant.now().toEpochMilli() - startTime.toEpochMilli();
+        StatisticsCollection stats = getStatistics();
+        long noOfSentBytes = 0;
+        long noOfSentObjs = 0;
         for (Statistics s : stats.jobStatistics) {
             noOfSentBytes += s.getNoOfSentBytes();
             noOfSentObjs += s.getNoOfSentObjects();
         }
-        logger.error("  Stats noOfSentBytes (total): {}, noOfSentObjects (total): {}, noOfTopics: {}", noOfSentBytes,
-                noOfSentObjs, stats.jobStatistics.size());
+
+        Statistics oneJobsStats = stats.jobStatistics.iterator().next();
+
+        return CharacteristicsResult.builder() //
+                .noOfSentBytes(noOfSentBytes) //
+                .noOfSentObjects(noOfSentObjs) //
+                .noOfSentGigaBytes(noOfSentBytes / (1024 * 1024 * 1024)) //
+                .noOfSubscribers(stats.jobStatistics.size()) //
+                .zipOutput(this.applicationConfig.isZipOutput()) //
+                .noOfFilesPerSecond((oneJobsStats.getNoOfReceivedObjects() * 1000) / durationMs) //
+                .noOfReceivedBytes(oneJobsStats.getNoOfReceivedBytes()) //
+                .inputFileSize(oneJobsStats.getNoOfReceivedBytes() / oneJobsStats.getNoOfReceivedObjects()) //
+                .noOfReceivedFiles(oneJobsStats.getNoOfReceivedObjects()) //
+                .sizeOfSentObj(oneJobsStats.getNoOfSentBytes() / oneJobsStats.getNoOfSentObjects()) //
+                .build();
     }
 
     private void printCharacteristicsResult(String str, Instant startTime, int noOfIterations) {
         final long durationMs = Instant.now().toEpochMilli() - startTime.toEpochMilli();
         logger.error("*** {} Duration ({} ms),  objects/second: {}", str, durationMs,
                 (noOfIterations * 1000) / durationMs);
-        printStatistics();
+
+        System.out.println("--------------");
+        System.out.println(gson.toJson(getCharacteristicsResult(startTime)));
+        System.out.println("--------------");
+
     }
 
     @Test
@@ -473,6 +519,7 @@ class IntegrationWithKafka {
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
         waitForKafkaListener();
 
+        Instant startTime = Instant.now();
         String sendString = "testData " + Instant.now();
         String sendKey = "key " + Instant.now();
         var dataToSend = Flux.just(kafkaSenderRecord(sendString, sendKey, KAFKA_TYPE_ID));
@@ -481,7 +528,7 @@ class IntegrationWithKafka {
         await().untilAsserted(() -> assertThat(kafkaReceiver.lastValue()).isEqualTo(sendString));
         assertThat(kafkaReceiver.lastKey()).isEqualTo(sendKey);
 
-        printStatistics();
+        System.out.println(gson.toJson(getCharacteristicsResult(startTime)));
     }
 
     @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
@@ -568,30 +615,34 @@ class IntegrationWithKafka {
     void kafkaCharacteristics_manyPmJobs() throws Exception {
         // Filter PM reports and send them to many jobs over Kafka
 
+        this.applicationConfig.setZipOutput(false);
+
         // Register producer, Register types
         await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
         assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
-        filterData.getMeasTypes().add("pmCounterNumber0");
-        filterData.getMeasTypes().add("pmCounterNumber1");
+        final int NO_OF_COUNTERS = 5;
+        for (int i = 0; i < NO_OF_COUNTERS; ++i) {
+            filterData.getMeasTypes().add("pmCounterNumber" + i);
+        }
         filterData.getMeasObjClass().add("NRCellCU");
 
-        this.applicationConfig.setZipOutput(false);
-        final int NO_OF_JOBS = 100;
+        final int NO_OF_JOBS = 150;
+
         ArrayList<KafkaReceiver> receivers = new ArrayList<>();
         for (int i = 0; i < NO_OF_JOBS; ++i) {
             final String outputTopic = "manyJobs_" + i;
             this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData), outputTopic,
                     restClient());
-            KafkaReceiver receiver = new KafkaReceiver(this.applicationConfig, outputTopic, this.securityContext);
+            KafkaReceiver receiver = new KafkaReceiver(this.applicationConfig, outputTopic, this.securityContext, null);
             receivers.add(receiver);
         }
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(NO_OF_JOBS));
         waitForKafkaListener();
 
-        final int NO_OF_OBJECTS = 10000;
+        final int NO_OF_OBJECTS = 1000;
 
         Instant startTime = Instant.now();
 
@@ -625,6 +676,76 @@ class IntegrationWithKafka {
         }
     }
 
+    @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
+    @Test
+    void kafkaCharacteristics_onePmJobs_sharedTopic() throws Exception {
+        // Filter PM reports and send them to one job whose output topic is shared by many receivers
+
+        this.applicationConfig.setZipOutput(true);
+
+        // Register producer, Register types
+        await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+        assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+
+        PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
+        final int NO_OF_COUNTERS = 0;
+        for (int i = 0; i < NO_OF_COUNTERS; ++i) {
+            filterData.getMeasTypes().add("pmCounterNumber" + i);
+        }
+        filterData.getMeasObjClass().add("NRCellCU");
+
+        final String outputTopic = "kafkaCharacteristics_onePmJobs_manyReceivers";
+        this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData), outputTopic, restClient());
+
+        final int NO_OF_RECEIVERS = 150;
+        ArrayList<KafkaReceiver> receivers = new ArrayList<>();
+        for (int i = 0; i < NO_OF_RECEIVERS; ++i) {
+            KafkaReceiver receiver =
+                    new KafkaReceiver(this.applicationConfig, outputTopic, this.securityContext, "group_" + i);
+            receivers.add(receiver);
+        }
+
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+        waitForKafkaListener();
+
+        final int NO_OF_OBJECTS = 1000;
+
+        Instant startTime = Instant.now();
+
+        final String FILE_NAME = "A20000626.2315+0200-2330+0200_HTTPS-6-73.json.gz";
+
+        DataStore fileStore = dataStore();
+
+        fileStore.deleteBucket(DataStore.Bucket.FILES).block();
+        fileStore.create(DataStore.Bucket.FILES).block();
+        fileStore.copyFileTo(Path.of("./src/test/resources/" + FILE_NAME), FILE_NAME).block();
+
+        String eventAsString = newFileEvent(FILE_NAME);
+        var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> kafkaSenderRecord(eventAsString, "key", PM_TYPE_ID));
+        sendDataToKafka(dataToSend);
+
+        logger.info("sleeping {}", kafkaReceiver.count);
+        for (KafkaReceiver receiver : receivers) {
+            while (receiver.count < NO_OF_OBJECTS) {
+                if (kafkaReceiver.count > 0) {
+                    logger.info("sleeping {}", receiver.count);
+                }
+
+                Thread.sleep(1000 * 1);
+            }
+        }
+
+        printCharacteristicsResult("kafkaCharacteristics_onePmJobs_sharedTopic", startTime, NO_OF_OBJECTS);
+
+        for (KafkaReceiver receiver : receivers) {
+            if (receiver.count != NO_OF_OBJECTS) {
+                System.out.println("** Unexpected no of objects: " + receiver.OUTPUT_TOPIC + " " + receiver.count);
+            }
+        }
+
+        Thread.sleep(1000 * 5);
+    }
+
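
A side note on the wait loop: the sleep-and-poll above could equally be written with Awaitility, which this test class already uses for the job-registration waits; a sketch (same names as in the test, java.time.Duration assumed imported):

    for (KafkaReceiver receiver : receivers) {
        await().atMost(Duration.ofMinutes(5)).until(() -> receiver.count >= NO_OF_OBJECTS);
    }
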
     private String newFileEvent(String fileName) {
         NewFileEvent event = NewFileEvent.builder() //
                 .filename(fileName) //