NONRTRIC - optimization of PM filtering 46/8846/2
authorPatrikBuhr <patrik.buhr@est.tech>
Tue, 9 Aug 2022 11:21:26 +0000 (13:21 +0200)
committerPatrikBuhr <patrik.buhr@est.tech>
Thu, 11 Aug 2022 07:28:39 +0000 (09:28 +0200)
If there are many PM job, the PM parsing is done once instead of onced per job.

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: I64dc8332f3efd1bd3a284f6896be7bd9a0dd9bf7

16 files changed:
pom.xml
src/main/java/org/oran/dmaapadapter/filter/Filter.java
src/main/java/org/oran/dmaapadapter/filter/JsltFilter.java
src/main/java/org/oran/dmaapadapter/filter/JsonPathFilter.java
src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java
src/main/java/org/oran/dmaapadapter/filter/RegexpFilter.java
src/main/java/org/oran/dmaapadapter/repository/Job.java
src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java
src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java
src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java
src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java
src/test/java/org/oran/dmaapadapter/ApplicationTest.java
src/test/java/org/oran/dmaapadapter/filter/JsltFilterTest.java
src/test/java/org/oran/dmaapadapter/filter/JsonPathFilterTest.java
src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java

diff --git a/pom.xml b/pom.xml
index 8dc6998..91bbd41 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -26,7 +26,7 @@
     <parent>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-parent</artifactId>
-        <version>2.5.8</version>
+        <version>2.6.6</version>
         <relativePath />
     </parent>
     <groupId>org.o-ran-sc.nonrtric.plt</groupId>
         <system>JIRA</system>
         <url>https://jira.o-ran-sc.org/</url>
     </issueManagement>
-</project>
\ No newline at end of file
+</project>
index 2e06069..afb424c 100644 (file)
 
 package org.oran.dmaapadapter.filter;
 
+import lombok.ToString;
+
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
+
 public interface Filter {
 
     public enum Type {
         REGEXP, JSLT, JSON_PATH, PM_DATA, NONE
     }
 
-    public String filter(String data);
+    @ToString
+    public static class FilteredData {
+        public final String key;
+        public final String value;
+
+        public FilteredData(String key, String value) {
+            this.key = key;
+            this.value = value;
+        }
+
+        public static FilteredData empty() {
+            return new FilteredData("", "");
+        }
+    }
+
+    public FilteredData filter(DataFromTopic data);
 
 }
index df7eb44..9336617 100644 (file)
@@ -28,6 +28,7 @@ import com.fasterxml.jackson.databind.node.NullNode;
 import com.schibsted.spt.data.jslt.Expression;
 import com.schibsted.spt.data.jslt.Parser;
 
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,22 +47,22 @@ class JsltFilter implements Filter {
     }
 
     @Override
