If there are many PM job, the PM parsing is done once instead of onced per job.
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: I64dc8332f3efd1bd3a284f6896be7bd9a0dd9bf7
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
- <version>2.5.8</version>
+ <version>2.6.6</version>
<relativePath />
</parent>
<groupId>org.o-ran-sc.nonrtric.plt</groupId>
<system>JIRA</system>
<url>https://jira.o-ran-sc.org/</url>
</issueManagement>
-</project>
\ No newline at end of file
+</project>
package org.oran.dmaapadapter.filter;
+import lombok.ToString;
+
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
+
public interface Filter {
public enum Type {
REGEXP, JSLT, JSON_PATH, PM_DATA, NONE
}
- public String filter(String data);
+ @ToString
+ public static class FilteredData {
+ public final String key;
+ public final String value;
+
+ public FilteredData(String key, String value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ public static FilteredData empty() {
+ return new FilteredData("", "");
+ }
+ }
+
+ public FilteredData filter(DataFromTopic data);
}
import com.schibsted.spt.data.jslt.Expression;
import com.schibsted.spt.data.jslt.Parser;
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
}
@Override
- public String filter(String jsonString) {
+ public FilteredData filter(DataFromTopic data) {
if (expression == null) {
- return jsonString;
+ return new FilteredData(data.key, data.value);
}
try {
JsonFactory factory = mapper.getFactory();
- JsonParser parser = factory.createParser(jsonString);
+ JsonParser parser = factory.createParser(data.value);
JsonNode actualObj = mapper.readTree(parser);
JsonNode filteredNode = expression.apply(actualObj);
if (filteredNode == NullNode.instance) {
- return "";
+ return FilteredData.empty();
}
- return mapper.writeValueAsString(filteredNode);
+ return new FilteredData(data.key, mapper.writeValueAsString(filteredNode));
} catch (Exception e) {
- return "";
+ return FilteredData.empty();
}
}
import com.jayway.jsonpath.JsonPath;
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
}
@Override
- public String filter(String jsonString) {
+ public FilteredData filter(DataFromTopic data) {
try {
- Object o = JsonPath.parse(jsonString).read(this.expression, Object.class);
- return o == null ? "" : gson.toJson(o);
+ Object o = JsonPath.parse(data.value).read(this.expression, Object.class);
+ return o == null ? FilteredData.empty() : new FilteredData(data.key, gson.toJson(o));
} catch (Exception e) {
- return "";
+ return FilteredData.empty();
}
}
import lombok.Getter;
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thymeleaf.util.StringUtils;
}
@Override
- public String filter(String data) {
+ public FilteredData filter(DataFromTopic data) {
try {
- PmReport report = gsonParse.fromJson(data, PmReport.class);
+ PmReport report = createPmReport(data);
if (report.event.perf3gppFields == null) {
logger.warn("Received PM report with no perf3gppFields, ignored. {}", data);
- return "";
+ return FilteredData.empty();
}
if (!filter(report, this.filterData)) {
- return "";
+ return FilteredData.empty();
}
- return gson.toJson(report);
+ return new FilteredData(data.key, gson.toJson(report));
} catch (Exception e) {
logger.warn("Could not parse PM data. {}, reason: {}", data, e.getMessage());
- return "";
+ return FilteredData.empty();
+ }
+ }
+
+ @SuppressWarnings("java:S2445") // "data" is a method parameter, and should not be used for synchronization.
+ private PmReport createPmReport(DataFromTopic data) {
+ synchronized (data) {
+ if (data.getCachedPmReport() == null) {
+ data.setCachedPmReport(gsonParse.fromJson(data.value, PmReport.class));
+ }
+ return data.getCachedPmReport();
}
}
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
}
@Override
- public String filter(String data) {
+ public FilteredData filter(DataFromTopic data) {
if (regexp == null) {
- return data;
+ return new FilteredData(data.key, data.value);
}
- Matcher matcher = regexp.matcher(data);
+ Matcher matcher = regexp.matcher(data.value);
boolean match = matcher.find();
if (match) {
- return data;
+ return new FilteredData(data.key, data.value);
} else {
- return "";
+ return FilteredData.empty();
}
}
import org.oran.dmaapadapter.clients.AsyncRestClient;
import org.oran.dmaapadapter.filter.Filter;
import org.oran.dmaapadapter.filter.FilterFactory;
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
this.consumerRestClient = consumerRestClient;
}
- public String filter(String data) {
+ public Filter.FilteredData filter(DataFromTopic data) {
if (filter == null) {
logger.debug("No filter used");
- return data;
+ return new Filter.FilteredData(data.key, data.value);
}
return filter.filter(data);
}
package org.oran.dmaapadapter.tasks;
+import org.oran.dmaapadapter.filter.Filter;
import org.oran.dmaapadapter.repository.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
}
@Override
- protected Mono<String> sendToClient(DataToConsumer output) {
+ protected Mono<String> sendToClient(Filter.FilteredData output) {
Job job = this.getJob();
logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), output);
MediaType contentType = job.isBuffered() || job.getType().isJson() ? MediaType.APPLICATION_JSON : null;
package org.oran.dmaapadapter.tasks;
import lombok.Getter;
-import lombok.ToString;
+import org.oran.dmaapadapter.filter.Filter;
import org.oran.dmaapadapter.repository.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private Disposable subscription;
private final ErrorStats errorStats = new ErrorStats();
- @ToString
- public static class DataToConsumer {
- public final String key;
- public final String value;
-
- public DataToConsumer(String key, String value) {
- this.key = key;
- this.value = value;
- }
- }
-
private class ErrorStats {
private int consumerFaultCounter = 0;
private boolean irrecoverableError = false; // eg. overflow
public synchronized void start(Flux<TopicListener.DataFromTopic> input) {
stop();
this.errorStats.resetIrrecoverableErrors();
- this.subscription = handleReceivedMessage(input, job) //
+ this.subscription = filterAndBuffer(input, job) //
.flatMap(this::sendToClient, job.getParameters().getMaxConcurrency()) //
.onErrorResume(this::handleError) //
.subscribe(this::handleSentOk, //
stop();
}
- protected abstract Mono<String> sendToClient(DataToConsumer output);
+ protected abstract Mono<String> sendToClient(Filter.FilteredData output);
public synchronized void stop() {
if (this.subscription != null) {
return this.subscription != null;
}
- private Flux<DataToConsumer> handleReceivedMessage(Flux<TopicListener.DataFromTopic> inputFlux, Job job) {
- Flux<DataToConsumer> result = inputFlux.map(input -> new DataToConsumer(input.key, job.filter(input.value))) //
- .filter(t -> !t.value.isEmpty()); //
+ private Flux<Filter.FilteredData> filterAndBuffer(Flux<TopicListener.DataFromTopic> inputFlux, Job job) {
+ Flux<Filter.FilteredData> filtered = inputFlux.map(job::filter); //
if (job.isBuffered()) {
- result = result.map(input -> quoteNonJson(input.value, job)) //
+ filtered = filtered.map(input -> quoteNonJson(input.value, job)) //
.bufferTimeout( //
job.getParameters().getBufferTimeout().getMaxSize(), //
job.getParameters().getBufferTimeout().getMaxTime()) //
- .map(buffered -> new DataToConsumer("", buffered.toString()));
+ .map(buffered -> new Filter.FilteredData("", buffered.toString()));
}
- return result;
+ return filtered;
}
private String quoteNonJson(String str, Job job) {
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
+import org.oran.dmaapadapter.filter.Filter;
import org.oran.dmaapadapter.repository.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
}
@Override
- protected Mono<String> sendToClient(DataToConsumer data) {
+ protected Mono<String> sendToClient(Filter.FilteredData data) {
Job job = this.getJob();
logger.debug("Sending data '{}' to Kafka topic: {}", data, this.getJob().getParameters().getKafkaOutputTopic());
return SenderOptions.create(props);
}
- private SenderRecord<String, String, Integer> senderRecord(DataToConsumer output, Job infoJob) {
+ private SenderRecord<String, String, Integer> senderRecord(Filter.FilteredData output, Job infoJob) {
int correlationMetadata = 2;
String topic = infoJob.getParameters().getKafkaOutputTopic();
return SenderRecord.create(new ProducerRecord<>(topic, output.key, output.value), correlationMetadata);
input.value())) //
.doOnError(t -> logger.error("KafkaTopicReceiver error: {}", t.getMessage())) //
.doFinally(sig -> logger.error("KafkaTopicReceiver stopped, reason: {}", sig)) //
+ .filter(t -> !t.value().isEmpty() || !t.key().isEmpty()) //
+ .map(input -> new DataFromTopic(input.key(), input.value())) //
.publish() //
- .autoConnect() //
- .map(input -> new DataFromTopic(input.key(), input.value())); //
+ .autoConnect();
}
private ReceiverOptions<String, String> kafkaInputProperties() {
package org.oran.dmaapadapter.tasks;
+import lombok.Getter;
+import lombok.Setter;
import lombok.ToString;
+
+import org.oran.dmaapadapter.filter.PmReport;
import reactor.core.publisher.Flux;
public interface TopicListener {
public final String key;
public final String value;
+ @Getter
+ @Setter
+ private PmReport cachedPmReport;
+
public DataFromTopic(String key, String value) {
this.key = key;
this.value = value;
PmReportArray reportsParsed = gson.fromJson(receivedFiltered, PmReportArray.class);
assertThat(reportsParsed).hasSize(1);
-
}
@Test
import java.nio.file.Path;
import org.junit.jupiter.api.Test;
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
class JsltFilterTest {
+ private String filterReport(JsltFilter filter) throws Exception {
+ return filter.filter(new DataFromTopic("", loadReport())).value;
+ }
+
@Test
void testPickOneValue() throws Exception {
String expresssion = "if(.event.commonEventHeader.sourceName == 'O-DU-1122')" //
+ ".event.perf3gppFields.measDataCollection.measInfoList[0].measValuesList[0].measResults[0].sValue";
JsltFilter filter = new JsltFilter(reQuote(expresssion));
- String res = filter.filter(loadReport());
+ String res = filterReport(filter);
assertThat(res).isEqualTo(reQuote("'813'"));
}
+ ".";
JsltFilter filter = new JsltFilter(reQuote(expresssion));
- String res = filter.filter(loadReport());
+ String res = filterReport(filter);
assertThat(res).contains("event");
}
String expresssion = "if(.event.commonEventHeader.sourceName == 'JUNK')" //
+ ".";
JsltFilter filter = new JsltFilter(reQuote(expresssion));
- String res = filter.filter(loadReport());
+ String res = filterReport(filter);
assertThat(res).isEmpty();
}
"}"; //
JsltFilter filter = new JsltFilter(reQuote(expresssion));
- String res = filter.filter(loadReport());
+ String res = filterReport(filter);
String expected =
"{'array':['RncFunction=RF-1,UtranCell=Gbg-997','RncFunction=RF-1,UtranCell=Gbg-998','RncFunction=RF-1,UtranCell=Gbg-999'],'size':3}";
import java.nio.file.Path;
import org.junit.jupiter.api.Test;
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
class JsonPathFilterTest {
void testJsonPath() throws Exception {
String exp = ("$.event.perf3gppFields.measDataCollection.measInfoList[0].measTypes.sMeasTypesList[0]");
JsonPathFilter filter = new JsonPathFilter(exp);
- String res = filter.filter(loadReport());
+ String res = filter.filter(new DataFromTopic("", loadReport())).value;
assertThat(res).isEqualTo("\"attTCHSeizures\"");
}
import java.nio.file.Path;
import org.junit.jupiter.api.Test;
+import org.oran.dmaapadapter.tasks.TopicListener;
class PmReportFilterTest {
+ private String filterReport(PmReportFilter filter) throws Exception {
+ return filter.filter(new TopicListener.DataFromTopic("", loadReport())).value;
+ }
+
@Test
void testPmFilterMeasTypes() throws Exception {
- String reportJson = loadReport();
-
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
filterData.measTypes.add("succImmediateAssignProcs");
PmReportFilter filter = new PmReportFilter(filterData);
- String filtered = filter.filter(reportJson);
+ String filtered = filterReport(filter);
assertThat(filtered).contains("succImmediateAssignProcs").doesNotContain("\"p\":2").contains("\"p\":1")
.contains("Gbg-997");
filterData = new PmReportFilter.FilterData();
filterData.measTypes.add("junk");
filter = new PmReportFilter(filterData);
- filtered = filter.filter(reportJson);
+ filtered = filterReport(filter);
assertThat(filtered).isEmpty();
}
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
filterData.measObjInstIds.add("junk");
PmReportFilter filter = new PmReportFilter(filterData);
- String filtered = filter.filter(loadReport());
+ String filtered = filterReport(filter);
assertThat(filtered).isEmpty();
filterData = new PmReportFilter.FilterData();
filterData.measObjInstIds.add("UtranCell=Gbg-997");
filter = new PmReportFilter(filterData);
- filtered = filter.filter(loadReport());
+ filtered = filterReport(filter);
assertThat(filtered).contains("Gbg-997").doesNotContain("Gbg-998");
}
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
filterData.measObjClass.add("junk");
PmReportFilter filter = new PmReportFilter(filterData);
- String filtered = filter.filter(loadReport());
+ String filtered = filterReport(filter);
assertThat(filtered).isEmpty();
filterData = new PmReportFilter.FilterData();
filterData.measObjClass.add("ENodeBFunction");
filter = new PmReportFilter(filterData);
- filtered = filter.filter(loadReport());
+ filtered = filterReport(filter);
assertThat(filtered).contains("ENodeBFunction").doesNotContain("UtranCell");
}
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
filterData.sourceNames.add("junk");
PmReportFilter filter = new PmReportFilter(filterData);
- String filtered = filter.filter(loadReport());
+ String filtered = filterReport(filter);
assertThat(filtered).isEmpty();
filterData = new PmReportFilter.FilterData();
filterData.sourceNames.add("O-DU-1122");
filter = new PmReportFilter(filterData);
- filtered = filter.filter(loadReport());
+ filtered = filterReport(filter);
assertThat(filtered).contains("O-DU-1122");
}
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
filterData.measuredEntityDns.add("junk");
PmReportFilter filter = new PmReportFilter(filterData);
- String filtered = filter.filter(loadReport());
+ String filtered = filterReport(filter);
assertThat(filtered).isEmpty();
filterData = new PmReportFilter.FilterData();
filterData.measuredEntityDns.add("ManagedElement=RNC-Gbg-1");
filter = new PmReportFilter(filterData);
- filtered = filter.filter(loadReport());
+ filtered = filterReport(filter);
assertThat(filtered).contains("ManagedElement=RNC-Gbg-1");
}
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
PmReportFilter filter = new PmReportFilter(filterData);
- String filtered = filter.filter("junk");
+ String filtered = filter.filter(new TopicListener.DataFromTopic("", "junk")).value;
assertThat(filtered).isEmpty();
- filtered = filter.filter(reQuote("{'msg': 'test'}"));
+ filtered = filter.filter(new TopicListener.DataFromTopic("", reQuote("{'msg': 'test'}"))).value;
assertThat(filtered).isEmpty();
}