From: PatrikBuhr Date: Tue, 9 Aug 2022 11:21:26 +0000 (+0200) Subject: NONRTRIC - optimization of PM filtering X-Git-Tag: 1.2.0~22 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F46%2F8846%2F2;p=nonrtric%2Fplt%2Fdmaapadapter.git NONRTRIC - optimization of PM filtering If there are many PM job, the PM parsing is done once instead of onced per job. Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-773 Change-Id: I64dc8332f3efd1bd3a284f6896be7bd9a0dd9bf7 --- diff --git a/pom.xml b/pom.xml index 8dc6998..91bbd41 100644 --- a/pom.xml +++ b/pom.xml @@ -26,7 +26,7 @@ org.springframework.boot spring-boot-starter-parent - 2.5.8 + 2.6.6 org.o-ran-sc.nonrtric.plt @@ -373,4 +373,4 @@ JIRA https://jira.o-ran-sc.org/ - \ No newline at end of file + diff --git a/src/main/java/org/oran/dmaapadapter/filter/Filter.java b/src/main/java/org/oran/dmaapadapter/filter/Filter.java index 2e06069..afb424c 100644 --- a/src/main/java/org/oran/dmaapadapter/filter/Filter.java +++ b/src/main/java/org/oran/dmaapadapter/filter/Filter.java @@ -20,12 +20,31 @@ package org.oran.dmaapadapter.filter; +import lombok.ToString; + +import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic; + public interface Filter { public enum Type { REGEXP, JSLT, JSON_PATH, PM_DATA, NONE } - public String filter(String data); + @ToString + public static class FilteredData { + public final String key; + public final String value; + + public FilteredData(String key, String value) { + this.key = key; + this.value = value; + } + + public static FilteredData empty() { + return new FilteredData("", ""); + } + } + + public FilteredData filter(DataFromTopic data); } diff --git a/src/main/java/org/oran/dmaapadapter/filter/JsltFilter.java b/src/main/java/org/oran/dmaapadapter/filter/JsltFilter.java index df7eb44..9336617 100644 --- a/src/main/java/org/oran/dmaapadapter/filter/JsltFilter.java +++ b/src/main/java/org/oran/dmaapadapter/filter/JsltFilter.java @@ -28,6 +28,7 @@ import com.fasterxml.jackson.databind.node.NullNode; import com.schibsted.spt.data.jslt.Expression; import com.schibsted.spt.data.jslt.Parser; +import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,22 +47,22 @@ class JsltFilter implements Filter { } @Override - public String filter(String jsonString) { + public FilteredData filter(DataFromTopic data) { if (expression == null) { - return jsonString; + return new FilteredData(data.key, data.value); } try { JsonFactory factory = mapper.getFactory(); - JsonParser parser = factory.createParser(jsonString); + JsonParser parser = factory.createParser(data.value); JsonNode actualObj = mapper.readTree(parser); JsonNode filteredNode = expression.apply(actualObj); if (filteredNode == NullNode.instance) { - return ""; + return FilteredData.empty(); } - return mapper.writeValueAsString(filteredNode); + return new FilteredData(data.key, mapper.writeValueAsString(filteredNode)); } catch (Exception e) { - return ""; + return FilteredData.empty(); } } diff --git a/src/main/java/org/oran/dmaapadapter/filter/JsonPathFilter.java b/src/main/java/org/oran/dmaapadapter/filter/JsonPathFilter.java index c931e27..36a2103 100644 --- a/src/main/java/org/oran/dmaapadapter/filter/JsonPathFilter.java +++ b/src/main/java/org/oran/dmaapadapter/filter/JsonPathFilter.java @@ -22,6 +22,7 @@ package org.oran.dmaapadapter.filter; import com.jayway.jsonpath.JsonPath; +import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,12 +41,12 @@ class JsonPathFilter implements Filter { } @Override - public String filter(String jsonString) { + public FilteredData filter(DataFromTopic data) { try { - Object o = JsonPath.parse(jsonString).read(this.expression, Object.class); - return o == null ? "" : gson.toJson(o); + Object o = JsonPath.parse(data.value).read(this.expression, Object.class); + return o == null ? FilteredData.empty() : new FilteredData(data.key, gson.toJson(o)); } catch (Exception e) { - return ""; + return FilteredData.empty(); } } diff --git a/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java b/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java index bac1369..3740eef 100644 --- a/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java +++ b/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java @@ -29,6 +29,7 @@ import java.util.Map; import lombok.Getter; +import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.thymeleaf.util.StringUtils; @@ -79,21 +80,31 @@ public class PmReportFilter implements Filter { } @Override - public String filter(String data) { + public FilteredData filter(DataFromTopic data) { try { - PmReport report = gsonParse.fromJson(data, PmReport.class); + PmReport report = createPmReport(data); if (report.event.perf3gppFields == null) { logger.warn("Received PM report with no perf3gppFields, ignored. {}", data); - return ""; + return FilteredData.empty(); } if (!filter(report, this.filterData)) { - return ""; + return FilteredData.empty(); } - return gson.toJson(report); + return new FilteredData(data.key, gson.toJson(report)); } catch (Exception e) { logger.warn("Could not parse PM data. {}, reason: {}", data, e.getMessage()); - return ""; + return FilteredData.empty(); + } + } + + @SuppressWarnings("java:S2445") // "data" is a method parameter, and should not be used for synchronization. + private PmReport createPmReport(DataFromTopic data) { + synchronized (data) { + if (data.getCachedPmReport() == null) { + data.setCachedPmReport(gsonParse.fromJson(data.value, PmReport.class)); + } + return data.getCachedPmReport(); } } diff --git a/src/main/java/org/oran/dmaapadapter/filter/RegexpFilter.java b/src/main/java/org/oran/dmaapadapter/filter/RegexpFilter.java index a2842ed..b1000ed 100644 --- a/src/main/java/org/oran/dmaapadapter/filter/RegexpFilter.java +++ b/src/main/java/org/oran/dmaapadapter/filter/RegexpFilter.java @@ -23,6 +23,7 @@ package org.oran.dmaapadapter.filter; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,16 +40,16 @@ class RegexpFilter implements Filter { } @Override - public String filter(String data) { + public FilteredData filter(DataFromTopic data) { if (regexp == null) { - return data; + return new FilteredData(data.key, data.value); } - Matcher matcher = regexp.matcher(data); + Matcher matcher = regexp.matcher(data.value); boolean match = matcher.find(); if (match) { - return data; + return new FilteredData(data.key, data.value); } else { - return ""; + return FilteredData.empty(); } } diff --git a/src/main/java/org/oran/dmaapadapter/repository/Job.java b/src/main/java/org/oran/dmaapadapter/repository/Job.java index 7201dad..9581eb4 100644 --- a/src/main/java/org/oran/dmaapadapter/repository/Job.java +++ b/src/main/java/org/oran/dmaapadapter/repository/Job.java @@ -30,6 +30,7 @@ import lombok.ToString; import org.oran.dmaapadapter.clients.AsyncRestClient; import org.oran.dmaapadapter.filter.Filter; import org.oran.dmaapadapter.filter.FilterFactory; +import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -142,10 +143,10 @@ public class Job { this.consumerRestClient = consumerRestClient; } - public String filter(String data) { + public Filter.FilteredData filter(DataFromTopic data) { if (filter == null) { logger.debug("No filter used"); - return data; + return new Filter.FilteredData(data.key, data.value); } return filter.filter(data); } diff --git a/src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java index 69cc6ea..9ba5131 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java @@ -20,6 +20,7 @@ package org.oran.dmaapadapter.tasks; +import org.oran.dmaapadapter.filter.Filter; import org.oran.dmaapadapter.repository.Job; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,7 +40,7 @@ public class HttpJobDataDistributor extends JobDataDistributor { } @Override - protected Mono sendToClient(DataToConsumer output) { + protected Mono sendToClient(Filter.FilteredData output) { Job job = this.getJob(); logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), output); MediaType contentType = job.isBuffered() || job.getType().isJson() ? MediaType.APPLICATION_JSON : null; diff --git a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java index 60e271e..54b8831 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java @@ -21,8 +21,8 @@ package org.oran.dmaapadapter.tasks; import lombok.Getter; -import lombok.ToString; +import org.oran.dmaapadapter.filter.Filter; import org.oran.dmaapadapter.repository.Job; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,17 +44,6 @@ public abstract class JobDataDistributor { private Disposable subscription; private final ErrorStats errorStats = new ErrorStats(); - @ToString - public static class DataToConsumer { - public final String key; - public final String value; - - public DataToConsumer(String key, String value) { - this.key = key; - this.value = value; - } - } - private class ErrorStats { private int consumerFaultCounter = 0; private boolean irrecoverableError = false; // eg. overflow @@ -88,7 +77,7 @@ public abstract class JobDataDistributor { public synchronized void start(Flux input) { stop(); this.errorStats.resetIrrecoverableErrors(); - this.subscription = handleReceivedMessage(input, job) // + this.subscription = filterAndBuffer(input, job) // .flatMap(this::sendToClient, job.getParameters().getMaxConcurrency()) // .onErrorResume(this::handleError) // .subscribe(this::handleSentOk, // @@ -101,7 +90,7 @@ public abstract class JobDataDistributor { stop(); } - protected abstract Mono sendToClient(DataToConsumer output); + protected abstract Mono sendToClient(Filter.FilteredData output); public synchronized void stop() { if (this.subscription != null) { @@ -114,18 +103,17 @@ public abstract class JobDataDistributor { return this.subscription != null; } - private Flux handleReceivedMessage(Flux inputFlux, Job job) { - Flux result = inputFlux.map(input -> new DataToConsumer(input.key, job.filter(input.value))) // - .filter(t -> !t.value.isEmpty()); // + private Flux filterAndBuffer(Flux inputFlux, Job job) { + Flux filtered = inputFlux.map(job::filter); // if (job.isBuffered()) { - result = result.map(input -> quoteNonJson(input.value, job)) // + filtered = filtered.map(input -> quoteNonJson(input.value, job)) // .bufferTimeout( // job.getParameters().getBufferTimeout().getMaxSize(), // job.getParameters().getBufferTimeout().getMaxTime()) // - .map(buffered -> new DataToConsumer("", buffered.toString())); + .map(buffered -> new Filter.FilteredData("", buffered.toString())); } - return result; + return filtered; } private String quoteNonJson(String str, Job job) { diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java index 8740c94..8f77381 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java @@ -27,6 +27,7 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import org.oran.dmaapadapter.configuration.ApplicationConfig; +import org.oran.dmaapadapter.filter.Filter; import org.oran.dmaapadapter.repository.Job; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +55,7 @@ public class KafkaJobDataDistributor extends JobDataDistributor { } @Override - protected Mono sendToClient(DataToConsumer data) { + protected Mono sendToClient(Filter.FilteredData data) { Job job = this.getJob(); logger.debug("Sending data '{}' to Kafka topic: {}", data, this.getJob().getParameters().getKafkaOutputTopic()); @@ -93,7 +94,7 @@ public class KafkaJobDataDistributor extends JobDataDistributor { return SenderOptions.create(props); } - private SenderRecord senderRecord(DataToConsumer output, Job infoJob) { + private SenderRecord senderRecord(Filter.FilteredData output, Job infoJob) { int correlationMetadata = 2; String topic = infoJob.getParameters().getKafkaOutputTopic(); return SenderRecord.create(new ProducerRecord<>(topic, output.key, output.value), correlationMetadata); diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java index 3c6399f..62bf942 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java @@ -67,9 +67,10 @@ public class KafkaTopicListener implements TopicListener { input.value())) // .doOnError(t -> logger.error("KafkaTopicReceiver error: {}", t.getMessage())) // .doFinally(sig -> logger.error("KafkaTopicReceiver stopped, reason: {}", sig)) // + .filter(t -> !t.value().isEmpty() || !t.key().isEmpty()) // + .map(input -> new DataFromTopic(input.key(), input.value())) // .publish() // - .autoConnect() // - .map(input -> new DataFromTopic(input.key(), input.value())); // + .autoConnect(); } private ReceiverOptions kafkaInputProperties() { diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java index cb7c3de..e26915b 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java @@ -20,7 +20,11 @@ package org.oran.dmaapadapter.tasks; +import lombok.Getter; +import lombok.Setter; import lombok.ToString; + +import org.oran.dmaapadapter.filter.PmReport; import reactor.core.publisher.Flux; public interface TopicListener { @@ -30,6 +34,10 @@ public interface TopicListener { public final String key; public final String value; + @Getter + @Setter + private PmReport cachedPmReport; + public DataFromTopic(String key, String value) { this.key = key; this.value = value; diff --git a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java index b351208..014de25 100644 --- a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java +++ b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java @@ -414,7 +414,6 @@ class ApplicationTest { PmReportArray reportsParsed = gson.fromJson(receivedFiltered, PmReportArray.class); assertThat(reportsParsed).hasSize(1); - } @Test diff --git a/src/test/java/org/oran/dmaapadapter/filter/JsltFilterTest.java b/src/test/java/org/oran/dmaapadapter/filter/JsltFilterTest.java index ea6943d..a18034e 100644 --- a/src/test/java/org/oran/dmaapadapter/filter/JsltFilterTest.java +++ b/src/test/java/org/oran/dmaapadapter/filter/JsltFilterTest.java @@ -27,16 +27,21 @@ import java.nio.file.Files; import java.nio.file.Path; import org.junit.jupiter.api.Test; +import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic; class JsltFilterTest { + private String filterReport(JsltFilter filter) throws Exception { + return filter.filter(new DataFromTopic("", loadReport())).value; + } + @Test void testPickOneValue() throws Exception { String expresssion = "if(.event.commonEventHeader.sourceName == 'O-DU-1122')" // + ".event.perf3gppFields.measDataCollection.measInfoList[0].measValuesList[0].measResults[0].sValue"; JsltFilter filter = new JsltFilter(reQuote(expresssion)); - String res = filter.filter(loadReport()); + String res = filterReport(filter); assertThat(res).isEqualTo(reQuote("'813'")); } @@ -46,7 +51,7 @@ class JsltFilterTest { + "."; JsltFilter filter = new JsltFilter(reQuote(expresssion)); - String res = filter.filter(loadReport()); + String res = filterReport(filter); assertThat(res).contains("event"); } @@ -55,7 +60,7 @@ class JsltFilterTest { String expresssion = "if(.event.commonEventHeader.sourceName == 'JUNK')" // + "."; JsltFilter filter = new JsltFilter(reQuote(expresssion)); - String res = filter.filter(loadReport()); + String res = filterReport(filter); assertThat(res).isEmpty(); } @@ -71,7 +76,7 @@ class JsltFilterTest { "}"; // JsltFilter filter = new JsltFilter(reQuote(expresssion)); - String res = filter.filter(loadReport()); + String res = filterReport(filter); String expected = "{'array':['RncFunction=RF-1,UtranCell=Gbg-997','RncFunction=RF-1,UtranCell=Gbg-998','RncFunction=RF-1,UtranCell=Gbg-999'],'size':3}"; diff --git a/src/test/java/org/oran/dmaapadapter/filter/JsonPathFilterTest.java b/src/test/java/org/oran/dmaapadapter/filter/JsonPathFilterTest.java index d5e59b4..ce00c4d 100644 --- a/src/test/java/org/oran/dmaapadapter/filter/JsonPathFilterTest.java +++ b/src/test/java/org/oran/dmaapadapter/filter/JsonPathFilterTest.java @@ -27,6 +27,7 @@ import java.nio.file.Files; import java.nio.file.Path; import org.junit.jupiter.api.Test; +import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic; class JsonPathFilterTest { @@ -34,7 +35,7 @@ class JsonPathFilterTest { void testJsonPath() throws Exception { String exp = ("$.event.perf3gppFields.measDataCollection.measInfoList[0].measTypes.sMeasTypesList[0]"); JsonPathFilter filter = new JsonPathFilter(exp); - String res = filter.filter(loadReport()); + String res = filter.filter(new DataFromTopic("", loadReport())).value; assertThat(res).isEqualTo("\"attTCHSeizures\""); } diff --git a/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java b/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java index 23447f5..cbdf9e7 100644 --- a/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java +++ b/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java @@ -27,19 +27,22 @@ import java.nio.file.Files; import java.nio.file.Path; import org.junit.jupiter.api.Test; +import org.oran.dmaapadapter.tasks.TopicListener; class PmReportFilterTest { + private String filterReport(PmReportFilter filter) throws Exception { + return filter.filter(new TopicListener.DataFromTopic("", loadReport())).value; + } + @Test void testPmFilterMeasTypes() throws Exception { - String reportJson = loadReport(); - PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); filterData.measTypes.add("succImmediateAssignProcs"); PmReportFilter filter = new PmReportFilter(filterData); - String filtered = filter.filter(reportJson); + String filtered = filterReport(filter); assertThat(filtered).contains("succImmediateAssignProcs").doesNotContain("\"p\":2").contains("\"p\":1") .contains("Gbg-997"); @@ -48,7 +51,7 @@ class PmReportFilterTest { filterData = new PmReportFilter.FilterData(); filterData.measTypes.add("junk"); filter = new PmReportFilter(filterData); - filtered = filter.filter(reportJson); + filtered = filterReport(filter); assertThat(filtered).isEmpty(); } @@ -57,13 +60,13 @@ class PmReportFilterTest { PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); filterData.measObjInstIds.add("junk"); PmReportFilter filter = new PmReportFilter(filterData); - String filtered = filter.filter(loadReport()); + String filtered = filterReport(filter); assertThat(filtered).isEmpty(); filterData = new PmReportFilter.FilterData(); filterData.measObjInstIds.add("UtranCell=Gbg-997"); filter = new PmReportFilter(filterData); - filtered = filter.filter(loadReport()); + filtered = filterReport(filter); assertThat(filtered).contains("Gbg-997").doesNotContain("Gbg-998"); } @@ -72,13 +75,13 @@ class PmReportFilterTest { PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); filterData.measObjClass.add("junk"); PmReportFilter filter = new PmReportFilter(filterData); - String filtered = filter.filter(loadReport()); + String filtered = filterReport(filter); assertThat(filtered).isEmpty(); filterData = new PmReportFilter.FilterData(); filterData.measObjClass.add("ENodeBFunction"); filter = new PmReportFilter(filterData); - filtered = filter.filter(loadReport()); + filtered = filterReport(filter); assertThat(filtered).contains("ENodeBFunction").doesNotContain("UtranCell"); } @@ -87,13 +90,13 @@ class PmReportFilterTest { PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); filterData.sourceNames.add("junk"); PmReportFilter filter = new PmReportFilter(filterData); - String filtered = filter.filter(loadReport()); + String filtered = filterReport(filter); assertThat(filtered).isEmpty(); filterData = new PmReportFilter.FilterData(); filterData.sourceNames.add("O-DU-1122"); filter = new PmReportFilter(filterData); - filtered = filter.filter(loadReport()); + filtered = filterReport(filter); assertThat(filtered).contains("O-DU-1122"); } @@ -102,13 +105,13 @@ class PmReportFilterTest { PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); filterData.measuredEntityDns.add("junk"); PmReportFilter filter = new PmReportFilter(filterData); - String filtered = filter.filter(loadReport()); + String filtered = filterReport(filter); assertThat(filtered).isEmpty(); filterData = new PmReportFilter.FilterData(); filterData.measuredEntityDns.add("ManagedElement=RNC-Gbg-1"); filter = new PmReportFilter(filterData); - filtered = filter.filter(loadReport()); + filtered = filterReport(filter); assertThat(filtered).contains("ManagedElement=RNC-Gbg-1"); } @@ -117,10 +120,10 @@ class PmReportFilterTest { PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); PmReportFilter filter = new PmReportFilter(filterData); - String filtered = filter.filter("junk"); + String filtered = filter.filter(new TopicListener.DataFromTopic("", "junk")).value; assertThat(filtered).isEmpty(); - filtered = filter.filter(reQuote("{'msg': 'test'}")); + filtered = filter.filter(new TopicListener.DataFromTopic("", reQuote("{'msg': 'test'}"))).value; assertThat(filtered).isEmpty(); }