PM Filter 86/8086/2
authorPatrikBuhr <patrik.buhr@est.tech>
Tue, 19 Apr 2022 11:17:23 +0000 (13:17 +0200)
committerPatrikBuhr <patrik.buhr@est.tech>
Tue, 19 Apr 2022 13:47:39 +0000 (15:47 +0200)
Added support for JsonPath and JSLT filtering of Json data.

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-743
Change-Id: Ia54fd0e67d48626522d0ba5534dd27fbbde2e3e6

17 files changed:
config/application_configuration.json
pom.xml
src/main/java/org/oran/dmaapadapter/repository/InfoType.java
src/main/java/org/oran/dmaapadapter/repository/Job.java
src/main/java/org/oran/dmaapadapter/repository/filters/JsltFilter.java [new file with mode: 0644]
src/main/java/org/oran/dmaapadapter/repository/filters/JsonPathFilter.java [new file with mode: 0644]
src/main/java/org/oran/dmaapadapter/repository/filters/RegexpFilter.java
src/main/resources/typeSchemaDmaap.json
src/main/resources/typeSchemaKafka.json
src/main/resources/typeSchemaPmDataDmaap.json
src/main/resources/typeSchemaPmDataKafka.json
src/test/java/org/oran/dmaapadapter/ApplicationTest.java
src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java
src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
src/test/java/org/oran/dmaapadapter/repository/filters/JsltFilterTest.java [new file with mode: 0644]
src/test/java/org/oran/dmaapadapter/repository/filters/JsonPathFilterTest.java [new file with mode: 0644]
src/test/resources/test_application_configuration.json

index 3233804..881da34 100644 (file)
@@ -9,6 +9,12 @@
          "id": "ExampleInformationType2",
          "kafkaInputTopic": "TutorialTopic",
          "useHttpProxy": false
-      }
+      },
+       {
+         "id": "PmData",
+         "dmaapTopicUrl": "/events/PM_NOTIFICATION_OUTPUT/OpenDcae-c12/C12",
+         "useHttpProxy": true,
+         "dataType" : "pmData"
+      },
    ]
 }
diff --git a/pom.xml b/pom.xml
index 1188b68..04b5716 100644 (file)
--- a/pom.xml
+++ b/pom.xml
             <version>1.12.1</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.schibsted.spt.data</groupId>
+            <artifactId>jslt</artifactId>
+            <version>0.1.11</version>
+        </dependency>
+        <dependency>
+            <groupId>com.jayway.jsonpath</groupId>
+            <artifactId>json-path</artifactId>
+            <version>2.7.0</version>
+        </dependency>
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-test</artifactId>
index 3a78d6c..7100940 100644 (file)
@@ -57,13 +57,18 @@ public class InfoType {
     }
 
     public enum DataType {
-        PM_DATA, TEXT
+        PM_DATA, OTHER
     }
 
     public DataType getDataType() {
-        if (dataType != null && dataType.equalsIgnoreCase("pmData")) {
+        if (dataType == null) {
+            return DataType.OTHER;
+        }
+
+        if (dataType.equalsIgnoreCase("pmData")) {
             return DataType.PM_DATA;
         }
-        return DataType.TEXT;
+        return DataType.OTHER;
+
     }
 }
index 5a2e373..9dd4987 100644 (file)
 
 package org.oran.dmaapadapter.repository;
 
-import com.google.common.base.Strings;
+import com.google.gson.GsonBuilder;
 
+import java.lang.invoke.MethodHandles;
 import java.time.Duration;
 
 import lombok.Getter;
+import lombok.Setter;
 
 import org.immutables.gson.Gson;
 import org.oran.dmaapadapter.clients.AsyncRestClient;
 import org.oran.dmaapadapter.repository.filters.Filter;