-    public String filter(String jsonString) {
+    public FilteredData filter(DataFromTopic data) {
         if (expression == null) {
-            return jsonString;
+            return new FilteredData(data.key, data.value);
         }
         try {
             JsonFactory factory = mapper.getFactory();
-            JsonParser parser = factory.createParser(jsonString);
+            JsonParser parser = factory.createParser(data.value);
             JsonNode actualObj = mapper.readTree(parser);
 
             JsonNode filteredNode = expression.apply(actualObj);
             if (filteredNode == NullNode.instance) {
-                return "";
+                return FilteredData.empty();
             }
-            return mapper.writeValueAsString(filteredNode);
+            return new FilteredData(data.key, mapper.writeValueAsString(filteredNode));
         } catch (Exception e) {
-            return "";
+            return FilteredData.empty();
         }
     }
 
index c931e27..36a2103 100644 (file)
@@ -22,6 +22,7 @@ package org.oran.dmaapadapter.filter;
 
 import com.jayway.jsonpath.JsonPath;
 
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,12 +41,12 @@ class JsonPathFilter implements Filter {
     }
 
     @Override
-    public String filter(String jsonString) {
+    public FilteredData filter(DataFromTopic data) {
         try {
-            Object o = JsonPath.parse(jsonString).read(this.expression, Object.class);
-            return o == null ? "" : gson.toJson(o);
+            Object o = JsonPath.parse(data.value).read(this.expression, Object.class);
+            return o == null ? FilteredData.empty() : new FilteredData(data.key, gson.toJson(o));
         } catch (Exception e) {
-            return "";
+            return FilteredData.empty();
         }
 
     }
index bac1369..3740eef 100644 (file)
@@ -29,6 +29,7 @@ import java.util.Map;
 
 import lombok.Getter;
 
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.thymeleaf.util.StringUtils;
@@ -79,21 +80,31 @@ public class PmReportFilter implements Filter {
     }
 
     @Override
-    public String filter(String data) {
+    public FilteredData filter(DataFromTopic data) {
         try {
-            PmReport report = gsonParse.fromJson(data, PmReport.class);
+            PmReport report = createPmReport(data);
             if (report.event.perf3gppFields == null) {
                 logger.warn("Received PM report with no perf3gppFields, ignored. {}", data);
-                return "";
+                return FilteredData.empty();
             }
 
             if (!filter(report, this.filterData)) {
-                return "";
+                return FilteredData.empty();
             }
-            return gson.toJson(report);
+            return new FilteredData(data.key, gson.toJson(report));
         } catch (Exception e) {
             logger.warn("Could not parse PM data. {}, reason: {}", data, e.getMessage());
-            return "";
+            return FilteredData.empty();
+        }
+    }
+
+    @SuppressWarnings("java:S2445") // "data" is a method parameter, and should not be used for synchronization.
+    private PmReport createPmReport(DataFromTopic data) {
+        synchronized (data) {
+            if (data.getCachedPmReport() == null) {
+                data.setCachedPmReport(gsonParse.fromJson(data.value, PmReport.class));
+            }
+            return data.getCachedPmReport();
         }
     }
 
index a2842ed..b1000ed 100644 (file)
@@ -23,6 +23,7 @@ package org.oran.dmaapadapter.filter;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,16 +40,16 @@ class RegexpFilter implements Filter {
     }
 
     @Override
-    public String filter(String data) {
+    public FilteredData filter(DataFromTopic data) {
         if (regexp == null) {
-            return data;
+            return new FilteredData(data.key, data.value);
         }
-        Matcher matcher = regexp.matcher(data);
+        Matcher matcher = regexp.matcher(data.value);
         boolean match = matcher.find();
         if (match) {
-            return data;
+            return new FilteredData(data.key, data.value);
         } else {
-            return "";
+            return FilteredData.empty();
         }
     }
 
index 7201dad..9581eb4 100644 (file)
@@ -30,6 +30,7 @@ import lombok.ToString;
 import org.oran.dmaapadapter.clients.AsyncRestClient;
 import org.oran.dmaapadapter.filter.Filter;
 import org.oran.dmaapadapter.filter.FilterFactory;
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -142,10 +143,10 @@ public class Job {
         this.consumerRestClient = consumerRestClient;
     }
 
-    public String filter(String data) {
+    public Filter.FilteredData filter(DataFromTopic data) {
         if (filter == null) {
             logger.debug("No filter used");
-            return data;
+            return new Filter.FilteredData(data.key, data.value);
         }
         return filter.filter(data);
     }
index 69cc6ea..9ba5131 100644 (file)
@@ -20,6 +20,7 @@
 
 package org.oran.dmaapadapter.tasks;
 
+import org.oran.dmaapadapter.filter.Filter;
 import org.oran.dmaapadapter.repository.Job;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,7 +40,7 @@ public class HttpJobDataDistributor extends JobDataDistributor {
     }
 
     @Override
-    protected Mono<String> sendToClient(DataToConsumer output) {
+    protected Mono<String> sendToClient(Filter.FilteredData output) {
         Job job = this.getJob();
         logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), output);
         MediaType contentType = job.isBuffered() || job.getType().isJson() ? MediaType.APPLICATION_JSON : null;
index 60e271e..54b8831 100644 (file)
@@ -21,8 +21,8 @@
 package org.oran.dmaapadapter.tasks;
 
 import lombok.Getter;
-import lombok.ToString;
 
+import org.oran.dmaapadapter.filter.Filter;
 import org.oran.dmaapadapter.repository.Job;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,17 +44,6 @@ public abstract class JobDataDistributor {
     private Disposable subscription;
     private final ErrorStats errorStats = new ErrorStats();
 
-    @ToString
-    public static class DataToConsumer {
-        public final String key;
-        public final String value;
-
-        public DataToConsumer(String key, String value) {
-            this.key = key;
-            this.value = value;
-        }
-    }
-
     private class ErrorStats {
         private int consumerFaultCounter = 0;
         private boolean irrecoverableError = false; // eg. overflow
@@ -88,7 +77,7 @@ public abstract class JobDataDistributor {
     public synchronized void start(Flux<TopicListener.DataFromTopic> input) {
         stop();
         this.errorStats.resetIrrecoverableErrors();
-        this.subscription = handleReceivedMessage(input, job) //
+        this.subscription = filterAndBuffer(input, job) //
                 .flatMap(this::sendToClient, job.getParameters().getMaxConcurrency()) //
                 .onErrorResume(this::handleError) //
                 .subscribe(this::handleSentOk, //
@@ -101,7 +90,7 @@ public abstract class JobDataDistributor {
         stop();
     }
 
-    protected abstract Mono<String> sendToClient(DataToConsumer output);
+    protected abstract Mono<String> sendToClient(Filter.FilteredData output);
 
     public synchronized void stop() {
         if (this.subscription != null) {
@@ -114,18 +103,17 @@ public abstract class JobDataDistributor {
         return this.subscription != null;
     }
 
-    private Flux<DataToConsumer> handleReceivedMessage(Flux<TopicListener.DataFromTopic> inputFlux, Job job) {
-        Flux<DataToConsumer> result = inputFlux.map(input -> new DataToConsumer(input.key, job.filter(input.value))) //
-                .filter(t -> !t.value.isEmpty()); //
+    private Flux<Filter.FilteredData> filterAndBuffer(Flux<TopicListener.DataFromTopic> inputFlux, Job job) {
+        Flux<Filter.FilteredData> filtered = inputFlux.map(job::filter); //
 
         if (job.isBuffered()) {
-            result = result.map(input -> quoteNonJson(input.value, job)) //
+            filtered = filtered.map(input -> quoteNonJson(input.value, job)) //
                     .bufferTimeout( //
                             job.getParameters().getBufferTimeout().getMaxSize(), //
                             job.getParameters().getBufferTimeout().getMaxTime()) //
-                    .map(buffered -> new DataToConsumer("", buffered.toString()));
+                    .map(buffered -> new Filter.FilteredData("", buffered.toString()));
         }
-        return result;
+        return filtered;
     }
 
     private String quoteNonJson(String str, Job job) {
index 8740c94..8f77381 100644 (file)
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
+import org.oran.dmaapadapter.filter.Filter;
 import org.oran.dmaapadapter.repository.Job;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,7 +55,7 @@ public class KafkaJobDataDistributor extends JobDataDistributor {
     }
 
     @Override
-    protected Mono<String> sendToClient(DataToConsumer data) {
+    protected Mono<String> sendToClient(Filter.FilteredData data) {
         Job job = this.getJob();
 
         logger.debug("Sending data '{}' to Kafka topic: {}", data, this.getJob().getParameters().getKafkaOutputTopic());
@@ -93,7 +94,7 @@ public class KafkaJobDataDistributor extends JobDataDistributor {
         return SenderOptions.create(props);
     }
 
-    private SenderRecord<String, String, Integer> senderRecord(DataToConsumer output, Job infoJob) {
+    private SenderRecord<String, String, Integer> senderRecord(Filter.FilteredData output, Job infoJob) {
         int correlationMetadata = 2;
         String topic = infoJob.getParameters().getKafkaOutputTopic();
         return SenderRecord.create(new ProducerRecord<>(topic, output.key, output.value), correlationMetadata);
index 3c6399f..62bf942 100644 (file)
@@ -67,9 +67,10 @@ public class KafkaTopicListener implements TopicListener {
                         input.value())) //
                 .doOnError(t -> logger.error("KafkaTopicReceiver error: {}", t.getMessage())) //
                 .doFinally(sig -> logger.error("KafkaTopicReceiver stopped, reason: {}", sig)) //
+                .filter(t -> !t.value().isEmpty() || !t.key().isEmpty()) //
+                .map(input -> new DataFromTopic(input.key(), input.value())) //
                 .publish() //
-                .autoConnect() //
-                .map(input -> new DataFromTopic(input.key(), input.value())); //
+                .autoConnect();
     }
 
     private ReceiverOptions<String, String> kafkaInputProperties() {
index cb7c3de..e26915b 100644 (file)
 
 package org.oran.dmaapadapter.tasks;
 
+import lombok.Getter;
+import lombok.Setter;
 import lombok.ToString;
+
+import org.oran.dmaapadapter.filter.PmReport;
 import reactor.core.publisher.Flux;
 
 public interface TopicListener {
@@ -30,6 +34,10 @@ public interface TopicListener {
         public final String key;
         public final String value;
 
+        @Getter
+        @Setter
+        private PmReport cachedPmReport;
+
         public DataFromTopic(String key, String value) {
             this.key = key;
             this.value = value;
index b351208..014de25 100644 (file)
@@ -414,7 +414,6 @@ class ApplicationTest {
 
         PmReportArray reportsParsed = gson.fromJson(receivedFiltered, PmReportArray.class);
         assertThat(reportsParsed).hasSize(1);
-
     }
 
     @Test
index ea6943d..a18034e 100644 (file)
@@ -27,16 +27,21 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 
 import org.junit.jupiter.api.Test;
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
 
 class JsltFilterTest {
 
+    private String filterReport(JsltFilter filter) throws Exception {
+        return filter.filter(new DataFromTopic("", loadReport())).value;
+    }
+
     @Test
     void testPickOneValue() throws Exception {
         String expresssion = "if(.event.commonEventHeader.sourceName == 'O-DU-1122')" //
                 + ".event.perf3gppFields.measDataCollection.measInfoList[0].measValuesList[0].measResults[0].sValue";
 
         JsltFilter filter = new JsltFilter(reQuote(expresssion));
-        String res = filter.filter(loadReport());
+        String res = filterReport(filter);
         assertThat(res).isEqualTo(reQuote("'813'"));
     }
 
@@ -46,7 +51,7 @@ class JsltFilterTest {
                 + ".";
 
         JsltFilter filter = new JsltFilter(reQuote(expresssion));
-        String res = filter.filter(loadReport());
+        String res = filterReport(filter);
         assertThat(res).contains("event");
     }
 
@@ -55,7 +60,7 @@ class JsltFilterTest {
         String expresssion = "if(.event.commonEventHeader.sourceName == 'JUNK')" //
                 + ".";
         JsltFilter filter = new JsltFilter(reQuote(expresssion));
-        String res = filter.filter(loadReport());
+        String res = filterReport(filter);
         assertThat(res).isEmpty();
     }
 
@@ -71,7 +76,7 @@ class JsltFilterTest {
                         "}"; //
 
         JsltFilter filter = new JsltFilter(reQuote(expresssion));
-        String res = filter.filter(loadReport());
+        String res = filterReport(filter);
         String expected =
                 "{'array':['RncFunction=RF-1,UtranCell=Gbg-997','RncFunction=RF-1,UtranCell=Gbg-998','RncFunction=RF-1,UtranCell=Gbg-999'],'size':3}";
 
index d5e59b4..ce00c4d 100644 (file)
@@ -27,6 +27,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 
 import org.junit.jupiter.api.Test;
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
 
 class JsonPathFilterTest {
 
@@ -34,7 +35,7 @@ class JsonPathFilterTest {
     void testJsonPath() throws Exception {
         String exp = ("$.event.perf3gppFields.measDataCollection.measInfoList[0].measTypes.sMeasTypesList[0]");
         JsonPathFilter filter = new JsonPathFilter(exp);
-        String res = filter.filter(loadReport());
+        String res = filter.filter(new DataFromTopic("", loadReport())).value;
         assertThat(res).isEqualTo("\"attTCHSeizures\"");
     }
 
index 23447f5..cbdf9e7 100644 (file)
@@ -27,19 +27,22 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 
 import org.junit.jupiter.api.Test;
+import org.oran.dmaapadapter.tasks.TopicListener;
 
 class PmReportFilterTest {
 
+    private String filterReport(PmReportFilter filter) throws Exception {
+        return filter.filter(new TopicListener.DataFromTopic("", loadReport())).value;
+    }
+
     @Test
     void testPmFilterMeasTypes() throws Exception {
 
-        String reportJson = loadReport();
-
         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
         filterData.measTypes.add("succImmediateAssignProcs");
 
         PmReportFilter filter = new PmReportFilter(filterData);
-        String filtered = filter.filter(reportJson);
+        String filtered = filterReport(filter);
 
         assertThat(filtered).contains("succImmediateAssignProcs").doesNotContain("\"p\":2").contains("\"p\":1")
                 .contains("Gbg-997");
@@ -48,7 +51,7 @@ class PmReportFilterTest {
         filterData = new PmReportFilter.FilterData();
         filterData.measTypes.add("junk");
         filter = new PmReportFilter(filterData);
-        filtered = filter.filter(reportJson);
+        filtered = filterReport(filter);
         assertThat(filtered).isEmpty();
     }
 
@@ -57,13 +60,13 @@ class PmReportFilterTest {
         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
         filterData.measObjInstIds.add("junk");
         PmReportFilter filter = new PmReportFilter(filterData);
-        String filtered = filter.filter(loadReport());
+        String filtered = filterReport(filter);
         assertThat(filtered).isEmpty();
 
         filterData = new PmReportFilter.FilterData();
         filterData.measObjInstIds.add("UtranCell=Gbg-997");
         filter = new PmReportFilter(filterData);
-        filtered = filter.filter(loadReport());
+        filtered = filterReport(filter);
         assertThat(filtered).contains("Gbg-997").doesNotContain("Gbg-998");
     }
 
@@ -72,13 +75,13 @@ class PmReportFilterTest {
         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
         filterData.measObjClass.add("junk");
         PmReportFilter filter = new PmReportFilter(filterData);
-        String filtered = filter.filter(loadReport());
+        String filtered = filterReport(filter);
         assertThat(filtered).isEmpty();
 
         filterData = new PmReportFilter.FilterData();
         filterData.measObjClass.add("ENodeBFunction");
         filter = new PmReportFilter(filterData);
-        filtered = filter.filter(loadReport());
+        filtered = filterReport(filter);
         assertThat(filtered).contains("ENodeBFunction").doesNotContain("UtranCell");
     }
 
@@ -87,13 +90,13 @@ class PmReportFilterTest {
         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
         filterData.sourceNames.add("junk");
         PmReportFilter filter = new PmReportFilter(filterData);
-        String filtered = filter.filter(loadReport());
+        String filtered = filterReport(filter);
         assertThat(filtered).isEmpty();
 
         filterData = new PmReportFilter.FilterData();
         filterData.sourceNames.add("O-DU-1122");
         filter = new PmReportFilter(filterData);
-        filtered = filter.filter(loadReport());
+        filtered = filterReport(filter);
         assertThat(filtered).contains("O-DU-1122");
     }
 
@@ -102,13 +105,13 @@ class PmReportFilterTest {
         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
         filterData.measuredEntityDns.add("junk");
         PmReportFilter filter = new PmReportFilter(filterData);
-        String filtered = filter.filter(loadReport());
+        String filtered = filterReport(filter);
         assertThat(filtered).isEmpty();
 
         filterData = new PmReportFilter.FilterData();
         filterData.measuredEntityDns.add("ManagedElement=RNC-Gbg-1");
         filter = new PmReportFilter(filterData);
-        filtered = filter.filter(loadReport());
+        filtered = filterReport(filter);
         assertThat(filtered).contains("ManagedElement=RNC-Gbg-1");
     }
 
@@ -117,10 +120,10 @@ class PmReportFilterTest {
         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
         PmReportFilter filter = new PmReportFilter(filterData);
 
-        String filtered = filter.filter("junk");
+        String filtered = filter.filter(new TopicListener.DataFromTopic("", "junk")).value;
         assertThat(filtered).isEmpty();
 
-        filtered = filter.filter(reQuote("{'msg': 'test'}"));
+        filtered = filter.filter(new TopicListener.DataFromTopic("", reQuote("{'msg': 'test'}"))).value;
         assertThat(filtered).isEmpty();
 
     }