Added support for JsonPath and JSLT filtering of Json data.
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-743
Change-Id: Ia54fd0e67d48626522d0ba5534dd27fbbde2e3e6
"id": "ExampleInformationType2",
"kafkaInputTopic": "TutorialTopic",
"useHttpProxy": false
- }
+ },
+ {
+ "id": "PmData",
+ "dmaapTopicUrl": "/events/PM_NOTIFICATION_OUTPUT/OpenDcae-c12/C12",
+ "useHttpProxy": true,
+ "dataType" : "pmData"
+ },
]
}
<version>1.12.1</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.schibsted.spt.data</groupId>
+ <artifactId>jslt</artifactId>
+ <version>0.1.11</version>
+ </dependency>
+ <dependency>
+ <groupId>com.jayway.jsonpath</groupId>
+ <artifactId>json-path</artifactId>
+ <version>2.7.0</version>
+ </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
}
public enum DataType {
- PM_DATA, TEXT
+ PM_DATA, OTHER
}
public DataType getDataType() {
- if (dataType != null && dataType.equalsIgnoreCase("pmData")) {
+ if (dataType == null) {
+ return DataType.OTHER;
+ }
+
+ if (dataType.equalsIgnoreCase("pmData")) {
return DataType.PM_DATA;
}
- return DataType.TEXT;
+ return DataType.OTHER;
+
}
}
package org.oran.dmaapadapter.repository;
-import com.google.common.base.Strings;
+import com.google.gson.GsonBuilder;
+import java.lang.invoke.MethodHandles;
import java.time.Duration;
import lombok.Getter;
+import lombok.Setter;
import org.immutables.gson.Gson;
import org.oran.dmaapadapter.clients.AsyncRestClient;
import org.oran.dmaapadapter.repository.filters.Filter;
+import org.oran.dmaapadapter.repository.filters.JsltFilter;
+import org.oran.dmaapadapter.repository.filters.JsonPathFilter;
import org.oran.dmaapadapter.repository.filters.PmReportFilter;
import org.oran.dmaapadapter.repository.filters.RegexpFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class Job {
+ private static com.google.gson.Gson gson = new GsonBuilder().create();
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
@Gson.TypeAdapters
public static class Parameters {
- @Getter
- private String filter;
+ public static final String REGEXP_TYPE = "regexp";
+ public static final String PM_FILTER_TYPE = "pmdata";
+ public static final String JSLT_FILTER_TYPE = "jslt";
+ public static final String JSON_PATH_FILTER_TYPE = "json-path";
+
+ @Setter
+ private String filterType = REGEXP_TYPE;
+ private Object filter;
@Getter
private BufferTimeout bufferTimeout;
private Integer maxConcurrency;
- @Getter
- private PmReportFilter.FilterData pmFilter;
-
public Parameters() {}
- public Parameters(String filter, BufferTimeout bufferTimeout, Integer maxConcurrency,
- PmReportFilter.FilterData pmFilter) {
+ public Parameters(Object filter, String filterType, BufferTimeout bufferTimeout, Integer maxConcurrency) {
this.filter = filter;
this.bufferTimeout = bufferTimeout;
this.maxConcurrency = maxConcurrency;
- this.pmFilter = pmFilter;
+ this.filterType = filterType;
}
public int getMaxConcurrency() {
return maxConcurrency == null || maxConcurrency == 0 ? 1 : maxConcurrency;
}
+
+ public String getFilterAsString() {
+ return this.filter.toString();
+ }
+
+ public PmReportFilter.FilterData getPmFilter() {
+ String str = gson.toJson(this.filter);
+ return gson.fromJson(str, PmReportFilter.FilterData.class);
+ }
+
+ public enum FilterType {
+ REGEXP, JSLT, JSON_PATH, PM_DATA, NONE
+ }
+
+ public FilterType getFilterType() {
+ if (filter == null || filterType == null) {
+ return FilterType.NONE;
+ } else if (filterType.equalsIgnoreCase(JSLT_FILTER_TYPE)) {
+ return FilterType.JSLT;
+ } else if (filterType.equalsIgnoreCase(JSON_PATH_FILTER_TYPE)) {
+ return FilterType.JSON_PATH;
+ } else if (filterType.equalsIgnoreCase(REGEXP_TYPE)) {
+ return FilterType.REGEXP;
+ } else if (filterType.equalsIgnoreCase(PM_FILTER_TYPE)) {
+ return FilterType.PM_DATA;
+ } else {
+ logger.warn("Unsupported filter type: {}", this.filterType);
+ return FilterType.NONE;
+ }
+ }
}
@Gson.TypeAdapters
this.owner = owner;
this.lastUpdated = lastUpdated;
this.parameters = parameters;
- if (parameters != null && !Strings.isNullOrEmpty(parameters.filter)) {
- filter = new RegexpFilter(parameters.filter);
- } else if (parameters != null && parameters.pmFilter != null) {
- filter = new PmReportFilter(parameters.pmFilter);
- } else {
- filter = null;
- }
+ filter = createFilter(parameters);
this.consumerRestClient = consumerRestClient;
}
+ private static Filter createFilter(Parameters parameters) {
+
+ if (parameters.filter == null) {
+ return null;
+ }
+
+ switch (parameters.getFilterType()) {
+ case PM_DATA:
+ return new PmReportFilter(parameters.getPmFilter());
+ case REGEXP:
+ return new RegexpFilter(parameters.getFilterAsString());
+ case JSLT:
+ return new JsltFilter(parameters.getFilterAsString());
+ case JSON_PATH:
+ return new JsonPathFilter(parameters.getFilterAsString());
+ case NONE:
+ return null;
+ default:
+ logger.error("Not handeled filter type: {}", parameters.getFilterType());
+ return null;
+ }
+ }
+
public String filter(String data) {
if (filter == null) {
return data;
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.repository.filters;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.NullNode;
+import com.schibsted.spt.data.jslt.Expression;
+import com.schibsted.spt.data.jslt.Parser;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JsltFilter implements Filter {
+
+ private Expression expression;
+ private final ObjectMapper mapper = new ObjectMapper();
+ private static final Logger logger = LoggerFactory.getLogger(JsltFilter.class);
+
+ public JsltFilter(String exp) {
+ try {
+ expression = Parser.compileString(exp);
+ } catch (Exception e) {
+ logger.warn("Could not parse JSLT expression: {}, reason: {}", exp, e.getMessage());
+ }
+ }
+
+ @Override
+ public String filter(String jsonString) {
+ if (expression == null) {
+ return jsonString;
+ }
+ try {
+ JsonFactory factory = mapper.getFactory();
+ JsonParser parser = factory.createParser(jsonString);
+ JsonNode actualObj = mapper.readTree(parser);
+
+ JsonNode filteredNode = expression.apply(actualObj);
+ if (filteredNode == NullNode.instance) {
+ return "";
+ }
+ return mapper.writeValueAsString(filteredNode);
+ } catch (Exception e) {
+ return "";
+ }
+ }
+
+}
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.repository.filters;
+
+import com.jayway.jsonpath.JsonPath;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JsonPathFilter implements Filter {
+
+ private String expression;
+ private static final Logger logger = LoggerFactory.getLogger(JsonPathFilter.class);
+ com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
+
+ public JsonPathFilter(String exp) {
+ try {
+ expression = exp;
+ } catch (Exception e) {
+ logger.warn("Could not parse Json Path expression: {}, reason: {}", exp, e.getMessage());
+ }
+ }
+
+ @Override
+ public String filter(String jsonString) {
+ try {
+ Object o = JsonPath.parse(jsonString).read(this.expression, Object.class);
+ return o == null ? "" : gson.toJson(o);
+ } catch (Exception e) {
+ return "";
+ }
+
+ }
+}
package org.oran.dmaapadapter.repository.filters;
-
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class RegexpFilter implements Filter {
-
- private final Pattern regexp;
+ private static final Logger logger = LoggerFactory.getLogger(RegexpFilter.class);
+ private Pattern regexp;
public RegexpFilter(String exp) {
- if (exp != null) {
+ try {
regexp = Pattern.compile(exp);
- } else {
- regexp = null;
+ } catch (Exception e) {
+ logger.warn("Could not parse REGEXP expression: {}, reason: {}", exp, e.getMessage());
}
}
+ @Override
public String filter(String data) {
-
if (regexp == null) {
return data;
}
"type": "object",
"properties": {
"filter": {
- "type": "string"
- }
+ "type": "string"
+ },
+ "filterType": {
+ "type": "string",
+ "enum": [
+ "jslt",
+ "regexp",
+ "json-path"
+ ]
+ }
},
"additionalProperties": false
-}
+}
\ No newline at end of file
"filter": {
"type": "string"
},
+ "filterType": {
+ "type": "string",
+ "enum": [
+ "jslt",
+ "regexp",
+ "json-path"
+ ]
+ },
"maxConcurrency": {
"type": "integer",
"minimum": 1
"type": "object",
"additionalProperties": false,
"properties": {
- "pmFilter": {
- "type": "object",
- "additionalProperties": false,
- "properties": {
- "sourceNames": {
- "type": "array",
- "items": [
- {
- "type": "string"
- }
- ]
- },
- "measObjInstIds": {
- "type": "array",
- "items": [
- {
- "type": "string"
- }
- ]
- },
- "measTypes": {
- "type": "array",
- "items": [
- {
- "type": "string"
- }
- ]
+ "filter": {
+ "anyOf": [
+ {
+ "type": "string"
},
- "measuredEntityDns": {
- "type": "array",
- "items": [
- {
- "type": "string"
+ {
+ "type": "object",
+ "additionalProperties": false,
+ "properties": {
+ "sourceNames": {
+ "type": "array",
+ "items": [
+ {
+ "type": "string"
+ }
+ ]
+ },
+ "measObjInstIds": {
+ "type": "array",
+ "items": [
+ {
+ "type": "string"
+ }
+ ]
+ },
+ "measTypes": {
+ "type": "array",
+ "items": [
+ {
+ "type": "string"
+ }
+ ]
+ },
+ "measuredEntityDns": {
+ "type": "array",
+ "items": [
+ {
+ "type": "string"
+ }
+ ]
}
- ]
+ }
}
- }
+ ]
+ },
+ "filterType": {
+ "type": "string",
+ "enum": [
+ "jslt",
+ "regexp",
+ "pmdata",
+ "json-path"
+ ]
}
}
}
\ No newline at end of file
"type": "object",
"additionalProperties": false,
"properties": {
- "pmFilter": {
- "type": "object",
- "additionalProperties": false,
- "properties": {
- "sourceNames": {
- "type": "array",
- "items": [
- {
- "type": "string"
- }
- ]
- },
- "measObjInstIds": {
- "type": "array",
- "items": [
- {
- "type": "string"
- }
- ]
+ "filter": {
+ "anyOf": [
+ {
+ "type": "string"
},
- "measTypes": {
- "type": "array",
- "items": [
- {
- "type": "string"
+ {
+ "type": "object",
+ "additionalProperties": false,
+ "properties": {
+ "sourceNames": {
+ "type": "array",
+ "items": [
+ {
+ "type": "string"
+ }
+ ]
+ },
+ "measObjInstIds": {
+ "type": "array",
+ "items": [
+ {
+ "type": "string"
+ }
+ ]
+ },
+ "measTypes": {
+ "type": "array",
+ "items": [
+ {
+ "type": "string"
+ }
+ ]
+ },
+ "measuredEntityDns": {
+ "type": "array",
+ "items": [
+ {
+ "type": "string"
+ }
+ ]
}
- ]
- },
- "measuredEntityDns": {
- "type": "array",
- "items": [
- {
- "type": "string"
- }
- ]
+ }
}
- }
+ ]
+ },
+ "filterType": {
+ "type": "string",
+ "enum": [
+ "jslt",
+ "regexp",
+ "pmdata",
+ "json-path"
+ ]
},
"maxConcurrency": {
"type": "integer",
return "https://localhost:" + this.applicationConfig.getLocalServerHttpPort();
}
- private ConsumerJobInfo consumerJobInfo() {
- return consumerJobInfo("DmaapInformationType", "EI_JOB_ID");
+ private Object jsonObjectRegexp() {
+ return jsonObjectFilter(".*", Job.Parameters.REGEXP_TYPE);
+
+ }
+
+ private Object jsonObjectJsonPath() {
+ return jsonObjectFilter("$", Job.Parameters.JSON_PATH_FILTER_TYPE);
}
- private ConsumerJobInfo consumerPmJobInfo() {
- return consumerJobInfo("PmInformationType", "EI_PM_JOB_ID");
+ private String quote(String str) {
+ return "\"" + str + "\"";
}
- private Object jsonObject() {
- return jsonObject("{}");
+ private Object jsonObjectFilter(String filter, String filterType) {
+ return toJson("{" + quote("filter") + ":" + quote(filter) + "," + quote("filterType") + ":" + quote(filterType)
+ + "}");
}
- private Object jsonObject(String json) {
+ private Object toJson(String json) {
try {
return JsonParser.parseString(json).getAsJsonObject();
} catch (Exception e) {
}
}
- private ConsumerJobInfo consumerJobInfo(String typeId, String infoJobId) {
+ private ConsumerJobInfo consumerJobInfo(String typeId, String infoJobId, Object filter) {
try {
String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL;
- return new ConsumerJobInfo(typeId, jsonObject(), "owner", targetUri, "");
+ return new ConsumerJobInfo(typeId, filter, "owner", targetUri, "");
} catch (Exception e) {
return null;
}
}
+ private void waitForRegistration() {
+ // Register producer, Register types
+ await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+ producerRegistrationTask.supervisionTask().block();
+
+ assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+ assertThat(producerRegistrationTask.isRegisteredInIcs()).isTrue();
+ assertThat(icsSimulatorController.testResults.types).hasSize(this.types.size());
+ }
+
@Test
void generateApiDoc() throws IOException {
String url = "https://localhost:" + applicationConfig.getLocalServerHttpPort() + "/v3/api-docs";
void testReceiveAndPostDataFromKafka() throws Exception {
final String JOB_ID = "ID";
final String TYPE_ID = "KafkaInformationType";
- await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
- assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+ waitForRegistration();
// Create a job
- Job.Parameters param = new Job.Parameters("", new Job.BufferTimeout(123, 456), 1, null);
+ Job.Parameters param = new Job.Parameters(null, null, new Job.BufferTimeout(123, 456), 1);
String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL;
- ConsumerJobInfo kafkaJobInfo =
- new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", targetUri, "");
+ ConsumerJobInfo kafkaJobInfo = new ConsumerJobInfo(TYPE_ID, toJson(gson.toJson(param)), "owner", targetUri, "");
this.icsSimulatorController.addJob(kafkaJobInfo, JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
kafkaConsumer.stop();
this.kafkaTopicConsumers.restartNonRunningTopics();
await().untilAsserted(() -> assertThat(kafkaConsumer.isRunning()).isTrue());
-
- // Delete the job
- this.icsSimulatorController.deleteJob(JOB_ID, restClient());
- await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
}
@Test
final String JOB_ID = "ID";
// Register producer, Register types
- await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
- assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
- assertThat(producerRegistrationTask.isRegisteredInIcs()).isTrue();
- producerRegistrationTask.supervisionTask().block();
+ waitForRegistration();
// Create a job
- this.icsSimulatorController.addJob(consumerJobInfo(), JOB_ID, restClient());
+ this.icsSimulatorController.addJob(consumerJobInfo("DmaapInformationType", JOB_ID, jsonObjectRegexp()), JOB_ID,
+ restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
// Return two messages from DMAAP and verify that these are sent to the owner of
final String JOB_ID = "ID";
// Register producer, Register types
- await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
- producerRegistrationTask.supervisionTask().block();
+ waitForRegistration();
// Create a job with a PM filter
- ConsumerJobInfo jobInfo = consumerPmJobInfo();
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
+
filterData.getMeasTypes().add("succImmediateAssignProcs");
filterData.getMeasObjInstIds().add("UtranCell=Gbg-997");
filterData.getSourceNames().add("O-DU-1122");
filterData.getMeasuredEntityDns().add("ManagedElement=RNC-Gbg-1");
- Job.Parameters param = new Job.Parameters(null, null, null, filterData);
+ Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, null, null);
String paramJson = gson.toJson(param);
- jobInfo.jobDefinition = jsonObject(paramJson);
+ ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationType", "EI_PM_JOB_ID", toJson(paramJson));
this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
String receivedFiltered = consumer.receivedBodies.get(0);
assertThat(receivedFiltered).contains("succImmediateAssignProcs").doesNotContain("\"p\":2").contains("\"p\":1");
+ }
- // Delete the job
- this.icsSimulatorController.deleteJob(JOB_ID, restClient());
- await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+ @Test
+ void testJsltFiltering() throws Exception {
+
+ final String JOB_ID = "ID";
+
+ // Register producer, Register types
+ waitForRegistration();
+
+ // Create a job with a PM filter
+ String expresssion = "if(.event.commonEventHeader.sourceName == \"O-DU-1122\")" //
+ + ".";
+ Job.Parameters param = new Job.Parameters(expresssion, Job.Parameters.JSLT_FILTER_TYPE, null, null);
+ String paramJson = gson.toJson(param);
+ ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationType", JOB_ID, toJson(paramJson));
+
+ this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+ // Return one messagefrom DMAAP and verify that the job (consumer) receives a
+ // filtered PM message
+ String path = "./src/test/resources/pm_report.json";
+ String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset());
+ DmaapSimulatorController.dmaapPmResponses.add(pmReportJson);
+
+ ConsumerController.TestResults consumer = this.consumerController.testResults;
+ await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
+ String receivedFiltered = consumer.receivedBodies.get(0);
+ assertThat(receivedFiltered).contains("event");
+ }
+
+ @Test
+ void testJsonPathFiltering() throws Exception {
+ final String JOB_ID = "ID";
+
+ // Register producer, Register types
+ waitForRegistration();
+
+ // Create a job with a JsonPath
+ ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationType", JOB_ID, this.jsonObjectJsonPath());
+
+ this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+ // Return one messagefrom DMAAP and verify that the job (consumer) receives a
+ // filtered PM message
+ String path = "./src/test/resources/pm_report.json";
+ String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset());
+ DmaapSimulatorController.dmaapPmResponses.add(pmReportJson);
+
+ ConsumerController.TestResults consumer = this.consumerController.testResults;
+ await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
+ String receivedFiltered = consumer.receivedBodies.get(0);
+ assertThat(receivedFiltered).contains("event");
}
@Test
final String JOB_ID = "ID";
// Register producer, Register types
- await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
- producerRegistrationTask.supervisionTask().block();
+ waitForRegistration();
// Create a job with a PM filter
- ConsumerJobInfo jobInfo = consumerPmJobInfo();
- jobInfo.infoTypeId = "PmInformationTypeKafka";
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
filterData.getMeasTypes().add("succImmediateAssignProcs");
- Job.Parameters param = new Job.Parameters(null, null, null, filterData);
+ Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, null, null);
String paramJson = gson.toJson(param);
- jobInfo.jobDefinition = jsonObject(paramJson);
+
+ ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationTypeKafka", "EI_PM_JOB_ID", toJson(paramJson));
this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
-
- // Delete the job
- this.icsSimulatorController.deleteJob(JOB_ID, restClient());
- await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
}
@Test
void testReRegister() throws Exception {
// Wait foir register types and producer
- await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
- assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+ waitForRegistration();
// Clear the registration, should trigger a re-register
icsSimulatorController.testResults.reset();
+ producerRegistrationTask.supervisionTask().block();
await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue());
final String TYPE_ID = "KafkaInformationType";
- Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 456), 1, null);
+ Job.Parameters param =
+ new Job.Parameters("filter", Job.Parameters.REGEXP_TYPE, new Job.BufferTimeout(123, 456), 1);
ConsumerJobInfo jobInfo =
new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", consumerUri(), "");
await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue());
final String TYPE_ID = "KafkaInformationType";
- Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 170 * 1000), 1, null);
+ Job.Parameters param =
+ new Job.Parameters("filter", Job.Parameters.REGEXP_TYPE, new Job.BufferTimeout(123, 170 * 1000), 1);
ConsumerJobInfo jobInfo =
new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", consumerUri(), "");
private static Object jobParametersAsJsonObject(String filter, long maxTimeMiliseconds, int maxSize,
int maxConcurrency) {
- Job.Parameters param =
- new Job.Parameters(filter, new Job.BufferTimeout(maxSize, maxTimeMiliseconds), maxConcurrency, null);
+ Job.Parameters param = new Job.Parameters(filter, Job.Parameters.REGEXP_TYPE,
+ new Job.BufferTimeout(maxSize, maxTimeMiliseconds), maxConcurrency);
String str = gson.toJson(param);
return jsonObject(str);
}
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2022 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.repository.filters;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import org.junit.jupiter.api.Test;
+
+class JsltFilterTest {
+
+ @Test
+ void testPickOneValue() throws Exception {
+ String expresssion = "if(.event.commonEventHeader.sourceName == 'O-DU-1122')" //
+ + ".event.perf3gppFields.measDataCollection.measInfoList[0].measValuesList[0].measResults[0].sValue";
+
+ JsltFilter filter = new JsltFilter(reQuote(expresssion));
+ String res = filter.filter(loadReport());
+ assertThat(res).isEqualTo(reQuote("'813'"));
+ }
+
+ @Test
+ void testPickWholeReport() throws Exception {
+ String expresssion = "if(.event.commonEventHeader.sourceName == 'O-DU-1122')" //
+ + ".";
+
+ JsltFilter filter = new JsltFilter(reQuote(expresssion));
+ String res = filter.filter(loadReport());
+ assertThat(res).contains("event");
+ }
+
+ @Test
+ void testNoMatch() throws Exception {
+ String expresssion = "if(.event.commonEventHeader.sourceName == 'JUNK')" //
+ + ".";
+ JsltFilter filter = new JsltFilter(reQuote(expresssion));
+ String res = filter.filter(loadReport());
+ assertThat(res).isEmpty();
+ }
+
+ @Test
+ void testMoreAdvanced() throws Exception {
+
+ String expresssion = //
+ "if(.event.commonEventHeader.sourceName == 'O-DU-1122')" + //
+ "{ " + //
+ "'array' : [for (.event.perf3gppFields.measDataCollection.measInfoList[0].measValuesList) string(.measObjInstId)], "
+ + //
+ "'size' : size(.event.perf3gppFields.measDataCollection.measInfoList[0].measValuesList)" + //
+ "}"; //
+
+ JsltFilter filter = new JsltFilter(reQuote(expresssion));
+ String res = filter.filter(loadReport());
+ String expected =
+ "{'array':['RncFunction=RF-1,UtranCell=Gbg-997','RncFunction=RF-1,UtranCell=Gbg-998','RncFunction=RF-1,UtranCell=Gbg-999'],'size':3}";
+
+ assertThat(res).isEqualTo(reQuote(expected));
+
+ }
+
+ private String loadReport() throws Exception {
+ String path = "./src/test/resources/pm_report.json";
+ return Files.readString(Path.of(path), Charset.defaultCharset());
+ }
+
+ private String reQuote(String str) {
+ return str.replaceAll("'", "\\\"");
+ }
+
+}
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2022 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.repository.filters;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import org.junit.jupiter.api.Test;
+
+class JsonPathFilterTest {
+
+ @Test
+ void testJsonPath() throws Exception {
+ String exp = ("$.event.perf3gppFields.measDataCollection.measInfoList[0].measTypes.sMeasTypesList[0]");
+ JsonPathFilter filter = new JsonPathFilter(exp);
+ String res = filter.filter(loadReport());
+ assertThat(res).isEqualTo("\"attTCHSeizures\"");
+ }
+
+ private String loadReport() throws Exception {
+ String path = "./src/test/resources/pm_report.json";
+ return Files.readString(Path.of(path), Charset.defaultCharset());
+ }
+
+}
{
"id": "DmaapInformationType",
"dmaapTopicUrl": "/dmaap-topic-1",
- "useHttpProxy": false,
- "dataType": "text"
+ "useHttpProxy": false
},
{
"id": "KafkaInformationType",
"kafkaInputTopic": "TutorialTopic",
- "useHttpProxy": false
+ "useHttpProxy": false
},
{
"id": "PmInformationType",