NONRTRIC - dmaap adapter characteristic improvement 69/8769/2
author PatrikBuhr <patrik.buhr@est.tech>
Tue, 12 Jul 2022 11:40:36 +0000 (13:40 +0200)
committer PatrikBuhr <patrik.buhr@est.tech>
Tue, 12 Jul 2022 11:46:13 +0000 (13:46 +0200)
Minor changes. Added a test case.

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: Ib45f40763f3124c5e8c32d66e23a7b4a1252e428

pom.xml
src/main/java/org/oran/dmaapadapter/repository/filters/PmReportFilter.java
src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
src/test/resources/test_application_configuration.json

diff --git a/pom.xml b/pom.xml
index 1bfe947..8dc6998 100644 (file)
--- a/pom.xml
+++ b/pom.xml
         <exec.skip>true</exec.skip>
     </properties>
     <dependencies>
-        <dependency>
-            <groupId>org.springdoc</groupId>
-            <artifactId>springdoc-openapi-ui</artifactId>
-            <version>1.6.3</version>
-            <scope>test</scope>
-        </dependency>
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-web</artifactId>
         </dependency>
         <!-- TEST -->
         <!-- https://mvnrepository.com/artifact/com.github.erosb/everit-json-schema -->
+        <dependency>
+            <groupId>org.springdoc</groupId>
+            <artifactId>springdoc-openapi-ui</artifactId>
+            <version>1.6.3</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>com.github.erosb</groupId>
             <artifactId>everit-json-schema</artifactId>
diff --git a/src/main/java/org/oran/dmaapadapter/repository/filters/PmReportFilter.java b/src/main/java/org/oran/dmaapadapter/repository/filters/PmReportFilter.java
index b6d65ad..39c7a06 100644 (file)
--- a/src/main/java/org/oran/dmaapadapter/repository/filters/PmReportFilter.java
+++ b/src/main/java/org/oran/dmaapadapter/repository/filters/PmReportFilter.java
 
 package org.oran.dmaapadapter.repository.filters;
 
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 
 import lombok.Getter;
@@ -37,10 +37,10 @@ public class PmReportFilter implements Filter {
 
     @Getter
     public static class FilterData {
-        Collection<String> sourceNames = new ArrayList<>();
-        Collection<String> measObjInstIds = new ArrayList<>();
-        Collection<String> measTypes = new ArrayList<>();
-        Collection<String> measuredEntityDns = new ArrayList<>();
+        final Collection<String> sourceNames = new HashSet<>();
+        final Collection<String> measObjInstIds = new ArrayList<>();
+        final Collection<String> measTypes = new HashSet<>();
+        final Collection<String> measuredEntityDns = new ArrayList<>();
     }
 
     private static class MeasTypesIndexed extends PmReport.MeasTypes {
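
Replacing ArrayList with HashSet for sourceNames and measTypes turns each per-report membership check from a linear scan into a constant-time hash lookup; measObjInstIds stays a list, presumably because isContainedInAny (next hunk) does substring matching and must iterate regardless. A minimal standalone sketch of the lookup-cost difference — class name, sizes and timing harness are illustrative, not part of this commit:

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.HashSet;

    public class ContainsCost {
        static final int N = 10_000;

        public static void main(String[] args) {
            Collection<String> list = new ArrayList<>();
            Collection<String> set = new HashSet<>();
            for (int i = 0; i < N; i++) {
                String measType = "succImmediateAssignProcs_" + i;
                list.add(measType);
                set.add(measType);
            }
            time("ArrayList", list); // contains() walks the list: O(n) per lookup
            time("HashSet", set);    // contains() hashes the key: O(1) per lookup
        }

        static void time(String label, Collection<String> c) {
            long start = System.nanoTime();
            for (int i = 0; i < N; i++) {
                c.contains("succImmediateAssignProcs_" + i);
            }
            System.out.println(label + ": " + (System.nanoTime() - start) / 1_000_000 + " ms");
        }
    }
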
@@ -119,7 +119,7 @@ public class PmReportFilter implements Filter {
 
         PmReport.MeasValuesList newMeasValuesList = oldMeasValues.shallowClone();
 
-        if (isContainedInAny(oldMeasValues.measObjInstId, filter.measObjInstIds) || filter.measObjInstIds.isEmpty()) {
+        if (filter.measObjInstIds.isEmpty() || isContainedInAny(oldMeasValues.measObjInstId, filter.measObjInstIds)) {
             newMeasValuesList.measResults = createMeasResults(oldMeasValues.measResults, measTypes, filter);
         }
         return newMeasValuesList;
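
The reordered condition relies on Java's short-circuit ||: the O(1) isEmpty() guard now runs before the linear isContainedInAny scan, so a job with no measObjInstId filter skips the scan for every measurement value. A small sketch of the pattern — the isContainedInAny here is an illustrative stand-in assuming substring matching, not the commit's implementation:

    import java.util.Collection;
    import java.util.List;

    public class ShortCircuit {
        // Illustrative stand-in: true if the id contains any of the filter strings.
        static boolean isContainedInAny(String id, Collection<String> filters) {
            return filters.stream().anyMatch(id::contains);
        }

        public static void main(String[] args) {
            Collection<String> measObjInstIds = List.of(); // empty filter: match everything
            String id = "UtranCell=Gbg-997";
            // isEmpty() first: when no filter is configured, the scan never executes.
            boolean match = measObjInstIds.isEmpty() || isContainedInAny(id, measObjInstIds);
            System.out.println(match); // true
        }
    }
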
diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
index a345670..0fa7a8e 100644 (file)
--- a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
+++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
@@ -26,6 +26,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import com.google.gson.JsonParser;
 
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.HashMap;
@@ -50,6 +53,7 @@ import org.oran.dmaapadapter.repository.InfoType;
 import org.oran.dmaapadapter.repository.InfoTypes;
 import org.oran.dmaapadapter.repository.Job;
 import org.oran.dmaapadapter.repository.Jobs;
+import org.oran.dmaapadapter.repository.filters.PmReportFilter;
 import org.oran.dmaapadapter.tasks.KafkaTopicListener;
 import org.oran.dmaapadapter.tasks.TopicListener;
 import org.oran.dmaapadapter.tasks.TopicListeners;
@@ -80,6 +84,7 @@ import reactor.kafka.sender.SenderRecord;
 class IntegrationWithKafka {
 
     final String TYPE_ID = "KafkaInformationType";
+    final String PM_TYPE_ID = "PmInformationTypeKafka";
 
     @Autowired
     private ApplicationConfig applicationConfig;
@@ -152,11 +157,14 @@ class IntegrationWithKafka {
     }
 
     private static class KafkaReceiver {
-        public final static String OUTPUT_TOPIC = "outputTopic";
-        private TopicListener.Output receivedKafkaOutput = new TopicListener.Output("", "");
+        public final String OUTPUT_TOPIC;
+        private TopicListener.Output receivedKafkaOutput;
         private final Logger logger = LoggerFactory.getLogger(IntegrationWithKafka.class);
 
-        public KafkaReceiver(ApplicationConfig applicationConfig) {
+        volatile int count = 0; // volatile: written by the listener thread, polled by the test thread
+
+        public KafkaReceiver(ApplicationConfig applicationConfig, String outputTopic) {
+            this.OUTPUT_TOPIC = outputTopic;
 
             // Create a listener to the output topic. The KafkaTopicListener happens to be
             // suitable for that,
@@ -171,6 +179,7 @@ class IntegrationWithKafka {
 
         private void set(TopicListener.Output receivedKafkaOutput) {
             this.receivedKafkaOutput = receivedKafkaOutput;
+            this.count++;
             logger.debug("*** received {}, {}", OUTPUT_TOPIC, receivedKafkaOutput);
         }
 
@@ -181,15 +190,24 @@ class IntegrationWithKafka {
         synchronized String lastValue() {
             return this.receivedKafkaOutput.value;
         }
+
+        void reset() {
+            count = 0;
+            this.receivedKafkaOutput = new TopicListener.Output("", "");
+        }
     }
 
     private static KafkaReceiver kafkaReceiver;
+    private static KafkaReceiver kafkaReceiver2;
 
     @BeforeEach
     void init() {
         if (kafkaReceiver == null) {
-            kafkaReceiver = new KafkaReceiver(this.applicationConfig);
+            kafkaReceiver = new KafkaReceiver(this.applicationConfig, "outputTopic");
+            kafkaReceiver2 = new KafkaReceiver(this.applicationConfig, "outputTopic2");
         }
+        kafkaReceiver.reset();
+        kafkaReceiver2.reset();
     }
 
     @AfterEach
@@ -259,6 +277,18 @@ class IntegrationWithKafka {
         }
     }
 
+    ConsumerJobInfo consumerJobInfoKafka(String topic, PmReportFilter.FilterData filterData) {
+        try {
+            Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, null, 1, topic);
+            String str = gson.toJson(param);
+            Object parametersObj = jsonObject(str);
+
+            return new ConsumerJobInfo(PM_TYPE_ID, parametersObj, "owner", null, "");
+        } catch (Exception e) {
+            return null;
+        }
+    }
+
     ConsumerJobInfo consumerJobInfoKafka(String topic) {
         try {
             Job.Parameters param = new Job.Parameters(null, null, null, 1, topic);
@@ -283,12 +313,8 @@ class IntegrationWithKafka {
         return SenderOptions.create(props);
     }
 
-    private SenderRecord<String, String, Integer> senderRecord(String data) {
-        return senderRecord(data, "");
-    }
-
-    private SenderRecord<String, String, Integer> senderRecord(String data, String key) {
-        final InfoType infoType = this.types.get(TYPE_ID);
+    private SenderRecord<String, String, Integer> senderRecord(String data, String key, String typeId) {
+        final InfoType infoType = this.types.get(typeId);
         int correlationMetadata = 2;
         return SenderRecord.create(new ProducerRecord<>(infoType.getKafkaInputTopic(), key, data), correlationMetadata);
     }
@@ -338,7 +364,7 @@ class IntegrationWithKafka {
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
         waitForKafkaListener();
 
-        var dataToSend = Flux.just(senderRecord("Message", ""));
+        var dataToSend = Flux.just(senderRecord("Message", "", TYPE_ID));
         sendDataToStream(dataToSend);
 
         verifiedReceivedByConsumer("Message");
@@ -360,7 +386,9 @@ class IntegrationWithKafka {
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
         waitForKafkaListener();
 
-        var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc.
+        var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_" + i, "", TYPE_ID)); // Message_1,
+                                                                                               // Message_2
+                                                                                               // etc.
         sendDataToStream(dataToSend);
 
         verifiedReceivedByConsumer("Message_1", "[\"Message_1\", \"Message_2\", \"Message_3\"]");
@@ -374,13 +402,13 @@ class IntegrationWithKafka {
         await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
         assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
-        this.icsSimulatorController.addJob(consumerJobInfoKafka(KafkaReceiver.OUTPUT_TOPIC), JOB_ID, restClient());
+        this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver.OUTPUT_TOPIC), JOB_ID, restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
         waitForKafkaListener();
 
         String sendString = "testData " + Instant.now();
         String sendKey = "key " + Instant.now();
-        var dataToSend = Flux.just(senderRecord(sendString, sendKey));
+        var dataToSend = Flux.just(senderRecord(sendString, sendKey, TYPE_ID));
         sendDataToStream(dataToSend);
 
         await().untilAsserted(() -> assertThat(kafkaReceiver.lastValue()).isEqualTo(sendString));
@@ -396,7 +424,7 @@ class IntegrationWithKafka {
         await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
         assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
-        this.icsSimulatorController.addJob(consumerJobInfoKafka(KafkaReceiver.OUTPUT_TOPIC), JOB_ID, restClient());
+        this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver.OUTPUT_TOPIC), JOB_ID, restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
         waitForKafkaListener();
 
@@ -404,7 +432,8 @@ class IntegrationWithKafka {
 
         Instant startTime = Instant.now();
 
-        var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> senderRecord("Message_" + i)); // Message_1, etc.
+        var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> senderRecord("Message_" + i, "", TYPE_ID)); // Message_1,
+                                                                                                           // etc.
         sendDataToStream(dataToSend);
 
         while (!kafkaReceiver.lastValue().equals("Message_" + NO_OF_OBJECTS)) {
@@ -416,6 +445,50 @@
         logger.info("*** Duration :" + durationSeconds + ", objects/second: " + NO_OF_OBJECTS / durationSeconds);
     }
 
+    @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
+    @Test
+    void kafkaCharacteristics_pmFilter() throws Exception {
+        // Filter PM reports and send to two jobs over Kafka
+
+        final String JOB_ID = "kafkaCharacteristics";
+        final String JOB_ID2 = "kafkaCharacteristics2";
+
+        // Register producer, Register types
+        await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+        assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+
+        PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
+        filterData.getMeasTypes().add("succImmediateAssignProcs");
+        filterData.getMeasObjInstIds().add("UtranCell");
+
+        this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver.OUTPUT_TOPIC, filterData), JOB_ID,
+                restClient());
+        this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver2.OUTPUT_TOPIC, filterData), JOB_ID2,
+                restClient());
+
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
+        waitForKafkaListener();
+
+        final int NO_OF_OBJECTS = 100000;
+
+        Instant startTime = Instant.now();
+
+        String path = "./src/test/resources/pm_report.json";
+        String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset());
+
+        var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> senderRecord(pmReportJson, "", PM_TYPE_ID));
+        sendDataToStream(dataToSend);
+
+        while (kafkaReceiver.count != NO_OF_OBJECTS) {
+            logger.info("sleeping {}", kafkaReceiver.count);
+            Thread.sleep(1000 * 1);
+        }
+
+        final long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond();
+        logger.info("*** Duration :" + durationSeconds + ", objects/second: " + NO_OF_OBJECTS / durationSeconds);
+        logger.info("***  kafkaReceiver2 :" + kafkaReceiver.count);
+    }
+
     @Test
     void kafkaDeleteJobShouldNotStopListener() throws Exception {
         final String JOB_ID1 = "ID1";
@@ -432,7 +505,9 @@
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
 
-        var dataToSend = Flux.range(1, 100).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc.
+        var dataToSend = Flux.range(1, 100).map(i -> senderRecord("Message_" + i, "", TYPE_ID)); // Message_1,
+                                                                                                 // Message_2
+                                                                                                 // etc.
         sendDataToStream(dataToSend); // this should not overflow
 
         // Delete jobs, recreate one
@@ -442,7 +517,7 @@
         this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID2, restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
 
-        dataToSend = Flux.just(senderRecord("Howdy"));
+        dataToSend = Flux.just(senderRecord("Howdy", "", TYPE_ID));
         sendDataToStream(dataToSend);
 
         verifiedReceivedByConsumerLast("Howdy");
diff --git a/src/test/resources/test_application_configuration.json b/src/test/resources/test_application_configuration.json
index df1ccd5..f64ab12 100644 (file)
--- a/src/test/resources/test_application_configuration.json
+++ b/src/test/resources/test_application_configuration.json
@@ -8,7 +8,7 @@
       },
       {
          "id": "KafkaInformationType",
-         "kafkaInputTopic": "TutorialTopic",
+         "kafkaInputTopic": "KafkaInput",
          "useHttpProxy": false
       },
       {
       },
       {
          "id": "PmInformationTypeKafka",
-         "kafkaInputTopic": "TutorialTopic",
+         "kafkaInputTopic": "KafkaPmInput",
          "useHttpProxy": false,
          "dataType": "PmData",
          "isJson": true
       }
    ]
-}
\ No newline at end of file
+}