+import org.oran.dmaapadapter.repository.filters.JsltFilter;
+import org.oran.dmaapadapter.repository.filters.JsonPathFilter;
 import org.oran.dmaapadapter.repository.filters.PmReportFilter;
 import org.oran.dmaapadapter.repository.filters.RegexpFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class Job {
 
+    private static com.google.gson.Gson gson = new GsonBuilder().create();
+    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
     @Gson.TypeAdapters
     public static class Parameters {
-        @Getter
-        private String filter;
+        public static final String REGEXP_TYPE = "regexp";
+        public static final String PM_FILTER_TYPE = "pmdata";
+        public static final String JSLT_FILTER_TYPE = "jslt";
+        public static final String JSON_PATH_FILTER_TYPE = "json-path";
+
+        @Setter
+        private String filterType = REGEXP_TYPE;
+        private Object filter;
         @Getter
         private BufferTimeout bufferTimeout;
 
         private Integer maxConcurrency;
 
-        @Getter
-        private PmReportFilter.FilterData pmFilter;
-
         public Parameters() {}
 
-        public Parameters(String filter, BufferTimeout bufferTimeout, Integer maxConcurrency,
-                PmReportFilter.FilterData pmFilter) {
+        public Parameters(Object filter, String filterType, BufferTimeout bufferTimeout, Integer maxConcurrency) {
             this.filter = filter;
             this.bufferTimeout = bufferTimeout;
             this.maxConcurrency = maxConcurrency;
-            this.pmFilter = pmFilter;
+            this.filterType = filterType;
         }
 
         public int getMaxConcurrency() {
             return maxConcurrency == null || maxConcurrency == 0 ? 1 : maxConcurrency;
         }
+
+        public String getFilterAsString() {
+            return this.filter.toString();
+        }
+
+        public PmReportFilter.FilterData getPmFilter() {
+            String str = gson.toJson(this.filter);
+            return gson.fromJson(str, PmReportFilter.FilterData.class);
+        }
+
+        public enum FilterType {
+            REGEXP, JSLT, JSON_PATH, PM_DATA, NONE
+        }
+
+        public FilterType getFilterType() {
+            if (filter == null || filterType == null) {
+                return FilterType.NONE;
+            } else if (filterType.equalsIgnoreCase(JSLT_FILTER_TYPE)) {
+                return FilterType.JSLT;
+            } else if (filterType.equalsIgnoreCase(JSON_PATH_FILTER_TYPE)) {
+                return FilterType.JSON_PATH;
+            } else if (filterType.equalsIgnoreCase(REGEXP_TYPE)) {
+                return FilterType.REGEXP;
+            } else if (filterType.equalsIgnoreCase(PM_FILTER_TYPE)) {
+                return FilterType.PM_DATA;
+            } else {
+                logger.warn("Unsupported filter type: {}", this.filterType);
+                return FilterType.NONE;
+            }
+        }
     }
 
     @Gson.TypeAdapters
@@ -111,17 +152,34 @@ public class Job {
         this.owner = owner;
         this.lastUpdated = lastUpdated;
         this.parameters = parameters;
-        if (parameters != null && !Strings.isNullOrEmpty(parameters.filter)) {
-            filter = new RegexpFilter(parameters.filter);
-        } else if (parameters != null && parameters.pmFilter != null) {
-            filter = new PmReportFilter(parameters.pmFilter);
-        } else {
-            filter = null;
-        }
+        filter = createFilter(parameters);
         this.consumerRestClient = consumerRestClient;
 
     }
 
+    private static Filter createFilter(Parameters parameters) {
+
+        if (parameters.filter == null) {
+            return null;
+        }
+
+        switch (parameters.getFilterType()) {
+            case PM_DATA:
+                return new PmReportFilter(parameters.getPmFilter());
+            case REGEXP:
+                return new RegexpFilter(parameters.getFilterAsString());
+            case JSLT:
+                return new JsltFilter(parameters.getFilterAsString());
+            case JSON_PATH:
+                return new JsonPathFilter(parameters.getFilterAsString());
+            case NONE:
+                return null;
+            default:
+                logger.error("Not handeled filter type: {}", parameters.getFilterType());
+                return null;
+        }
+    }
+
     public String filter(String data) {
         if (filter == null) {
             return data;
diff --git a/src/main/java/org/oran/dmaapadapter/repository/filters/JsltFilter.java b/src/main/java/org/oran/dmaapadapter/repository/filters/JsltFilter.java
new file mode 100644 (file)
index 0000000..5c2520e
--- /dev/null
@@ -0,0 +1,68 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.repository.filters;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.NullNode;
+import com.schibsted.spt.data.jslt.Expression;
+import com.schibsted.spt.data.jslt.Parser;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JsltFilter implements Filter {
+
+    private Expression expression;
+    private final ObjectMapper mapper = new ObjectMapper();
+    private static final Logger logger = LoggerFactory.getLogger(JsltFilter.class);
+
+    public JsltFilter(String exp) {
+        try {
+            expression = Parser.compileString(exp);
+        } catch (Exception e) {
+            logger.warn("Could not parse JSLT expression: {}, reason: {}", exp, e.getMessage());
+        }
+    }
+
+    @Override
+    public String filter(String jsonString) {
+        if (expression == null) {
+            return jsonString;
+        }
+        try {
+            JsonFactory factory = mapper.getFactory();
+            JsonParser parser = factory.createParser(jsonString);
+            JsonNode actualObj = mapper.readTree(parser);
+
+            JsonNode filteredNode = expression.apply(actualObj);
+            if (filteredNode == NullNode.instance) {
+                return "";
+            }
+            return mapper.writeValueAsString(filteredNode);
+        } catch (Exception e) {
+            return "";
+        }
+    }
+
+}
diff --git a/src/main/java/org/oran/dmaapadapter/repository/filters/JsonPathFilter.java b/src/main/java/org/oran/dmaapadapter/repository/filters/JsonPathFilter.java
new file mode 100644 (file)
index 0000000..a958649
--- /dev/null
@@ -0,0 +1,52 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.repository.filters;
+
+import com.jayway.jsonpath.JsonPath;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JsonPathFilter implements Filter {
+
+    private String expression;
+    private static final Logger logger = LoggerFactory.getLogger(JsonPathFilter.class);
+    com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
+
+    public JsonPathFilter(String exp) {
+        try {
+            expression = exp;
+        } catch (Exception e) {
+            logger.warn("Could not parse Json Path expression: {}, reason: {}", exp, e.getMessage());
+        }
+    }
+
+    @Override
+    public String filter(String jsonString) {
+        try {
+            Object o = JsonPath.parse(jsonString).read(this.expression, Object.class);
+            return o == null ? "" : gson.toJson(o);
+        } catch (Exception e) {
+            return "";
+        }
+
+    }
+}
index aac808c..c8b5541 100644 (file)
 
 package org.oran.dmaapadapter.repository.filters;
 
-
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class RegexpFilter implements Filter {
-
-    private final Pattern regexp;
+    private static final Logger logger = LoggerFactory.getLogger(RegexpFilter.class);
+    private Pattern regexp;
 
     public RegexpFilter(String exp) {
-        if (exp != null) {
+        try {
             regexp = Pattern.compile(exp);
-        } else {
-            regexp = null;
+        } catch (Exception e) {
+            logger.warn("Could not parse REGEXP expression: {}, reason: {}", exp, e.getMessage());
         }
     }
 
+    @Override
     public String filter(String data) {
-
         if (regexp == null) {
             return data;
         }
index a50b236..146b9eb 100644 (file)
@@ -3,8 +3,16 @@
   "type": "object",
   "properties": {
     "filter": {
-       "type": "string"
-     }
+      "type": "string"
+    },
+    "filterType": {
+      "type": "string",
+      "enum": [
+        "jslt",
+        "regexp",
+        "json-path"
+      ]
+    }
   },
   "additionalProperties": false
-}
+}
\ No newline at end of file
index f7e6e87..99b8647 100644 (file)
@@ -5,6 +5,14 @@
     "filter": {
       "type": "string"
     },
+    "filterType": {
+      "type": "string",
+      "enum": [
+        "jslt",
+        "regexp",
+        "json-path"
+      ]
+    },
     "maxConcurrency": {
       "type": "integer",
       "minimum": 1
index 8dd73e9..616cc02 100644 (file)
@@ -3,43 +3,59 @@
    "type": "object",
    "additionalProperties": false,
    "properties": {
-      "pmFilter": {
-         "type": "object",
-         "additionalProperties": false,
-         "properties": {
-            "sourceNames": {
-               "type": "array",
-               "items": [
-                  {
-                     "type": "string"
-                  }
-               ]
-            },
-            "measObjInstIds": {
-               "type": "array",
-               "items": [
-                  {
-                     "type": "string"
-                  }
-               ]
-            },
-            "measTypes": {
-               "type": "array",
-               "items": [
-                  {
-                     "type": "string"
-                  }
-               ]
+      "filter": {
+         "anyOf": [
+            {
+               "type": "string"
             },
-            "measuredEntityDns": {
-               "type": "array",
-               "items": [
-                  {
-                     "type": "string"
+            {
+               "type": "object",
+               "additionalProperties": false,
+               "properties": {
+                  "sourceNames": {
+                     "type": "array",
+                     "items": [
+                        {
+                           "type": "string"
+                        }
+                     ]
+                  },
+                  "measObjInstIds": {
+                     "type": "array",
+                     "items": [
+                        {
+                           "type": "string"
+                        }
+                     ]
+                  },
+                  "measTypes": {
+                     "type": "array",
+                     "items": [
+                        {
+                           "type": "string"
+                        }
+                     ]
+                  },
+                  "measuredEntityDns": {
+                     "type": "array",
+                     "items": [
+                        {
+                           "type": "string"
+                        }
+                     ]
                   }
-               ]
+               }
             }
-         }
+         ]
+      },
+      "filterType": {
+         "type": "string",
+         "enum": [
+            "jslt",
+            "regexp",
+            "pmdata",
+            "json-path"
+         ]
       }
    }
 }
\ No newline at end of file
index 6690740..84d21f8 100644 (file)
@@ -3,43 +3,59 @@
    "type": "object",
    "additionalProperties": false,
    "properties": {
-      "pmFilter": {
-         "type": "object",
-         "additionalProperties": false,
-         "properties": {
-            "sourceNames": {
-               "type": "array",
-               "items": [
-                  {
-                     "type": "string"
-                  }
-               ]
-            },
-            "measObjInstIds": {
-               "type": "array",
-               "items": [
-                  {
-                     "type": "string"
-                  }
-               ]
+      "filter": {
+         "anyOf": [
+            {
+               "type": "string"
             },
-            "measTypes": {
-               "type": "array",
-               "items": [
-                  {
-                     "type": "string"
+            {
+               "type": "object",
+               "additionalProperties": false,
+               "properties": {
+                  "sourceNames": {
+                     "type": "array",
+                     "items": [
+                        {
+                           "type": "string"
+                        }
+                     ]
+                  },
+                  "measObjInstIds": {
+                     "type": "array",
+                     "items": [
+                        {
+                           "type": "string"
+                        }
+                     ]
+                  },
+                  "measTypes": {
+                     "type": "array",
+                     "items": [
+                        {
+                           "type": "string"
+                        }
+                     ]
+                  },
+                  "measuredEntityDns": {
+                     "type": "array",
+                     "items": [
+                        {
+                           "type": "string"
+                        }
+                     ]
                   }
-               ]
-            },
-            "measuredEntityDns": {
-               "type": "array",
-               "items": [
-                  {
-                     "type": "string"
-                  }
-               ]
+               }
             }
-         }
+         ]
+      },
+      "filterType": {
+         "type": "string",
+         "enum": [
+            "jslt",
+            "regexp",
+            "pmdata",
+            "json-path"
+         ]
       },
       "maxConcurrency": {
          "type": "integer",
index 2066645..fc023cf 100644 (file)
@@ -193,19 +193,25 @@ class ApplicationTest {
         return "https://localhost:" + this.applicationConfig.getLocalServerHttpPort();
     }
 
-    private ConsumerJobInfo consumerJobInfo() {
-        return consumerJobInfo("DmaapInformationType", "EI_JOB_ID");
+    private Object jsonObjectRegexp() {
+        return jsonObjectFilter(".*", Job.Parameters.REGEXP_TYPE);
+
+    }
+
+    private Object jsonObjectJsonPath() {
+        return jsonObjectFilter("$", Job.Parameters.JSON_PATH_FILTER_TYPE);
     }
 
-    private ConsumerJobInfo consumerPmJobInfo() {
-        return consumerJobInfo("PmInformationType", "EI_PM_JOB_ID");
+    private String quote(String str) {
+        return "\"" + str + "\"";
     }
 
-    private Object jsonObject() {
-        return jsonObject("{}");
+    private Object jsonObjectFilter(String filter, String filterType) {
+        return toJson("{" + quote("filter") + ":" + quote(filter) + "," + quote("filterType") + ":" + quote(filterType)
+                + "}");
     }
 
-    private Object jsonObject(String json) {
+    private Object toJson(String json) {
         try {
             return JsonParser.parseString(json).getAsJsonObject();
         } catch (Exception e) {
@@ -213,15 +219,25 @@ class ApplicationTest {
         }
     }
 
-    private ConsumerJobInfo consumerJobInfo(String typeId, String infoJobId) {
+    private ConsumerJobInfo consumerJobInfo(String typeId, String infoJobId, Object filter) {
         try {
             String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL;
-            return new ConsumerJobInfo(typeId, jsonObject(), "owner", targetUri, "");
+            return new ConsumerJobInfo(typeId, filter, "owner", targetUri, "");
         } catch (Exception e) {
             return null;
         }
     }
 
+    private void waitForRegistration() {
+        // Register producer, Register types
+        await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+        producerRegistrationTask.supervisionTask().block();
+
+        assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+        assertThat(producerRegistrationTask.isRegisteredInIcs()).isTrue();
+        assertThat(icsSimulatorController.testResults.types).hasSize(this.types.size());
+    }
+
     @Test
     void generateApiDoc() throws IOException {
         String url = "https://localhost:" + applicationConfig.getLocalServerHttpPort() + "/v3/api-docs";
@@ -258,14 +274,12 @@ class ApplicationTest {
     void testReceiveAndPostDataFromKafka() throws Exception {
         final String JOB_ID = "ID";
         final String TYPE_ID = "KafkaInformationType";
-        await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
-        assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+        waitForRegistration();
 
         // Create a job
-        Job.Parameters param = new Job.Parameters("", new Job.BufferTimeout(123, 456), 1, null);
+        Job.Parameters param = new Job.Parameters(null, null, new Job.BufferTimeout(123, 456), 1);
         String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL;
-        ConsumerJobInfo kafkaJobInfo =
-                new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", targetUri, "");
+        ConsumerJobInfo kafkaJobInfo = new ConsumerJobInfo(TYPE_ID, toJson(gson.toJson(param)), "owner", targetUri, "");
 
         this.icsSimulatorController.addJob(kafkaJobInfo, JOB_ID, restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
@@ -287,10 +301,6 @@ class ApplicationTest {
         kafkaConsumer.stop();
         this.kafkaTopicConsumers.restartNonRunningTopics();
         await().untilAsserted(() -> assertThat(kafkaConsumer.isRunning()).isTrue());
-
-        // Delete the job
-        this.icsSimulatorController.deleteJob(JOB_ID, restClient());
-        await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
     }
 
     @Test
@@ -298,13 +308,11 @@ class ApplicationTest {
         final String JOB_ID = "ID";
 
         // Register producer, Register types
-        await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
-        assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
-        assertThat(producerRegistrationTask.isRegisteredInIcs()).isTrue();
-        producerRegistrationTask.supervisionTask().block();
+        waitForRegistration();
 
         // Create a job
-        this.icsSimulatorController.addJob(consumerJobInfo(), JOB_ID, restClient());
+        this.icsSimulatorController.addJob(consumerJobInfo("DmaapInformationType", JOB_ID, jsonObjectRegexp()), JOB_ID,
+                restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
 
         // Return two messages from DMAAP and verify that these are sent to the owner of
@@ -330,19 +338,18 @@ class ApplicationTest {
         final String JOB_ID = "ID";
 
         // Register producer, Register types
-        await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
-        producerRegistrationTask.supervisionTask().block();
+        waitForRegistration();
 
         // Create a job with a PM filter
-        ConsumerJobInfo jobInfo = consumerPmJobInfo();
         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
+
         filterData.getMeasTypes().add("succImmediateAssignProcs");
         filterData.getMeasObjInstIds().add("UtranCell=Gbg-997");
         filterData.getSourceNames().add("O-DU-1122");
         filterData.getMeasuredEntityDns().add("ManagedElement=RNC-Gbg-1");
-        Job.Parameters param = new Job.Parameters(null, null, null, filterData);
+        Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, null, null);
         String paramJson = gson.toJson(param);
-        jobInfo.jobDefinition = jsonObject(paramJson);
+        ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationType", "EI_PM_JOB_ID", toJson(paramJson));
 
         this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
@@ -357,10 +364,61 @@ class ApplicationTest {
         await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
         String receivedFiltered = consumer.receivedBodies.get(0);
         assertThat(receivedFiltered).contains("succImmediateAssignProcs").doesNotContain("\"p\":2").contains("\"p\":1");
+    }
 
-        // Delete the job
-        this.icsSimulatorController.deleteJob(JOB_ID, restClient());
-        await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+    @Test
+    void testJsltFiltering() throws Exception {
+
+        final String JOB_ID = "ID";
+
+        // Register producer, Register types
+        waitForRegistration();
+
+        // Create a job with a PM filter
+        String expresssion = "if(.event.commonEventHeader.sourceName == \"O-DU-1122\")" //
+                + ".";
+        Job.Parameters param = new Job.Parameters(expresssion, Job.Parameters.JSLT_FILTER_TYPE, null, null);
+        String paramJson = gson.toJson(param);
+        ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationType", JOB_ID, toJson(paramJson));
+
+        this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+        // Return one messagefrom DMAAP and verify that the job (consumer) receives a
+        // filtered PM message
+        String path = "./src/test/resources/pm_report.json";
+        String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset());
+        DmaapSimulatorController.dmaapPmResponses.add(pmReportJson);
+
+        ConsumerController.TestResults consumer = this.consumerController.testResults;
+        await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
+        String receivedFiltered = consumer.receivedBodies.get(0);
+        assertThat(receivedFiltered).contains("event");
+    }
+
+    @Test
+    void testJsonPathFiltering() throws Exception {
+        final String JOB_ID = "ID";
+
+        // Register producer, Register types
+        waitForRegistration();
+
+        // Create a job with a JsonPath
+        ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationType", JOB_ID, this.jsonObjectJsonPath());
+
+        this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+        // Return one messagefrom DMAAP and verify that the job (consumer) receives a
+        // filtered PM message
+        String path = "./src/test/resources/pm_report.json";
+        String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset());
+        DmaapSimulatorController.dmaapPmResponses.add(pmReportJson);
+
+        ConsumerController.TestResults consumer = this.consumerController.testResults;
+        await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
+        String receivedFiltered = consumer.receivedBodies.get(0);
+        assertThat(receivedFiltered).contains("event");
     }
 
     @Test
@@ -371,33 +429,27 @@ class ApplicationTest {
         final String JOB_ID = "ID";
 
         // Register producer, Register types
-        await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
-        producerRegistrationTask.supervisionTask().block();
+        waitForRegistration();
 
         // Create a job with a PM filter
-        ConsumerJobInfo jobInfo = consumerPmJobInfo();
-        jobInfo.infoTypeId = "PmInformationTypeKafka";
         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
         filterData.getMeasTypes().add("succImmediateAssignProcs");
-        Job.Parameters param = new Job.Parameters(null, null, null, filterData);
+        Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, null, null);
         String paramJson = gson.toJson(param);
-        jobInfo.jobDefinition = jsonObject(paramJson);
+
+        ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationTypeKafka", "EI_PM_JOB_ID", toJson(paramJson));
         this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
-
-        // Delete the job
-        this.icsSimulatorController.deleteJob(JOB_ID, restClient());
-        await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
     }
 
     @Test
     void testReRegister() throws Exception {
         // Wait foir register types and producer
-        await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
-        assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+        waitForRegistration();
 
         // Clear the registration, should trigger a re-register
         icsSimulatorController.testResults.reset();
+        producerRegistrationTask.supervisionTask().block();
         await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
         assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
index 62d4158..0abaf59 100644 (file)
@@ -214,7 +214,8 @@ class IntegrationWithIcs {
         await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue());
         final String TYPE_ID = "KafkaInformationType";
 
-        Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 456), 1, null);
+        Job.Parameters param =
+                new Job.Parameters("filter", Job.Parameters.REGEXP_TYPE, new Job.BufferTimeout(123, 456), 1);
 
         ConsumerJobInfo jobInfo =
                 new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", consumerUri(), "");
@@ -233,7 +234,8 @@ class IntegrationWithIcs {
         await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue());
         final String TYPE_ID = "KafkaInformationType";
 
-        Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 170 * 1000), 1, null);
+        Job.Parameters param =
+                new Job.Parameters("filter", Job.Parameters.REGEXP_TYPE, new Job.BufferTimeout(123, 170 * 1000), 1);
 
         ConsumerJobInfo jobInfo =
                 new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", consumerUri(), "");
index 5f19633..3d24759 100644 (file)
@@ -185,8 +185,8 @@ class IntegrationWithKafka {
 
     private static Object jobParametersAsJsonObject(String filter, long maxTimeMiliseconds, int maxSize,
             int maxConcurrency) {
-        Job.Parameters param =
-                new Job.Parameters(filter, new Job.BufferTimeout(maxSize, maxTimeMiliseconds), maxConcurrency, null);
+        Job.Parameters param = new Job.Parameters(filter, Job.Parameters.REGEXP_TYPE,
+                new Job.BufferTimeout(maxSize, maxTimeMiliseconds), maxConcurrency);
         String str = gson.toJson(param);
         return jsonObject(str);
     }
diff --git a/src/test/java/org/oran/dmaapadapter/repository/filters/JsltFilterTest.java b/src/test/java/org/oran/dmaapadapter/repository/filters/JsltFilterTest.java
new file mode 100644 (file)
index 0000000..c769c56
--- /dev/null
@@ -0,0 +1,91 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2022 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.repository.filters;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import org.junit.jupiter.api.Test;
+
+class JsltFilterTest {
+
+    @Test
+    void testPickOneValue() throws Exception {
+        String expresssion = "if(.event.commonEventHeader.sourceName == 'O-DU-1122')" //
+                + ".event.perf3gppFields.measDataCollection.measInfoList[0].measValuesList[0].measResults[0].sValue";
+
+        JsltFilter filter = new JsltFilter(reQuote(expresssion));
+        String res = filter.filter(loadReport());
+        assertThat(res).isEqualTo(reQuote("'813'"));
+    }
+
+    @Test
+    void testPickWholeReport() throws Exception {
+        String expresssion = "if(.event.commonEventHeader.sourceName == 'O-DU-1122')" //
+                + ".";
+
+        JsltFilter filter = new JsltFilter(reQuote(expresssion));
+        String res = filter.filter(loadReport());
+        assertThat(res).contains("event");
+    }
+
+    @Test
+    void testNoMatch() throws Exception {
+        String expresssion = "if(.event.commonEventHeader.sourceName == 'JUNK')" //
+                + ".";
+        JsltFilter filter = new JsltFilter(reQuote(expresssion));
+        String res = filter.filter(loadReport());
+        assertThat(res).isEmpty();
+    }
+
+    @Test
+    void testMoreAdvanced() throws Exception {
+
+        String expresssion = //
+                "if(.event.commonEventHeader.sourceName == 'O-DU-1122')" + //
+                        "{ " + //
+                        "'array' : [for (.event.perf3gppFields.measDataCollection.measInfoList[0].measValuesList) string(.measObjInstId)], "
+                        + //
+                        "'size'  : size(.event.perf3gppFields.measDataCollection.measInfoList[0].measValuesList)" + //
+                        "}"; //
+
+        JsltFilter filter = new JsltFilter(reQuote(expresssion));
+        String res = filter.filter(loadReport());
+        String expected =
+                "{'array':['RncFunction=RF-1,UtranCell=Gbg-997','RncFunction=RF-1,UtranCell=Gbg-998','RncFunction=RF-1,UtranCell=Gbg-999'],'size':3}";
+
+        assertThat(res).isEqualTo(reQuote(expected));
+
+    }
+
+    private String loadReport() throws Exception {
+        String path = "./src/test/resources/pm_report.json";
+        return Files.readString(Path.of(path), Charset.defaultCharset());
+    }
+
+    private String reQuote(String str) {
+        return str.replaceAll("'", "\\\"");
+    }
+
+}
diff --git a/src/test/java/org/oran/dmaapadapter/repository/filters/JsonPathFilterTest.java b/src/test/java/org/oran/dmaapadapter/repository/filters/JsonPathFilterTest.java
new file mode 100644 (file)
index 0000000..1360943
--- /dev/null
@@ -0,0 +1,46 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2022 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.repository.filters;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import org.junit.jupiter.api.Test;
+
+class JsonPathFilterTest {
+
+    @Test
+    void testJsonPath() throws Exception {
+        String exp = ("$.event.perf3gppFields.measDataCollection.measInfoList[0].measTypes.sMeasTypesList[0]");
+        JsonPathFilter filter = new JsonPathFilter(exp);
+        String res = filter.filter(loadReport());
+        assertThat(res).isEqualTo("\"attTCHSeizures\"");
+    }
+
+    private String loadReport() throws Exception {
+        String path = "./src/test/resources/pm_report.json";
+        return Files.readString(Path.of(path), Charset.defaultCharset());
+    }
+
+}
index 8a89c1a..581e164 100644 (file)
@@ -3,13 +3,12 @@
       {
          "id": "DmaapInformationType",
          "dmaapTopicUrl": "/dmaap-topic-1",
-         "useHttpProxy": false,
-         "dataType": "text"
+         "useHttpProxy": false
       },
       {
          "id": "KafkaInformationType",
          "kafkaInputTopic": "TutorialTopic",
-         "useHttpProxy": false       
+         "useHttpProxy": false
       },
       {
          "id": "PmInformationType",