From 3582cb92934efcfbebaa7b4cd2df49d629d845cb Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Tue, 19 Apr 2022 13:17:23 +0200 Subject: [PATCH] PM Filter Added support for JsonPath and JSLT filtering of Json data. Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-743 Change-Id: Ia54fd0e67d48626522d0ba5534dd27fbbde2e3e6 --- config/application_configuration.json | 8 +- pom.xml | 10 ++ .../org/oran/dmaapadapter/repository/InfoType.java | 11 +- .../java/org/oran/dmaapadapter/repository/Job.java | 90 +++++++++++--- .../repository/filters/JsltFilter.java | 68 ++++++++++ .../repository/filters/JsonPathFilter.java | 52 ++++++++ .../repository/filters/RegexpFilter.java | 15 +-- src/main/resources/typeSchemaDmaap.json | 14 ++- src/main/resources/typeSchemaKafka.json | 8 ++ src/main/resources/typeSchemaPmDataDmaap.json | 84 ++++++++----- src/main/resources/typeSchemaPmDataKafka.json | 84 ++++++++----- .../org/oran/dmaapadapter/ApplicationTest.java | 138 ++++++++++++++------- .../org/oran/dmaapadapter/IntegrationWithIcs.java | 6 +- .../oran/dmaapadapter/IntegrationWithKafka.java | 4 +- .../repository/filters/JsltFilterTest.java | 91 ++++++++++++++ .../repository/filters/JsonPathFilterTest.java | 46 +++++++ .../resources/test_application_configuration.json | 5 +- 17 files changed, 586 insertions(+), 148 deletions(-) create mode 100644 src/main/java/org/oran/dmaapadapter/repository/filters/JsltFilter.java create mode 100644 src/main/java/org/oran/dmaapadapter/repository/filters/JsonPathFilter.java create mode 100644 src/test/java/org/oran/dmaapadapter/repository/filters/JsltFilterTest.java create mode 100644 src/test/java/org/oran/dmaapadapter/repository/filters/JsonPathFilterTest.java diff --git a/config/application_configuration.json b/config/application_configuration.json index 3233804..881da34 100644 --- a/config/application_configuration.json +++ b/config/application_configuration.json @@ -9,6 +9,12 @@ "id": "ExampleInformationType2", "kafkaInputTopic": "TutorialTopic", "useHttpProxy": false - } + }, + { + "id": "PmData", + "dmaapTopicUrl": "/events/PM_NOTIFICATION_OUTPUT/OpenDcae-c12/C12", + "useHttpProxy": true, + "dataType" : "pmData" + }, ] } diff --git a/pom.xml b/pom.xml index 1188b68..04b5716 100644 --- a/pom.xml +++ b/pom.xml @@ -148,6 +148,16 @@ 1.12.1 test + + com.schibsted.spt.data + jslt + 0.1.11 + + + com.jayway.jsonpath + json-path + 2.7.0 + org.springframework.boot spring-boot-starter-test diff --git a/src/main/java/org/oran/dmaapadapter/repository/InfoType.java b/src/main/java/org/oran/dmaapadapter/repository/InfoType.java index 3a78d6c..7100940 100644 --- a/src/main/java/org/oran/dmaapadapter/repository/InfoType.java +++ b/src/main/java/org/oran/dmaapadapter/repository/InfoType.java @@ -57,13 +57,18 @@ public class InfoType { } public enum DataType { - PM_DATA, TEXT + PM_DATA, OTHER } public DataType getDataType() { - if (dataType != null && dataType.equalsIgnoreCase("pmData")) { + if (dataType == null) { + return DataType.OTHER; + } + + if (dataType.equalsIgnoreCase("pmData")) { return DataType.PM_DATA; } - return DataType.TEXT; + return DataType.OTHER; + } } diff --git a/src/main/java/org/oran/dmaapadapter/repository/Job.java b/src/main/java/org/oran/dmaapadapter/repository/Job.java index 5a2e373..9dd4987 100644 --- a/src/main/java/org/oran/dmaapadapter/repository/Job.java +++ b/src/main/java/org/oran/dmaapadapter/repository/Job.java @@ -20,45 +20,86 @@ package org.oran.dmaapadapter.repository; -import com.google.common.base.Strings; +import com.google.gson.GsonBuilder; +import java.lang.invoke.MethodHandles; import java.time.Duration; import lombok.Getter; +import lombok.Setter; import org.immutables.gson.Gson; import org.oran.dmaapadapter.clients.AsyncRestClient; import org.oran.dmaapadapter.repository.filters.Filter; +import org.oran.dmaapadapter.repository.filters.JsltFilter; +import org.oran.dmaapadapter.repository.filters.JsonPathFilter; import org.oran.dmaapadapter.repository.filters.PmReportFilter; import org.oran.dmaapadapter.repository.filters.RegexpFilter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class Job { + private static com.google.gson.Gson gson = new GsonBuilder().create(); + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + @Gson.TypeAdapters public static class Parameters { - @Getter - private String filter; + public static final String REGEXP_TYPE = "regexp"; + public static final String PM_FILTER_TYPE = "pmdata"; + public static final String JSLT_FILTER_TYPE = "jslt"; + public static final String JSON_PATH_FILTER_TYPE = "json-path"; + + @Setter + private String filterType = REGEXP_TYPE; + private Object filter; @Getter private BufferTimeout bufferTimeout; private Integer maxConcurrency; - @Getter - private PmReportFilter.FilterData pmFilter; - public Parameters() {} - public Parameters(String filter, BufferTimeout bufferTimeout, Integer maxConcurrency, - PmReportFilter.FilterData pmFilter) { + public Parameters(Object filter, String filterType, BufferTimeout bufferTimeout, Integer maxConcurrency) { this.filter = filter; this.bufferTimeout = bufferTimeout; this.maxConcurrency = maxConcurrency; - this.pmFilter = pmFilter; + this.filterType = filterType; } public int getMaxConcurrency() { return maxConcurrency == null || maxConcurrency == 0 ? 1 : maxConcurrency; } + + public String getFilterAsString() { + return this.filter.toString(); + } + + public PmReportFilter.FilterData getPmFilter() { + String str = gson.toJson(this.filter); + return gson.fromJson(str, PmReportFilter.FilterData.class); + } + + public enum FilterType { + REGEXP, JSLT, JSON_PATH, PM_DATA, NONE + } + + public FilterType getFilterType() { + if (filter == null || filterType == null) { + return FilterType.NONE; + } else if (filterType.equalsIgnoreCase(JSLT_FILTER_TYPE)) { + return FilterType.JSLT; + } else if (filterType.equalsIgnoreCase(JSON_PATH_FILTER_TYPE)) { + return FilterType.JSON_PATH; + } else if (filterType.equalsIgnoreCase(REGEXP_TYPE)) { + return FilterType.REGEXP; + } else if (filterType.equalsIgnoreCase(PM_FILTER_TYPE)) { + return FilterType.PM_DATA; + } else { + logger.warn("Unsupported filter type: {}", this.filterType); + return FilterType.NONE; + } + } } @Gson.TypeAdapters @@ -111,17 +152,34 @@ public class Job { this.owner = owner; this.lastUpdated = lastUpdated; this.parameters = parameters; - if (parameters != null && !Strings.isNullOrEmpty(parameters.filter)) { - filter = new RegexpFilter(parameters.filter); - } else if (parameters != null && parameters.pmFilter != null) { - filter = new PmReportFilter(parameters.pmFilter); - } else { - filter = null; - } + filter = createFilter(parameters); this.consumerRestClient = consumerRestClient; } + private static Filter createFilter(Parameters parameters) { + + if (parameters.filter == null) { + return null; + } + + switch (parameters.getFilterType()) { + case PM_DATA: + return new PmReportFilter(parameters.getPmFilter()); + case REGEXP: + return new RegexpFilter(parameters.getFilterAsString()); + case JSLT: + return new JsltFilter(parameters.getFilterAsString()); + case JSON_PATH: + return new JsonPathFilter(parameters.getFilterAsString()); + case NONE: + return null; + default: + logger.error("Not handeled filter type: {}", parameters.getFilterType()); + return null; + } + } + public String filter(String data) { if (filter == null) { return data; diff --git a/src/main/java/org/oran/dmaapadapter/repository/filters/JsltFilter.java b/src/main/java/org/oran/dmaapadapter/repository/filters/JsltFilter.java new file mode 100644 index 0000000..5c2520e --- /dev/null +++ b/src/main/java/org/oran/dmaapadapter/repository/filters/JsltFilter.java @@ -0,0 +1,68 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.repository.filters; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.NullNode; +import com.schibsted.spt.data.jslt.Expression; +import com.schibsted.spt.data.jslt.Parser; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JsltFilter implements Filter { + + private Expression expression; + private final ObjectMapper mapper = new ObjectMapper(); + private static final Logger logger = LoggerFactory.getLogger(JsltFilter.class); + + public JsltFilter(String exp) { + try { + expression = Parser.compileString(exp); + } catch (Exception e) { + logger.warn("Could not parse JSLT expression: {}, reason: {}", exp, e.getMessage()); + } + } + + @Override + public String filter(String jsonString) { + if (expression == null) { + return jsonString; + } + try { + JsonFactory factory = mapper.getFactory(); + JsonParser parser = factory.createParser(jsonString); + JsonNode actualObj = mapper.readTree(parser); + + JsonNode filteredNode = expression.apply(actualObj); + if (filteredNode == NullNode.instance) { + return ""; + } + return mapper.writeValueAsString(filteredNode); + } catch (Exception e) { + return ""; + } + } + +} diff --git a/src/main/java/org/oran/dmaapadapter/repository/filters/JsonPathFilter.java b/src/main/java/org/oran/dmaapadapter/repository/filters/JsonPathFilter.java new file mode 100644 index 0000000..a958649 --- /dev/null +++ b/src/main/java/org/oran/dmaapadapter/repository/filters/JsonPathFilter.java @@ -0,0 +1,52 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.repository.filters; + +import com.jayway.jsonpath.JsonPath; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JsonPathFilter implements Filter { + + private String expression; + private static final Logger logger = LoggerFactory.getLogger(JsonPathFilter.class); + com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create(); + + public JsonPathFilter(String exp) { + try { + expression = exp; + } catch (Exception e) { + logger.warn("Could not parse Json Path expression: {}, reason: {}", exp, e.getMessage()); + } + } + + @Override + public String filter(String jsonString) { + try { + Object o = JsonPath.parse(jsonString).read(this.expression, Object.class); + return o == null ? "" : gson.toJson(o); + } catch (Exception e) { + return ""; + } + + } +} diff --git a/src/main/java/org/oran/dmaapadapter/repository/filters/RegexpFilter.java b/src/main/java/org/oran/dmaapadapter/repository/filters/RegexpFilter.java index aac808c..c8b5541 100644 --- a/src/main/java/org/oran/dmaapadapter/repository/filters/RegexpFilter.java +++ b/src/main/java/org/oran/dmaapadapter/repository/filters/RegexpFilter.java @@ -20,25 +20,26 @@ package org.oran.dmaapadapter.repository.filters; - import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class RegexpFilter implements Filter { - - private final Pattern regexp; + private static final Logger logger = LoggerFactory.getLogger(RegexpFilter.class); + private Pattern regexp; public RegexpFilter(String exp) { - if (exp != null) { + try { regexp = Pattern.compile(exp); - } else { - regexp = null; + } catch (Exception e) { + logger.warn("Could not parse REGEXP expression: {}, reason: {}", exp, e.getMessage()); } } + @Override public String filter(String data) { - if (regexp == null) { return data; } diff --git a/src/main/resources/typeSchemaDmaap.json b/src/main/resources/typeSchemaDmaap.json index a50b236..146b9eb 100644 --- a/src/main/resources/typeSchemaDmaap.json +++ b/src/main/resources/typeSchemaDmaap.json @@ -3,8 +3,16 @@ "type": "object", "properties": { "filter": { - "type": "string" - } + "type": "string" + }, + "filterType": { + "type": "string", + "enum": [ + "jslt", + "regexp", + "json-path" + ] + } }, "additionalProperties": false -} +} \ No newline at end of file diff --git a/src/main/resources/typeSchemaKafka.json b/src/main/resources/typeSchemaKafka.json index f7e6e87..99b8647 100644 --- a/src/main/resources/typeSchemaKafka.json +++ b/src/main/resources/typeSchemaKafka.json @@ -5,6 +5,14 @@ "filter": { "type": "string" }, + "filterType": { + "type": "string", + "enum": [ + "jslt", + "regexp", + "json-path" + ] + }, "maxConcurrency": { "type": "integer", "minimum": 1 diff --git a/src/main/resources/typeSchemaPmDataDmaap.json b/src/main/resources/typeSchemaPmDataDmaap.json index 8dd73e9..616cc02 100644 --- a/src/main/resources/typeSchemaPmDataDmaap.json +++ b/src/main/resources/typeSchemaPmDataDmaap.json @@ -3,43 +3,59 @@ "type": "object", "additionalProperties": false, "properties": { - "pmFilter": { - "type": "object", - "additionalProperties": false, - "properties": { - "sourceNames": { - "type": "array", - "items": [ - { - "type": "string" - } - ] - }, - "measObjInstIds": { - "type": "array", - "items": [ - { - "type": "string" - } - ] - }, - "measTypes": { - "type": "array", - "items": [ - { - "type": "string" - } - ] + "filter": { + "anyOf": [ + { + "type": "string" }, - "measuredEntityDns": { - "type": "array", - "items": [ - { - "type": "string" + { + "type": "object", + "additionalProperties": false, + "properties": { + "sourceNames": { + "type": "array", + "items": [ + { + "type": "string" + } + ] + }, + "measObjInstIds": { + "type": "array", + "items": [ + { + "type": "string" + } + ] + }, + "measTypes": { + "type": "array", + "items": [ + { + "type": "string" + } + ] + }, + "measuredEntityDns": { + "type": "array", + "items": [ + { + "type": "string" + } + ] } - ] + } } - } + ] + }, + "filterType": { + "type": "string", + "enum": [ + "jslt", + "regexp", + "pmdata", + "json-path" + ] } } } \ No newline at end of file diff --git a/src/main/resources/typeSchemaPmDataKafka.json b/src/main/resources/typeSchemaPmDataKafka.json index 6690740..84d21f8 100644 --- a/src/main/resources/typeSchemaPmDataKafka.json +++ b/src/main/resources/typeSchemaPmDataKafka.json @@ -3,43 +3,59 @@ "type": "object", "additionalProperties": false, "properties": { - "pmFilter": { - "type": "object", - "additionalProperties": false, - "properties": { - "sourceNames": { - "type": "array", - "items": [ - { - "type": "string" - } - ] - }, - "measObjInstIds": { - "type": "array", - "items": [ - { - "type": "string" - } - ] + "filter": { + "anyOf": [ + { + "type": "string" }, - "measTypes": { - "type": "array", - "items": [ - { - "type": "string" + { + "type": "object", + "additionalProperties": false, + "properties": { + "sourceNames": { + "type": "array", + "items": [ + { + "type": "string" + } + ] + }, + "measObjInstIds": { + "type": "array", + "items": [ + { + "type": "string" + } + ] + }, + "measTypes": { + "type": "array", + "items": [ + { + "type": "string" + } + ] + }, + "measuredEntityDns": { + "type": "array", + "items": [ + { + "type": "string" + } + ] } - ] - }, - "measuredEntityDns": { - "type": "array", - "items": [ - { - "type": "string" - } - ] + } } - } + ] + }, + "filterType": { + "type": "string", + "enum": [ + "jslt", + "regexp", + "pmdata", + "json-path" + ] }, "maxConcurrency": { "type": "integer", diff --git a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java index 2066645..fc023cf 100644 --- a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java +++ b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java @@ -193,19 +193,25 @@ class ApplicationTest { return "https://localhost:" + this.applicationConfig.getLocalServerHttpPort(); } - private ConsumerJobInfo consumerJobInfo() { - return consumerJobInfo("DmaapInformationType", "EI_JOB_ID"); + private Object jsonObjectRegexp() { + return jsonObjectFilter(".*", Job.Parameters.REGEXP_TYPE); + + } + + private Object jsonObjectJsonPath() { + return jsonObjectFilter("$", Job.Parameters.JSON_PATH_FILTER_TYPE); } - private ConsumerJobInfo consumerPmJobInfo() { - return consumerJobInfo("PmInformationType", "EI_PM_JOB_ID"); + private String quote(String str) { + return "\"" + str + "\""; } - private Object jsonObject() { - return jsonObject("{}"); + private Object jsonObjectFilter(String filter, String filterType) { + return toJson("{" + quote("filter") + ":" + quote(filter) + "," + quote("filterType") + ":" + quote(filterType) + + "}"); } - private Object jsonObject(String json) { + private Object toJson(String json) { try { return JsonParser.parseString(json).getAsJsonObject(); } catch (Exception e) { @@ -213,15 +219,25 @@ class ApplicationTest { } } - private ConsumerJobInfo consumerJobInfo(String typeId, String infoJobId) { + private ConsumerJobInfo consumerJobInfo(String typeId, String infoJobId, Object filter) { try { String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL; - return new ConsumerJobInfo(typeId, jsonObject(), "owner", targetUri, ""); + return new ConsumerJobInfo(typeId, filter, "owner", targetUri, ""); } catch (Exception e) { return null; } } + private void waitForRegistration() { + // Register producer, Register types + await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); + producerRegistrationTask.supervisionTask().block(); + + assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); + assertThat(producerRegistrationTask.isRegisteredInIcs()).isTrue(); + assertThat(icsSimulatorController.testResults.types).hasSize(this.types.size()); + } + @Test void generateApiDoc() throws IOException { String url = "https://localhost:" + applicationConfig.getLocalServerHttpPort() + "/v3/api-docs"; @@ -258,14 +274,12 @@ class ApplicationTest { void testReceiveAndPostDataFromKafka() throws Exception { final String JOB_ID = "ID"; final String TYPE_ID = "KafkaInformationType"; - await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); - assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); + waitForRegistration(); // Create a job - Job.Parameters param = new Job.Parameters("", new Job.BufferTimeout(123, 456), 1, null); + Job.Parameters param = new Job.Parameters(null, null, new Job.BufferTimeout(123, 456), 1); String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL; - ConsumerJobInfo kafkaJobInfo = - new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", targetUri, ""); + ConsumerJobInfo kafkaJobInfo = new ConsumerJobInfo(TYPE_ID, toJson(gson.toJson(param)), "owner", targetUri, ""); this.icsSimulatorController.addJob(kafkaJobInfo, JOB_ID, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); @@ -287,10 +301,6 @@ class ApplicationTest { kafkaConsumer.stop(); this.kafkaTopicConsumers.restartNonRunningTopics(); await().untilAsserted(() -> assertThat(kafkaConsumer.isRunning()).isTrue()); - - // Delete the job - this.icsSimulatorController.deleteJob(JOB_ID, restClient()); - await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); } @Test @@ -298,13 +308,11 @@ class ApplicationTest { final String JOB_ID = "ID"; // Register producer, Register types - await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); - assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); - assertThat(producerRegistrationTask.isRegisteredInIcs()).isTrue(); - producerRegistrationTask.supervisionTask().block(); + waitForRegistration(); // Create a job - this.icsSimulatorController.addJob(consumerJobInfo(), JOB_ID, restClient()); + this.icsSimulatorController.addJob(consumerJobInfo("DmaapInformationType", JOB_ID, jsonObjectRegexp()), JOB_ID, + restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); // Return two messages from DMAAP and verify that these are sent to the owner of @@ -330,19 +338,18 @@ class ApplicationTest { final String JOB_ID = "ID"; // Register producer, Register types - await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); - producerRegistrationTask.supervisionTask().block(); + waitForRegistration(); // Create a job with a PM filter - ConsumerJobInfo jobInfo = consumerPmJobInfo(); PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); + filterData.getMeasTypes().add("succImmediateAssignProcs"); filterData.getMeasObjInstIds().add("UtranCell=Gbg-997"); filterData.getSourceNames().add("O-DU-1122"); filterData.getMeasuredEntityDns().add("ManagedElement=RNC-Gbg-1"); - Job.Parameters param = new Job.Parameters(null, null, null, filterData); + Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, null, null); String paramJson = gson.toJson(param); - jobInfo.jobDefinition = jsonObject(paramJson); + ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationType", "EI_PM_JOB_ID", toJson(paramJson)); this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); @@ -357,10 +364,61 @@ class ApplicationTest { await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1)); String receivedFiltered = consumer.receivedBodies.get(0); assertThat(receivedFiltered).contains("succImmediateAssignProcs").doesNotContain("\"p\":2").contains("\"p\":1"); + } - // Delete the job - this.icsSimulatorController.deleteJob(JOB_ID, restClient()); - await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); + @Test + void testJsltFiltering() throws Exception { + + final String JOB_ID = "ID"; + + // Register producer, Register types + waitForRegistration(); + + // Create a job with a PM filter + String expresssion = "if(.event.commonEventHeader.sourceName == \"O-DU-1122\")" // + + "."; + Job.Parameters param = new Job.Parameters(expresssion, Job.Parameters.JSLT_FILTER_TYPE, null, null); + String paramJson = gson.toJson(param); + ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationType", JOB_ID, toJson(paramJson)); + + this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient()); + await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); + + // Return one messagefrom DMAAP and verify that the job (consumer) receives a + // filtered PM message + String path = "./src/test/resources/pm_report.json"; + String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset()); + DmaapSimulatorController.dmaapPmResponses.add(pmReportJson); + + ConsumerController.TestResults consumer = this.consumerController.testResults; + await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1)); + String receivedFiltered = consumer.receivedBodies.get(0); + assertThat(receivedFiltered).contains("event"); + } + + @Test + void testJsonPathFiltering() throws Exception { + final String JOB_ID = "ID"; + + // Register producer, Register types + waitForRegistration(); + + // Create a job with a JsonPath + ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationType", JOB_ID, this.jsonObjectJsonPath()); + + this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient()); + await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); + + // Return one messagefrom DMAAP and verify that the job (consumer) receives a + // filtered PM message + String path = "./src/test/resources/pm_report.json"; + String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset()); + DmaapSimulatorController.dmaapPmResponses.add(pmReportJson); + + ConsumerController.TestResults consumer = this.consumerController.testResults; + await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1)); + String receivedFiltered = consumer.receivedBodies.get(0); + assertThat(receivedFiltered).contains("event"); } @Test @@ -371,33 +429,27 @@ class ApplicationTest { final String JOB_ID = "ID"; // Register producer, Register types - await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); - producerRegistrationTask.supervisionTask().block(); + waitForRegistration(); // Create a job with a PM filter - ConsumerJobInfo jobInfo = consumerPmJobInfo(); - jobInfo.infoTypeId = "PmInformationTypeKafka"; PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); filterData.getMeasTypes().add("succImmediateAssignProcs"); - Job.Parameters param = new Job.Parameters(null, null, null, filterData); + Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, null, null); String paramJson = gson.toJson(param); - jobInfo.jobDefinition = jsonObject(paramJson); + + ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationTypeKafka", "EI_PM_JOB_ID", toJson(paramJson)); this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); - - // Delete the job - this.icsSimulatorController.deleteJob(JOB_ID, restClient()); - await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); } @Test void testReRegister() throws Exception { // Wait foir register types and producer - await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); - assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); + waitForRegistration(); // Clear the registration, should trigger a re-register icsSimulatorController.testResults.reset(); + producerRegistrationTask.supervisionTask().block(); await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java index 62d4158..0abaf59 100644 --- a/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java +++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java @@ -214,7 +214,8 @@ class IntegrationWithIcs { await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue()); final String TYPE_ID = "KafkaInformationType"; - Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 456), 1, null); + Job.Parameters param = + new Job.Parameters("filter", Job.Parameters.REGEXP_TYPE, new Job.BufferTimeout(123, 456), 1); ConsumerJobInfo jobInfo = new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", consumerUri(), ""); @@ -233,7 +234,8 @@ class IntegrationWithIcs { await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue()); final String TYPE_ID = "KafkaInformationType"; - Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 170 * 1000), 1, null); + Job.Parameters param = + new Job.Parameters("filter", Job.Parameters.REGEXP_TYPE, new Job.BufferTimeout(123, 170 * 1000), 1); ConsumerJobInfo jobInfo = new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", consumerUri(), ""); diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java index 5f19633..3d24759 100644 --- a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java +++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java @@ -185,8 +185,8 @@ class IntegrationWithKafka { private static Object jobParametersAsJsonObject(String filter, long maxTimeMiliseconds, int maxSize, int maxConcurrency) { - Job.Parameters param = - new Job.Parameters(filter, new Job.BufferTimeout(maxSize, maxTimeMiliseconds), maxConcurrency, null); + Job.Parameters param = new Job.Parameters(filter, Job.Parameters.REGEXP_TYPE, + new Job.BufferTimeout(maxSize, maxTimeMiliseconds), maxConcurrency); String str = gson.toJson(param); return jsonObject(str); } diff --git a/src/test/java/org/oran/dmaapadapter/repository/filters/JsltFilterTest.java b/src/test/java/org/oran/dmaapadapter/repository/filters/JsltFilterTest.java new file mode 100644 index 0000000..c769c56 --- /dev/null +++ b/src/test/java/org/oran/dmaapadapter/repository/filters/JsltFilterTest.java @@ -0,0 +1,91 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2022 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.repository.filters; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.Path; + +import org.junit.jupiter.api.Test; + +class JsltFilterTest { + + @Test + void testPickOneValue() throws Exception { + String expresssion = "if(.event.commonEventHeader.sourceName == 'O-DU-1122')" // + + ".event.perf3gppFields.measDataCollection.measInfoList[0].measValuesList[0].measResults[0].sValue"; + + JsltFilter filter = new JsltFilter(reQuote(expresssion)); + String res = filter.filter(loadReport()); + assertThat(res).isEqualTo(reQuote("'813'")); + } + + @Test + void testPickWholeReport() throws Exception { + String expresssion = "if(.event.commonEventHeader.sourceName == 'O-DU-1122')" // + + "."; + + JsltFilter filter = new JsltFilter(reQuote(expresssion)); + String res = filter.filter(loadReport()); + assertThat(res).contains("event"); + } + + @Test + void testNoMatch() throws Exception { + String expresssion = "if(.event.commonEventHeader.sourceName == 'JUNK')" // + + "."; + JsltFilter filter = new JsltFilter(reQuote(expresssion)); + String res = filter.filter(loadReport()); + assertThat(res).isEmpty(); + } + + @Test + void testMoreAdvanced() throws Exception { + + String expresssion = // + "if(.event.commonEventHeader.sourceName == 'O-DU-1122')" + // + "{ " + // + "'array' : [for (.event.perf3gppFields.measDataCollection.measInfoList[0].measValuesList) string(.measObjInstId)], " + + // + "'size' : size(.event.perf3gppFields.measDataCollection.measInfoList[0].measValuesList)" + // + "}"; // + + JsltFilter filter = new JsltFilter(reQuote(expresssion)); + String res = filter.filter(loadReport()); + String expected = + "{'array':['RncFunction=RF-1,UtranCell=Gbg-997','RncFunction=RF-1,UtranCell=Gbg-998','RncFunction=RF-1,UtranCell=Gbg-999'],'size':3}"; + + assertThat(res).isEqualTo(reQuote(expected)); + + } + + private String loadReport() throws Exception { + String path = "./src/test/resources/pm_report.json"; + return Files.readString(Path.of(path), Charset.defaultCharset()); + } + + private String reQuote(String str) { + return str.replaceAll("'", "\\\""); + } + +} diff --git a/src/test/java/org/oran/dmaapadapter/repository/filters/JsonPathFilterTest.java b/src/test/java/org/oran/dmaapadapter/repository/filters/JsonPathFilterTest.java new file mode 100644 index 0000000..1360943 --- /dev/null +++ b/src/test/java/org/oran/dmaapadapter/repository/filters/JsonPathFilterTest.java @@ -0,0 +1,46 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2022 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.repository.filters; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.Path; + +import org.junit.jupiter.api.Test; + +class JsonPathFilterTest { + + @Test + void testJsonPath() throws Exception { + String exp = ("$.event.perf3gppFields.measDataCollection.measInfoList[0].measTypes.sMeasTypesList[0]"); + JsonPathFilter filter = new JsonPathFilter(exp); + String res = filter.filter(loadReport()); + assertThat(res).isEqualTo("\"attTCHSeizures\""); + } + + private String loadReport() throws Exception { + String path = "./src/test/resources/pm_report.json"; + return Files.readString(Path.of(path), Charset.defaultCharset()); + } + +} diff --git a/src/test/resources/test_application_configuration.json b/src/test/resources/test_application_configuration.json index 8a89c1a..581e164 100644 --- a/src/test/resources/test_application_configuration.json +++ b/src/test/resources/test_application_configuration.json @@ -3,13 +3,12 @@ { "id": "DmaapInformationType", "dmaapTopicUrl": "/dmaap-topic-1", - "useHttpProxy": false, - "dataType": "text" + "useHttpProxy": false }, { "id": "KafkaInformationType", "kafkaInputTopic": "TutorialTopic", - "useHttpProxy": false + "useHttpProxy": false }, { "id": "PmInformationType", -- 2.16.6