}},
"tags": ["Actuator"]
}},
+ "/dmaap-topic-2": {"get": {
+ "summary": "GET from topic",
+ "description": "The call is invoked to activate or to modify a data subscription. The endpoint is provided by the Information Producer.",
+ "operationId": "getFromPmTopic",
+ "responses": {"200": {
+ "description": "OK",
+ "content": {"application/json": {"schema": {"$ref": "#/components/schemas/void"}}}
+ }},
+ "tags": ["DMAAP Simulator (exists only in test)"]
+ }},
"/consumer": {"post": {
"summary": "Consume data",
"requestBody": {
'*/*':
schema:
type: object
+ /dmaap-topic-2:
+ get:
+ tags:
+ - DMAAP Simulator (exists only in test)
+ summary: GET from topic
+ description: The call is invoked to activate or to modify a data subscription.
+ The endpoint is provided by the Information Producer.
+ operationId: getFromPmTopic
+ responses:
+ 200:
+ description: OK
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/void'
/consumer:
post:
tags:
## License
-Copyright (C) 2020 Nordix Foundation. All rights reserved.
+Copyright (C) 2022 Nordix Foundation. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
<optional>true</optional>
</dependency>
<!-- TEST -->
+ <!-- https://mvnrepository.com/artifact/com.github.erosb/everit-json-schema -->
+ <dependency>
+ <groupId>com.github.erosb</groupId>
+ <artifactId>everit-json-schema</artifactId>
+ <version>1.12.1</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<system>JIRA</system>
<url>https://jira.o-ran-sc.org/</url>
</issueManagement>
-</project>
+</project>
\ No newline at end of file
public Collection<InfoType> getTypes() {
com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
-
try {
String configJson = Files.readString(Path.of(getLocalConfigurationFilePath()), Charset.defaultCharset());
ConfigFile configData = gson.fromJson(configJson, ConfigFile.class);
@Getter
private final String kafkaInputTopic;
- public InfoType(String id, String dmaapTopicUrl, boolean useHttpProxy, String kafkaInputTopic) {
+ private final String dataType;
+
+ public InfoType(String id, String dmaapTopicUrl, boolean useHttpProxy, String kafkaInputTopic, String dataType) {
this.id = id;
this.dmaapTopicUrl = dmaapTopicUrl;
this.useHttpProxy = useHttpProxy;
this.kafkaInputTopic = kafkaInputTopic;
+ this.dataType = dataType;
}
public boolean isKafkaTopicDefined() {
return StringUtils.hasLength(dmaapTopicUrl);
}
+ public enum DataType {
+ PM_DATA, TEXT
+ }
+
+ public DataType getDataType() {
+ if (dataType != null && dataType.equalsIgnoreCase("pmData")) {
+ return DataType.PM_DATA;
+ }
+ return DataType.TEXT;
+ }
}
package org.oran.dmaapadapter.repository;
+import com.google.common.base.Strings;
+
import java.time.Duration;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import lombok.Getter;
import org.immutables.gson.Gson;
import org.oran.dmaapadapter.clients.AsyncRestClient;
+import org.oran.dmaapadapter.repository.filters.Filter;
+import org.oran.dmaapadapter.repository.filters.PmReportFilter;
+import org.oran.dmaapadapter.repository.filters.RegexpFilter;
public class Job {
@Getter
private BufferTimeout bufferTimeout;
- private int maxConcurrency;
+ private Integer maxConcurrency;
+
+ @Getter
+ private PmReportFilter.FilterData pmFilter;
public Parameters() {}
- public Parameters(String filter, BufferTimeout bufferTimeout, int maxConcurrency) {
+ public Parameters(String filter, BufferTimeout bufferTimeout, Integer maxConcurrency,
+ PmReportFilter.FilterData pmFilter) {
this.filter = filter;
this.bufferTimeout = bufferTimeout;
this.maxConcurrency = maxConcurrency;
+ this.pmFilter = pmFilter;
}
public int getMaxConcurrency() {
- return maxConcurrency == 0 ? 1 : maxConcurrency;
+ return maxConcurrency == null || maxConcurrency == 0 ? 1 : maxConcurrency;
}
}
@Getter
private final String lastUpdated;
- private final Pattern jobDataFilter;
+ private final Filter filter;
@Getter
private final AsyncRestClient consumerRestClient;
this.owner = owner;
this.lastUpdated = lastUpdated;
this.parameters = parameters;
- if (parameters != null && parameters.filter != null) {
- jobDataFilter = Pattern.compile(parameters.filter);
+ if (parameters != null && !Strings.isNullOrEmpty(parameters.filter)) {
+ filter = new RegexpFilter(parameters.filter);
+ } else if (parameters != null && parameters.pmFilter != null) {
+ filter = new PmReportFilter(parameters.pmFilter);
} else {
- jobDataFilter = null;
+ filter = null;
}
this.consumerRestClient = consumerRestClient;
+
}
- public boolean isFilterMatch(String data) {
- if (jobDataFilter == null) {
- return true;
+ public String filter(String data) {
+ if (filter == null) {
+ return data;
}
- Matcher matcher = jobDataFilter.matcher(data);
- return matcher.find();
+ return filter.filter(data);
}
public boolean isBuffered() {
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2022 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.repository.filters;
+
+public interface Filter {
+ public String filter(String data);
+
+}
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2022 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.repository.filters;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.immutables.gson.Gson;
+
+@Gson.TypeAdapters
+class PmReport {
+
+ Event event = new Event();
+
+ @Gson.TypeAdapters
+ public static class CommonEventHeader {
+ String domain;
+ String eventId;
+ int sequence;
+ String eventName;
+ String sourceName;
+ String reportingEntityName;
+ String priority;
+ long startEpochMicrosec;
+ long lastEpochMicrosec;
+ String version;
+ String vesEventListenerVersion;
+ String timeZoneOffset;
+ }
+
+ @Gson.TypeAdapters
+ public static class MeasInfoId {
+ String sMeasInfoId;
+ }
+
+ @Gson.TypeAdapters
+ public static class MeasTypes {
+ public String getMeasType(int pValue) {
+ if (pValue > sMeasTypesList.size()) {
+ return "MeasTypeIndexOutOfBounds:" + pValue;
+ }
+ return sMeasTypesList.get(pValue - 1);
+ }
+
+ protected ArrayList<String> sMeasTypesList = new ArrayList<>();
+ }
+
+ @Gson.TypeAdapters
+ public static class MeasResult {
+ int p;
+ String sValue;
+ }
+
+ @Gson.TypeAdapters
+ public static class MeasValuesList {
+ String measObjInstId;
+ String suspectFlag;
+ Collection<MeasResult> measResults = new ArrayList<>();
+
+ public MeasValuesList shallowClone() {
+ MeasValuesList n = new MeasValuesList();
+ n.measObjInstId = this.measObjInstId;
+ n.suspectFlag = this.suspectFlag;
+ return n;
+ }
+ }
+
+ @Gson.TypeAdapters
+ public static class MeasInfoList {
+ MeasInfoId measInfoId;
+ MeasTypes measTypes;
+ Collection<MeasValuesList> measValuesList = new ArrayList<>();
+
+ public MeasInfoList shallowClone() {
+ MeasInfoList n = new MeasInfoList();
+ n.measInfoId = this.measInfoId;
+ n.measTypes = new MeasTypes();
+ return n;
+ }
+ }
+
+ @Gson.TypeAdapters
+ public static class MeasDataCollection {
+ int granularityPeriod;
+ String measuredEntityUserName;
+ String measuredEntityDn;
+ String measuredEntitySoftwareVersion;
+ Collection<MeasInfoList> measInfoList = new ArrayList<>();
+ }
+
+ @Gson.TypeAdapters
+ public static class Perf3gppFields {
+ String perf3gppFieldsVersion;
+ MeasDataCollection measDataCollection;
+ }
+
+ @Gson.TypeAdapters
+ public static class Event {
+ CommonEventHeader commonEventHeader;
+ Perf3gppFields perf3gppFields;
+ }
+
+}
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.repository.filters;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import lombok.Getter;
+
+import org.immutables.gson.Gson;
+import org.thymeleaf.util.StringUtils;
+
+public class PmReportFilter implements Filter {
+
+ private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
+ private final FilterData filterData;
+
+ @Gson.TypeAdapters
+ @Getter
+ public static class FilterData {
+ Collection<String> sourceNames = new ArrayList<>();
+ Collection<String> measObjInstIds = new ArrayList<>();
+ Collection<String> measTypes = new ArrayList<>();
+ Collection<String> measuredEntityDns = new ArrayList<>();
+ }
+
+ private static class MeasTypesIndexed extends PmReport.MeasTypes {
+ private Map<String, Integer> map = new HashMap<>();
+
+ public int addP(String measTypeName) {
+ Integer p = map.get(measTypeName);
+ if (p != null) {
+ return p;
+ } else {
+ this.sMeasTypesList.add(measTypeName);
+ this.map.put(measTypeName, this.sMeasTypesList.size());
+ return this.sMeasTypesList.size();
+ }
+ }
+ }
+
+ public PmReportFilter(FilterData filterData) {
+ this.filterData = filterData;
+ }
+
+ @Override
+ public String filter(String data) {
+ PmReport report = gson.fromJson(data, PmReport.class);
+ if (!filter(report, this.filterData)) {
+ return "";
+ }
+ return gson.toJson(report);
+
+ }
+
+ /**
+ * Updates the report based on the filter data.
+ *
+ * @param report
+ * @param filterData
+ * @return true if there is anything left in the report
+ */
+ private boolean filter(PmReport report, FilterData filterData) {
+ if (!matchSourceNames(report, filterData.sourceNames)) {
+ return false;
+ }
+ Collection<PmReport.MeasInfoList> filtered = createMeasObjInstIds(report, filterData);
+ report.event.perf3gppFields.measDataCollection.measInfoList = filtered;
+ return !filtered.isEmpty();
+ }
+
+ private boolean isContainedInAny(String aString, Collection<String> collection) {
+ for (String s : collection) {
+ if (StringUtils.contains(aString, s) == Boolean.TRUE) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean isMeasResultMatch(PmReport.MeasResult measResult, PmReport.MeasTypes measTypes, FilterData filter) {
+ String measType = measTypes.getMeasType(measResult.p);
+ return filter.measTypes.isEmpty() || filter.measTypes.contains(measType);
+ }
+
+ private Collection<PmReport.MeasResult> createMeasResults(Collection<PmReport.MeasResult> oldMeasResults,
+ PmReport.MeasTypes measTypes, FilterData filter) {
+ Collection<PmReport.MeasResult> newMeasResults = new ArrayList<>();
+
+ for (PmReport.MeasResult measResult : oldMeasResults) {
+ if (isMeasResultMatch(measResult, measTypes, filter)) {
+ newMeasResults.add(measResult);
+ }
+ }
+ return newMeasResults;
+ }
+
+ private PmReport.MeasValuesList createMeasValuesList(PmReport.MeasValuesList oldMeasValues,
+ PmReport.MeasTypes measTypes, FilterData filter) {
+
+ PmReport.MeasValuesList newMeasValuesList = oldMeasValues.shallowClone();
+
+ if (isContainedInAny(oldMeasValues.measObjInstId, filter.measObjInstIds) || filter.measObjInstIds.isEmpty()) {
+ newMeasValuesList.measResults = createMeasResults(oldMeasValues.measResults, measTypes, filter);
+ }
+ return newMeasValuesList;
+ }
+
+ private PmReport.MeasTypes createMeasTypes(Collection<PmReport.MeasValuesList> measValues,
+ PmReport.MeasTypes oldMMeasTypes) {
+ MeasTypesIndexed newMeasTypes = new MeasTypesIndexed();
+ for (PmReport.MeasValuesList l : measValues) {
+ for (PmReport.MeasResult r : l.measResults) {
+ String measTypeName = oldMMeasTypes.getMeasType(r.p);
+ r.p = newMeasTypes.addP(measTypeName);
+ }
+ }
+ return newMeasTypes;
+ }
+
+ private PmReport.MeasInfoList createMeasInfoList(PmReport.MeasInfoList oldMeasInfoList, FilterData filter) {
+ PmReport.MeasInfoList newMeasInfoList = oldMeasInfoList.shallowClone();
+
+ for (PmReport.MeasValuesList oldValues : oldMeasInfoList.measValuesList) {
+ PmReport.MeasValuesList newMeasValues = createMeasValuesList(oldValues, oldMeasInfoList.measTypes, filter);
+ if (!newMeasValues.measResults.isEmpty()) {
+ newMeasInfoList.measValuesList.add(newMeasValues);
+ }
+ }
+ newMeasInfoList.measTypes = createMeasTypes(newMeasInfoList.measValuesList, oldMeasInfoList.measTypes);
+ return newMeasInfoList;
+ }
+
+ private boolean matchMeasuredEntityDns(PmReport report, FilterData filter) {
+ return filter.measuredEntityDns.isEmpty() || this.isContainedInAny(
+ report.event.perf3gppFields.measDataCollection.measuredEntityDn, filter.measuredEntityDns);
+ }
+
+ private Collection<PmReport.MeasInfoList> createMeasObjInstIds(PmReport report, FilterData filter) {
+ Collection<PmReport.MeasInfoList> newList = new ArrayList<>();
+ if (!matchMeasuredEntityDns(report, filter)) {
+ return newList;
+ }
+ for (PmReport.MeasInfoList oldMeasInfoList : report.event.perf3gppFields.measDataCollection.measInfoList) {
+ PmReport.MeasInfoList l = createMeasInfoList(oldMeasInfoList, filter);
+ if (!l.measValuesList.isEmpty()) {
+ newList.add(l);
+ }
+ }
+ return newList;
+ }
+
+ private boolean matchSourceNames(PmReport report, Collection<String> sourceNames) {
+ return sourceNames.isEmpty() || sourceNames.contains(report.event.commonEventHeader.sourceName);
+ }
+
+}
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.repository.filters;
+
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+public class RegexpFilter implements Filter {
+
+ private final Pattern regexp;
+
+ public RegexpFilter(String exp) {
+ if (exp != null) {
+ regexp = Pattern.compile(exp);
+ } else {
+ regexp = null;
+ }
+ }
+
+ public String filter(String data) {
+
+ if (regexp == null) {
+ return data;
+ }
+ Matcher matcher = regexp.matcher(data);
+ boolean match = matcher.find();
+ if (match) {
+ return data;
+ } else {
+ return "";
+ }
+ }
+
+}
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.util.function.Tuples;
/**
* The class fetches incoming requests from DMAAP and sends them further to the
// Distibute the body to all jobs for this type
return Flux.fromIterable(this.jobs.getJobsForType(this.type)) //
- .filter(job -> job.isFilterMatch(body)) //
- .doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl())) //
- .flatMap(job -> job.getConsumerRestClient().post("", body, MediaType.APPLICATION_JSON), CONCURRENCY) //
+ .map(job -> Tuples.of(job, job.filter(body))) //
+ .filter(t -> !t.getT2().isEmpty()) //
+ .doOnNext(touple -> logger.debug("Sending to consumer {}", touple.getT1().getCallbackUrl())) //
+ .flatMap(touple -> touple.getT1().getConsumerRestClient().post("", touple.getT2(),
+ MediaType.APPLICATION_JSON), CONCURRENCY) //
.onErrorResume(this::handleConsumerErrorResponse);
}
}
}
private Flux<String> getMessagesFromKafka(Flux<String> input, Job job) {
- Flux<String> result = input.filter(job::isFilterMatch);
+ Flux<String> result = input.map(job::filter) //
+ .filter(t -> !t.isEmpty()); //
if (job.isBuffered()) {
result = result.map(this::quote) //
}
private Object jsonSchemaObject(InfoType type) throws IOException, ServiceException {
- String schemaFile = type.isKafkaTopicDefined() ? "/typeSchemaKafka.json" : "/typeSchemaDmaap.json";
+ String schemaFile;
+ if (type.getDataType() == InfoType.DataType.PM_DATA) {
+ schemaFile = type.isKafkaTopicDefined() ? "/typeSchemaPmDataKafka.json" : "/typeSchemaPmDataDmaap.json";
+ } else {
+ schemaFile = type.isKafkaTopicDefined() ? "/typeSchemaKafka.json" : "/typeSchemaDmaap.json";
+ }
return jsonObject(readSchemaFile(schemaFile));
}
--- /dev/null
+{
+ "$schema": "http://json-schema.org/draft-04/schema#",
+ "type": "object",
+ "additionalProperties": false,
+ "properties": {
+ "pmFilter": {
+ "type": "object",
+ "additionalProperties": false,
+ "properties": {
+ "sourceNames": {
+ "type": "array",
+ "items": [
+ {
+ "type": "string"
+ }
+ ]
+ },
+ "measObjInstIds": {
+ "type": "array",
+ "items": [
+ {
+ "type": "string"
+ }
+ ]
+ },
+ "measTypes": {
+ "type": "array",
+ "items": [
+ {
+ "type": "string"
+ }
+ ]
+ },
+ "measuredEntityDns": {
+ "type": "array",
+ "items": [
+ {
+ "type": "string"
+ }
+ ]
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
--- /dev/null
+{
+ "$schema": "http://json-schema.org/draft-04/schema#",
+ "type": "object",
+ "additionalProperties": false,
+ "properties": {
+ "pmFilter": {
+ "type": "object",
+ "additionalProperties": false,
+ "properties": {
+ "sourceNames": {
+ "type": "array",
+ "items": [
+ {
+ "type": "string"
+ }
+ ]
+ },
+ "measObjInstIds": {
+ "type": "array",
+ "items": [
+ {
+ "type": "string"
+ }
+ ]
+ },
+ "measTypes": {
+ "type": "array",
+ "items": [
+ {
+ "type": "string"
+ }
+ ]
+ },
+ "measuredEntityDns": {
+ "type": "array",
+ "items": [
+ {
+ "type": "string"
+ }
+ ]
+ }
+ }
+ },
+ "maxConcurrency": {
+ "type": "integer",
+ "minimum": 1
+ },
+ "bufferTimeout": {
+ "type": "object",
+ "additionalProperties": false,
+ "properties": {
+ "maxSize": {
+ "type": "integer",
+ "minimum": 1
+ },
+ "maxTimeMiliseconds": {
+ "type": "integer",
+ "minimum": 0,
+ "maximum": 160000
+ }
+ },
+ "required": [
+ "maxSize",
+ "maxTimeMiliseconds"
+ ]
+ }
+ }
+}
\ No newline at end of file
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
+import java.nio.charset.Charset;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.nio.file.Paths;
import org.json.JSONObject;
import org.oran.dmaapadapter.repository.InfoTypes;
import org.oran.dmaapadapter.repository.Job;
import org.oran.dmaapadapter.repository.Jobs;
+import org.oran.dmaapadapter.repository.filters.PmReportFilter;
import org.oran.dmaapadapter.tasks.KafkaJobDataConsumer;
import org.oran.dmaapadapter.tasks.KafkaTopicConsumers;
import org.oran.dmaapadapter.tasks.ProducerRegstrationTask;
return consumerJobInfo("DmaapInformationType", "EI_JOB_ID");
}
+ private ConsumerJobInfo consumerPmJobInfo() {
+ return consumerJobInfo("PmInformationType", "EI_PM_JOB_ID");
+ }
+
private Object jsonObject() {
return jsonObject("{}");
}
}
@Test
- void testReceiveAndPostDataFromKafka() {
+ void testReceiveAndPostDataFromKafka() throws Exception {
final String JOB_ID = "ID";
final String TYPE_ID = "KafkaInformationType";
await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
// Create a job
- Job.Parameters param = new Job.Parameters("", new Job.BufferTimeout(123, 456), 1);
+ Job.Parameters param = new Job.Parameters("", new Job.BufferTimeout(123, 456), 1, null);
String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL;
ConsumerJobInfo kafkaJobInfo =
new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", targetUri, "");
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
}
+ @Test
+ void testPmFiltering() throws Exception {
+ // Create a job
+ final String JOB_ID = "ID";
+
+ // Register producer, Register types
+ await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+ producerRegistrationTask.supervisionTask().block();
+
+ // Create a job with a PM filter
+ ConsumerJobInfo jobInfo = consumerPmJobInfo();
+ PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
+ filterData.getMeasTypes().add("succImmediateAssignProcs");
+ filterData.getMeasObjInstIds().add("UtranCell=Gbg-997");
+ filterData.getSourceNames().add("O-DU-1122");
+ filterData.getMeasuredEntityDns().add("ManagedElement=RNC-Gbg-1");
+ Job.Parameters param = new Job.Parameters(null, null, null, filterData);
+ String paramJson = gson.toJson(param);
+ jobInfo.jobDefinition = jsonObject(paramJson);
+
+ this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+ // Return one messagefrom DMAAP and verify that the job (consumer) receives a
+ // filtered PM message
+ String path = "./src/test/resources/pm_report.json";
+ String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset());
+ DmaapSimulatorController.dmaapPmResponses.add(pmReportJson);
+
+ ConsumerController.TestResults consumer = this.consumerController.testResults;
+ await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
+ String receivedFiltered = consumer.receivedBodies.get(0);
+ assertThat(receivedFiltered).contains("succImmediateAssignProcs").doesNotContain("\"p\":2").contains("\"p\":1");
+
+ // Delete the job
+ this.icsSimulatorController.deleteJob(JOB_ID, restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+ }
+
+ @Test
+ void testPmFilteringKafka() throws Exception {
+ // Test that the schema for kafka and pm filtering is OK.
+
+ // Create a job
+ final String JOB_ID = "ID";
+
+ // Register producer, Register types
+ await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+ producerRegistrationTask.supervisionTask().block();
+
+ // Create a job with a PM filter
+ ConsumerJobInfo jobInfo = consumerPmJobInfo();
+ jobInfo.infoTypeId = "PmInformationTypeKafka";
+ PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
+ filterData.getMeasTypes().add("succImmediateAssignProcs");
+ Job.Parameters param = new Job.Parameters(null, null, null, filterData);
+ String paramJson = gson.toJson(param);
+ jobInfo.jobDefinition = jsonObject(paramJson);
+ this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+ // Delete the job
+ this.icsSimulatorController.deleteJob(JOB_ID, restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+ }
+
@Test
void testReRegister() throws Exception {
// Wait foir register types and producer
// Just clear the registerred types, should trigger a re-register
icsSimulatorController.testResults.types.clear();
- await().untilAsserted(
- () -> assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(2));
+ await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds)
+ .hasSize(this.types.size()));
}
public static void testErrorCode(Mono<?> request, HttpStatus expStatus, String responseContains) {
private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final String DMAAP_TOPIC_URL = "/dmaap-topic-1";
+ public static final String DMAAP_TOPIC_PM_URL = "/dmaap-topic-2";
public static List<String> dmaapResponses = Collections.synchronizedList(new LinkedList<String>());
+ public static List<String> dmaapPmResponses = Collections.synchronizedList(new LinkedList<String>());
+
@GetMapping(path = DMAAP_TOPIC_URL, produces = MediaType.APPLICATION_JSON_VALUE)
@Operation(summary = "GET from topic",
description = "The call is invoked to activate or to modify a data subscription. The endpoint is provided by the Information Producer.")
}
+ @GetMapping(path = DMAAP_TOPIC_PM_URL, produces = MediaType.APPLICATION_JSON_VALUE)
+ @Operation(summary = "GET from topic",
+ description = "The call is invoked to activate or to modify a data subscription. The endpoint is provided by the Information Producer.")
+ @ApiResponses(value = { //
+ @ApiResponse(responseCode = "200", description = "OK", //
+ content = @Content(schema = @Schema(implementation = VoidResponse.class))) //
+ })
+ public ResponseEntity<Object> getFromPmTopic() {
+ if (dmaapPmResponses.isEmpty()) {
+ return ErrorResponse.create("", HttpStatus.NOT_FOUND);
+ } else {
+ String resp = dmaapPmResponses.remove(0);
+ return new ResponseEntity<>(resp, HttpStatus.OK);
+ }
+
+ }
+
}
package org.oran.dmaapadapter;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.util.HashMap;
import java.util.Map;
+import org.json.JSONObject;
import org.oran.dmaapadapter.clients.AsyncRestClient;
+import org.oran.dmaapadapter.exceptions.ServiceException;
import org.oran.dmaapadapter.r1.ConsumerJobInfo;
import org.oran.dmaapadapter.r1.ProducerInfoTypeInfo;
import org.oran.dmaapadapter.r1.ProducerJobInfo;
return new ResponseEntity<>(HttpStatus.OK);
}
- public void addJob(ConsumerJobInfo job, String jobId, AsyncRestClient restClient) {
+ public void addJob(ConsumerJobInfo job, String jobId, AsyncRestClient restClient) throws ServiceException {
String url = this.testResults.registrationInfo.jobCallbackUrl;
ProducerJobInfo request =
new ProducerJobInfo(job.jobDefinition, jobId, job.infoTypeId, job.jobResultUri, job.owner, "TIMESTAMP");
String body = gson.toJson(request);
+ validateJsonObjectAgainstSchema(job.jobDefinition, testResults.types.get(job.infoTypeId).jobDataSchema);
logger.info("ICS Simulator PUT job: {}", body);
restClient.post(url, body, MediaType.APPLICATION_JSON).block();
}
+ private void validateJsonObjectAgainstSchema(Object object, Object schemaObj) throws ServiceException {
+ if (schemaObj != null) { // schema is optional for now
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+
+ String schemaAsString = mapper.writeValueAsString(schemaObj);
+ JSONObject schemaJSON = new JSONObject(schemaAsString);
+ var schema = org.everit.json.schema.loader.SchemaLoader.load(schemaJSON);
+
+ String objectAsString = object.toString();
+ JSONObject json = new JSONObject(objectAsString);
+ schema.validate(json);
+ } catch (Exception e) {
+ logger.error("Json validation failure {}", e.toString());
+ throw new ServiceException("Json validation failure " + e.toString(), HttpStatus.BAD_REQUEST);
+ }
+ }
+ }
+
public void deleteJob(String jobId, AsyncRestClient restClient) {
String url = this.testResults.registrationInfo.jobCallbackUrl + "/" + jobId;
logger.info("ICS Simulator DELETE job: {}", url);
await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue());
final String TYPE_ID = "KafkaInformationType";
- Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 456), 1);
+ Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 456), 1, null);
ConsumerJobInfo jobInfo =
new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", consumerUri(), "");
await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue());
final String TYPE_ID = "KafkaInformationType";
- Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 170 * 1000), 1);
+ Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 170 * 1000), 1, null);
ConsumerJobInfo jobInfo =
new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", consumerUri(), "");
private static Object jobParametersAsJsonObject(String filter, long maxTimeMiliseconds, int maxSize,
int maxConcurrency) {
Job.Parameters param =
- new Job.Parameters(filter, new Job.BufferTimeout(maxSize, maxTimeMiliseconds), maxConcurrency);
+ new Job.Parameters(filter, new Job.BufferTimeout(maxSize, maxTimeMiliseconds), maxConcurrency, null);
String str = gson.toJson(param);
return jsonObject(str);
}
}
@Test
- void simpleCase() throws InterruptedException {
+ void simpleCase() throws Exception {
final String JOB_ID = "ID";
// Register producer, Register types
}
@Test
- void kafkaIOverflow() throws InterruptedException {
+ void kafkaIOverflow() throws Exception {
final String JOB_ID1 = "ID1";
final String JOB_ID2 = "ID2";
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2022 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.repository.filters;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import org.junit.jupiter.api.Test;
+
+class PmReportFilterTest {
+
+ @Test
+ void testPmFilterMeasTypes() throws Exception {
+
+ String reportJson = loadReport();
+
+ PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
+ filterData.measTypes.add("succImmediateAssignProcs");
+
+ PmReportFilter filter = new PmReportFilter(filterData);
+ String filtered = filter.filter(reportJson);
+
+ assertThat(filtered).contains("succImmediateAssignProcs").doesNotContain("\"p\":2").contains("\"p\":1")
+ .contains("Gbg-997");
+
+ // Test that no report is returned if not meas types were found
+ filterData = new PmReportFilter.FilterData();
+ filterData.measTypes.add("junk");
+ filter = new PmReportFilter(filterData);
+ filtered = filter.filter(reportJson);
+ assertThat(filtered).isEmpty();
+ }
+
+ @Test
+ void testMeasObjInstIds() throws Exception {
+ PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
+ filterData.measObjInstIds.add("junk");
+ PmReportFilter filter = new PmReportFilter(filterData);
+ String filtered = filter.filter(loadReport());
+ assertThat(filtered).isEmpty();
+
+ filterData = new PmReportFilter.FilterData();
+ filterData.measObjInstIds.add("UtranCell=Gbg-997");
+ filter = new PmReportFilter(filterData);
+ filtered = filter.filter(loadReport());
+ assertThat(filtered).contains("Gbg-997").doesNotContain("Gbg-998");
+ }
+
+ @Test
+ void testSourceNames() throws Exception {
+ PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
+ filterData.sourceNames.add("junk");
+ PmReportFilter filter = new PmReportFilter(filterData);
+ String filtered = filter.filter(loadReport());
+ assertThat(filtered).isEmpty();
+
+ filterData = new PmReportFilter.FilterData();
+ filterData.sourceNames.add("O-DU-1122");
+ filter = new PmReportFilter(filterData);
+ filtered = filter.filter(loadReport());
+ assertThat(filtered).contains("O-DU-1122");
+ }
+
+ @Test
+ void testMeasuredEntityDns() throws Exception {
+ PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
+ filterData.measuredEntityDns.add("junk");
+ PmReportFilter filter = new PmReportFilter(filterData);
+ String filtered = filter.filter(loadReport());
+ assertThat(filtered).isEmpty();
+
+ filterData = new PmReportFilter.FilterData();
+ filterData.measuredEntityDns.add("ManagedElement=RNC-Gbg-1");
+ filter = new PmReportFilter(filterData);
+ filtered = filter.filter(loadReport());
+ assertThat(filtered).contains("RNC-Gbg-1"); // '=' is escaped to unicode by gson. OK
+ }
+
+ @Test
+ void testParse() throws Exception {
+ com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
+ PmReport report = gson.fromJson(loadReport(), PmReport.class);
+
+ String dn = report.event.perf3gppFields.measDataCollection.measuredEntityDn;
+ String json = gson.toJson(report);
+ report = gson.fromJson(json, PmReport.class);
+
+ // '=' is escaped to unicode by gson. but converted back
+ assertThat(report.event.perf3gppFields.measDataCollection.measuredEntityDn).isEqualTo(dn);
+ }
+
+ private String loadReport() throws Exception {
+ String path = "./src/test/resources/pm_report.json";
+ return Files.readString(Path.of(path), Charset.defaultCharset());
+ }
+
+}
--- /dev/null
+{
+ "event": {
+ "commonEventHeader": {
+ "domain": "perf3gpp",
+ "eventId": "9efa1210-f285-455f-9c6a-3a659b1f1882",
+ "sequence": 0,
+ "eventName": "perf3gpp_gnb-Ericsson_pmMeasResult",
+ "sourceName": "O-DU-1122",
+ "reportingEntityName": "",
+ "priority": "Normal",
+ "startEpochMicrosec": 951912000000,
+ "lastEpochMicrosec": 951912900000,
+ "version": "4.0",
+ "vesEventListenerVersion": "7.1",
+ "timeZoneOffset": "+00:00"
+ },
+ "perf3gppFields": {
+ "perf3gppFieldsVersion": "1.0",
+ "measDataCollection": {
+ "granularityPeriod": 900,
+ "measuredEntityUserName": "RNC Telecomville",
+ "measuredEntityDn": "SubNetwork=CountryNN,MeContext=MEC-Gbg-1,ManagedElement=RNC-Gbg-1",
+ "measuredEntitySoftwareVersion": "",
+ "measInfoList": [
+ {
+ "measInfoId": {
+ "sMeasInfoId": ""
+ },
+ "measTypes": {
+ "sMeasTypesList": [
+ "attTCHSeizures",
+ "succTCHSeizures",
+ "attImmediateAssignProcs",
+ "succImmediateAssignProcs"
+ ]
+ },
+ "measValuesList": [
+ {
+ "measObjInstId": "RncFunction=RF-1,UtranCell=Gbg-997",
+ "suspectFlag": "false",
+ "measResults": [
+ {
+ "p": 1,
+ "sValue": "813"
+ },
+ {
+ "p": 2,
+ "sValue": "913"
+ },
+ {
+ "p": 3,
+ "sValue": "1013"
+ },
+ {
+ "p": 4,
+ "sValue": "1113"
+ }
+ ]
+ },
+ {
+ "measObjInstId": "RncFunction=RF-1,UtranCell=Gbg-998",
+ "suspectFlag": "false",
+ "measResults": [
+ {
+ "p": 1,
+ "sValue": "890"
+ },
+ {
+ "p": 2,
+ "sValue": "901"
+ },
+ {
+ "p": 3,
+ "sValue": "123"
+ },
+ {
+ "p": 4,
+ "sValue": "234"
+ }
+ ]
+ },
+ {
+ "measObjInstId": "RncFunction=RF-1,UtranCell=Gbg-999",
+ "suspectFlag": "true",
+ "measResults": [
+ {
+ "p": 1,
+ "sValue": "456"
+ },
+ {
+ "p": 2,
+ "sValue": "567"
+ },
+ {
+ "p": 3,
+ "sValue": "678"
+ },
+ {
+ "p": 4,
+ "sValue": "789"
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "measInfoId": {
+ "sMeasInfoId": "ENodeBFunction"
+ },
+ "measTypes": {
+ "sMeasTypesList": [
+ "attTCHSeizures1",
+ "succTCHSeizures2",
+ "attImmediateAssignProcs3",
+ "succImmediateAssignProcs4"
+ ]
+ },
+ "measValuesList": [
+ {
+ "measObjInstId": "ManagedElement=RNC-Gbg-1,ENodeBFunction=1",
+ "suspectFlag": "false",
+ "measResults": [
+ {
+ "p": 1,
+ "sValue": "4"
+ },
+ {
+ "p": 2,
+ "sValue": "86,87,2,6,77,96,75,33,24"
+ },
+ {
+ "p": 3,
+ "sValue": "40"
+ },
+ {
+ "p": 4,
+ "sValue": "90"
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "measInfoId": {
+ "sMeasInfoId": ""
+ },
+ "measTypes": {
+ "sMeasTypesList": [
+ "attTCHSeizures5",
+ "succTCHSeizures6",
+ "attImmediateAssignProcs7",
+ "succImmediateAssignProcs8"
+ ]
+ },
+ "measValuesList": [
+ {
+ "measObjInstId": "RncFunction=RF-1,UtranCell=Gbg-997",
+ "suspectFlag": "false",
+ "measResults": [
+ {
+ "p": 1,
+ "sValue": "238"
+ },
+ {
+ "p": 2,
+ "sValue": "344"
+ },
+ {
+ "p": 3,
+ "sValue": "563"
+ },
+ {
+ "p": 4,
+ "sValue": "787"
+ }
+ ]
+ },
+ {
+ "measObjInstId": "RncFunction=RF-1,UtranCell=Gbg-998",
+ "suspectFlag": "false",
+ "measResults": [
+ {
+ "p": 1,
+ "sValue": "898"
+ },
+ {
+ "p": 2,
+ "sValue": "905"
+ },
+ {
+ "p": 3,
+ "sValue": "127"
+ },
+ {
+ "p": 4,
+ "sValue": "238"
+ }
+ ]
+ },
+ {
+ "measObjInstId": "RncFunction=RF-1,UtranCell=Gbg-999",
+ "suspectFlag": "true",
+ "measResults": [
+ {
+ "p": 1,
+ "sValue": "454"
+ },
+ {
+ "p": 2,
+ "sValue": "569"
+ },
+ {
+ "p": 3,
+ "sValue": "672"
+ },
+ {
+ "p": 4,
+ "sValue": "785"
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ }
+ }
+}
\ No newline at end of file
{
"id": "DmaapInformationType",
"dmaapTopicUrl": "/dmaap-topic-1",
- "useHttpProxy": false
+ "useHttpProxy": false,
+ "dataType": "text"
},
{
"id": "KafkaInformationType",
"kafkaInputTopic": "TutorialTopic",
- "useHttpProxy": false
+ "useHttpProxy": false
+ },
+ {
+ "id": "PmInformationType",
+ "dmaapTopicUrl": "/dmaap-topic-2",
+ "useHttpProxy": false,
+ "dataType": "PmData"
+ },
+ {
+ "id": "PmInformationTypeKafka",
+ "kafkaInputTopic": "TutorialTopic",
+ "useHttpProxy": false,
+ "dataType": "PmData"
}
]
-}
\ No newline at end of file
+}