From: PatrikBuhr Date: Fri, 8 Apr 2022 11:13:57 +0000 (+0200) Subject: PM Filter X-Git-Tag: 1.1.0~12^2 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=1609ddc3e3282e32ca94af11262fe9f9af3af314;p=nonrtric%2Fplt%2Fdmaapadapter.git PM Filter Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-743 Change-Id: I068887f91865df92d014052d9ecfdf38831938a7 --- diff --git a/api/api.json b/api/api.json index 88fed46..a58aced 100644 --- a/api/api.json +++ b/api/api.json @@ -340,6 +340,16 @@ }}, "tags": ["Actuator"] }}, + "/dmaap-topic-2": {"get": { + "summary": "GET from topic", + "description": "The call is invoked to activate or to modify a data subscription. The endpoint is provided by the Information Producer.", + "operationId": "getFromPmTopic", + "responses": {"200": { + "description": "OK", + "content": {"application/json": {"schema": {"$ref": "#/components/schemas/void"}}} + }}, + "tags": ["DMAAP Simulator (exists only in test)"] + }}, "/consumer": {"post": { "summary": "Consume data", "requestBody": { diff --git a/api/api.yaml b/api/api.yaml index f6eb1f7..02697ee 100644 --- a/api/api.yaml +++ b/api/api.yaml @@ -334,6 +334,21 @@ paths: '*/*': schema: type: object + /dmaap-topic-2: + get: + tags: + - DMAAP Simulator (exists only in test) + summary: GET from topic + description: The call is invoked to activate or to modify a data subscription. + The endpoint is provided by the Information Producer. + operationId: getFromPmTopic + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/void' /consumer: post: tags: diff --git a/config/README b/config/README index 140927f..a2137b5 100644 --- a/config/README +++ b/config/README @@ -26,7 +26,7 @@ keytool -list -v -keystore truststore.jks -storepass policy_agent ## License -Copyright (C) 2020 Nordix Foundation. All rights reserved. +Copyright (C) 2022 Nordix Foundation. All rights reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at diff --git a/pom.xml b/pom.xml index 18fde28..53515c3 100644 --- a/pom.xml +++ b/pom.xml @@ -141,6 +141,13 @@ true + + + com.github.erosb + everit-json-schema + 1.12.1 + test + org.springframework.boot spring-boot-starter-test @@ -361,4 +368,4 @@ JIRA https://jira.o-ran-sc.org/ - + \ No newline at end of file diff --git a/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java b/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java index 3ea64e7..7b97486 100644 --- a/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java +++ b/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java @@ -124,7 +124,6 @@ public class ApplicationConfig { public Collection getTypes() { com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create(); - try { String configJson = Files.readString(Path.of(getLocalConfigurationFilePath()), Charset.defaultCharset()); ConfigFile configData = gson.fromJson(configJson, ConfigFile.class); diff --git a/src/main/java/org/oran/dmaapadapter/repository/InfoType.java b/src/main/java/org/oran/dmaapadapter/repository/InfoType.java index 27b527d..3a78d6c 100644 --- a/src/main/java/org/oran/dmaapadapter/repository/InfoType.java +++ b/src/main/java/org/oran/dmaapadapter/repository/InfoType.java @@ -38,11 +38,14 @@ public class InfoType { @Getter private final String kafkaInputTopic; - public InfoType(String id, String dmaapTopicUrl, boolean useHttpProxy, String kafkaInputTopic) { + private final String dataType; + + public InfoType(String id, String dmaapTopicUrl, boolean useHttpProxy, String kafkaInputTopic, String dataType) { this.id = id; this.dmaapTopicUrl = dmaapTopicUrl; this.useHttpProxy = useHttpProxy; this.kafkaInputTopic = kafkaInputTopic; + this.dataType = dataType; } public boolean isKafkaTopicDefined() { @@ -53,4 +56,14 @@ public class InfoType { return StringUtils.hasLength(dmaapTopicUrl); } + public enum DataType { + PM_DATA, TEXT + } + + public DataType getDataType() { + if (dataType != null && dataType.equalsIgnoreCase("pmData")) { + return DataType.PM_DATA; + } + return DataType.TEXT; + } } diff --git a/src/main/java/org/oran/dmaapadapter/repository/Job.java b/src/main/java/org/oran/dmaapadapter/repository/Job.java index 5f7521c..5a2e373 100644 --- a/src/main/java/org/oran/dmaapadapter/repository/Job.java +++ b/src/main/java/org/oran/dmaapadapter/repository/Job.java @@ -20,14 +20,17 @@ package org.oran.dmaapadapter.repository; +import com.google.common.base.Strings; + import java.time.Duration; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import lombok.Getter; import org.immutables.gson.Gson; import org.oran.dmaapadapter.clients.AsyncRestClient; +import org.oran.dmaapadapter.repository.filters.Filter; +import org.oran.dmaapadapter.repository.filters.PmReportFilter; +import org.oran.dmaapadapter.repository.filters.RegexpFilter; public class Job { @@ -38,18 +41,23 @@ public class Job { @Getter private BufferTimeout bufferTimeout; - private int maxConcurrency; + private Integer maxConcurrency; + + @Getter + private PmReportFilter.FilterData pmFilter; public Parameters() {} - public Parameters(String filter, BufferTimeout bufferTimeout, int maxConcurrency) { + public Parameters(String filter, BufferTimeout bufferTimeout, Integer maxConcurrency, + PmReportFilter.FilterData pmFilter) { this.filter = filter; this.bufferTimeout = bufferTimeout; this.maxConcurrency = maxConcurrency; + this.pmFilter = pmFilter; } public int getMaxConcurrency() { - return maxConcurrency == 0 ? 1 : maxConcurrency; + return maxConcurrency == null || maxConcurrency == 0 ? 1 : maxConcurrency; } } @@ -90,7 +98,7 @@ public class Job { @Getter private final String lastUpdated; - private final Pattern jobDataFilter; + private final Filter filter; @Getter private final AsyncRestClient consumerRestClient; @@ -103,20 +111,22 @@ public class Job { this.owner = owner; this.lastUpdated = lastUpdated; this.parameters = parameters; - if (parameters != null && parameters.filter != null) { - jobDataFilter = Pattern.compile(parameters.filter); + if (parameters != null && !Strings.isNullOrEmpty(parameters.filter)) { + filter = new RegexpFilter(parameters.filter); + } else if (parameters != null && parameters.pmFilter != null) { + filter = new PmReportFilter(parameters.pmFilter); } else { - jobDataFilter = null; + filter = null; } this.consumerRestClient = consumerRestClient; + } - public boolean isFilterMatch(String data) { - if (jobDataFilter == null) { - return true; + public String filter(String data) { + if (filter == null) { + return data; } - Matcher matcher = jobDataFilter.matcher(data); - return matcher.find(); + return filter.filter(data); } public boolean isBuffered() { diff --git a/src/main/java/org/oran/dmaapadapter/repository/filters/Filter.java b/src/main/java/org/oran/dmaapadapter/repository/filters/Filter.java new file mode 100644 index 0000000..e878387 --- /dev/null +++ b/src/main/java/org/oran/dmaapadapter/repository/filters/Filter.java @@ -0,0 +1,26 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2022 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.repository.filters; + +public interface Filter { + public String filter(String data); + +} diff --git a/src/main/java/org/oran/dmaapadapter/repository/filters/PmReport.java b/src/main/java/org/oran/dmaapadapter/repository/filters/PmReport.java new file mode 100644 index 0000000..6f3300d --- /dev/null +++ b/src/main/java/org/oran/dmaapadapter/repository/filters/PmReport.java @@ -0,0 +1,121 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2022 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.repository.filters; + +import java.util.ArrayList; +import java.util.Collection; + +import org.immutables.gson.Gson; + +@Gson.TypeAdapters +class PmReport { + + Event event = new Event(); + + @Gson.TypeAdapters + public static class CommonEventHeader { + String domain; + String eventId; + int sequence; + String eventName; + String sourceName; + String reportingEntityName; + String priority; + long startEpochMicrosec; + long lastEpochMicrosec; + String version; + String vesEventListenerVersion; + String timeZoneOffset; + } + + @Gson.TypeAdapters + public static class MeasInfoId { + String sMeasInfoId; + } + + @Gson.TypeAdapters + public static class MeasTypes { + public String getMeasType(int pValue) { + if (pValue > sMeasTypesList.size()) { + return "MeasTypeIndexOutOfBounds:" + pValue; + } + return sMeasTypesList.get(pValue - 1); + } + + protected ArrayList sMeasTypesList = new ArrayList<>(); + } + + @Gson.TypeAdapters + public static class MeasResult { + int p; + String sValue; + } + + @Gson.TypeAdapters + public static class MeasValuesList { + String measObjInstId; + String suspectFlag; + Collection measResults = new ArrayList<>(); + + public MeasValuesList shallowClone() { + MeasValuesList n = new MeasValuesList(); + n.measObjInstId = this.measObjInstId; + n.suspectFlag = this.suspectFlag; + return n; + } + } + + @Gson.TypeAdapters + public static class MeasInfoList { + MeasInfoId measInfoId; + MeasTypes measTypes; + Collection measValuesList = new ArrayList<>(); + + public MeasInfoList shallowClone() { + MeasInfoList n = new MeasInfoList(); + n.measInfoId = this.measInfoId; + n.measTypes = new MeasTypes(); + return n; + } + } + + @Gson.TypeAdapters + public static class MeasDataCollection { + int granularityPeriod; + String measuredEntityUserName; + String measuredEntityDn; + String measuredEntitySoftwareVersion; + Collection measInfoList = new ArrayList<>(); + } + + @Gson.TypeAdapters + public static class Perf3gppFields { + String perf3gppFieldsVersion; + MeasDataCollection measDataCollection; + } + + @Gson.TypeAdapters + public static class Event { + CommonEventHeader commonEventHeader; + Perf3gppFields perf3gppFields; + } + +} diff --git a/src/main/java/org/oran/dmaapadapter/repository/filters/PmReportFilter.java b/src/main/java/org/oran/dmaapadapter/repository/filters/PmReportFilter.java new file mode 100644 index 0000000..9565d90 --- /dev/null +++ b/src/main/java/org/oran/dmaapadapter/repository/filters/PmReportFilter.java @@ -0,0 +1,177 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.repository.filters; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import lombok.Getter; + +import org.immutables.gson.Gson; +import org.thymeleaf.util.StringUtils; + +public class PmReportFilter implements Filter { + + private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create(); + private final FilterData filterData; + + @Gson.TypeAdapters + @Getter + public static class FilterData { + Collection sourceNames = new ArrayList<>(); + Collection measObjInstIds = new ArrayList<>(); + Collection measTypes = new ArrayList<>(); + Collection measuredEntityDns = new ArrayList<>(); + } + + private static class MeasTypesIndexed extends PmReport.MeasTypes { + private Map map = new HashMap<>(); + + public int addP(String measTypeName) { + Integer p = map.get(measTypeName); + if (p != null) { + return p; + } else { + this.sMeasTypesList.add(measTypeName); + this.map.put(measTypeName, this.sMeasTypesList.size()); + return this.sMeasTypesList.size(); + } + } + } + + public PmReportFilter(FilterData filterData) { + this.filterData = filterData; + } + + @Override + public String filter(String data) { + PmReport report = gson.fromJson(data, PmReport.class); + if (!filter(report, this.filterData)) { + return ""; + } + return gson.toJson(report); + + } + + /** + * Updates the report based on the filter data. + * + * @param report + * @param filterData + * @return true if there is anything left in the report + */ + private boolean filter(PmReport report, FilterData filterData) { + if (!matchSourceNames(report, filterData.sourceNames)) { + return false; + } + Collection filtered = createMeasObjInstIds(report, filterData); + report.event.perf3gppFields.measDataCollection.measInfoList = filtered; + return !filtered.isEmpty(); + } + + private boolean isContainedInAny(String aString, Collection collection) { + for (String s : collection) { + if (StringUtils.contains(aString, s) == Boolean.TRUE) { + return true; + } + } + return false; + } + + private boolean isMeasResultMatch(PmReport.MeasResult measResult, PmReport.MeasTypes measTypes, FilterData filter) { + String measType = measTypes.getMeasType(measResult.p); + return filter.measTypes.isEmpty() || filter.measTypes.contains(measType); + } + + private Collection createMeasResults(Collection oldMeasResults, + PmReport.MeasTypes measTypes, FilterData filter) { + Collection newMeasResults = new ArrayList<>(); + + for (PmReport.MeasResult measResult : oldMeasResults) { + if (isMeasResultMatch(measResult, measTypes, filter)) { + newMeasResults.add(measResult); + } + } + return newMeasResults; + } + + private PmReport.MeasValuesList createMeasValuesList(PmReport.MeasValuesList oldMeasValues, + PmReport.MeasTypes measTypes, FilterData filter) { + + PmReport.MeasValuesList newMeasValuesList = oldMeasValues.shallowClone(); + + if (isContainedInAny(oldMeasValues.measObjInstId, filter.measObjInstIds) || filter.measObjInstIds.isEmpty()) { + newMeasValuesList.measResults = createMeasResults(oldMeasValues.measResults, measTypes, filter); + } + return newMeasValuesList; + } + + private PmReport.MeasTypes createMeasTypes(Collection measValues, + PmReport.MeasTypes oldMMeasTypes) { + MeasTypesIndexed newMeasTypes = new MeasTypesIndexed(); + for (PmReport.MeasValuesList l : measValues) { + for (PmReport.MeasResult r : l.measResults) { + String measTypeName = oldMMeasTypes.getMeasType(r.p); + r.p = newMeasTypes.addP(measTypeName); + } + } + return newMeasTypes; + } + + private PmReport.MeasInfoList createMeasInfoList(PmReport.MeasInfoList oldMeasInfoList, FilterData filter) { + PmReport.MeasInfoList newMeasInfoList = oldMeasInfoList.shallowClone(); + + for (PmReport.MeasValuesList oldValues : oldMeasInfoList.measValuesList) { + PmReport.MeasValuesList newMeasValues = createMeasValuesList(oldValues, oldMeasInfoList.measTypes, filter); + if (!newMeasValues.measResults.isEmpty()) { + newMeasInfoList.measValuesList.add(newMeasValues); + } + } + newMeasInfoList.measTypes = createMeasTypes(newMeasInfoList.measValuesList, oldMeasInfoList.measTypes); + return newMeasInfoList; + } + + private boolean matchMeasuredEntityDns(PmReport report, FilterData filter) { + return filter.measuredEntityDns.isEmpty() || this.isContainedInAny( + report.event.perf3gppFields.measDataCollection.measuredEntityDn, filter.measuredEntityDns); + } + + private Collection createMeasObjInstIds(PmReport report, FilterData filter) { + Collection newList = new ArrayList<>(); + if (!matchMeasuredEntityDns(report, filter)) { + return newList; + } + for (PmReport.MeasInfoList oldMeasInfoList : report.event.perf3gppFields.measDataCollection.measInfoList) { + PmReport.MeasInfoList l = createMeasInfoList(oldMeasInfoList, filter); + if (!l.measValuesList.isEmpty()) { + newList.add(l); + } + } + return newList; + } + + private boolean matchSourceNames(PmReport report, Collection sourceNames) { + return sourceNames.isEmpty() || sourceNames.contains(report.event.commonEventHeader.sourceName); + } + +} diff --git a/src/main/java/org/oran/dmaapadapter/repository/filters/RegexpFilter.java b/src/main/java/org/oran/dmaapadapter/repository/filters/RegexpFilter.java new file mode 100644 index 0000000..aac808c --- /dev/null +++ b/src/main/java/org/oran/dmaapadapter/repository/filters/RegexpFilter.java @@ -0,0 +1,54 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.repository.filters; + + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + + +public class RegexpFilter implements Filter { + + private final Pattern regexp; + + public RegexpFilter(String exp) { + if (exp != null) { + regexp = Pattern.compile(exp); + } else { + regexp = null; + } + } + + public String filter(String data) { + + if (regexp == null) { + return data; + } + Matcher matcher = regexp.matcher(data); + boolean match = matcher.find(); + if (match) { + return data; + } else { + return ""; + } + } + +} diff --git a/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java b/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java index fe7ec8b..dfa6d07 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java @@ -33,6 +33,7 @@ import org.springframework.http.MediaType; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.function.Tuples; /** * The class fetches incoming requests from DMAAP and sends them further to the @@ -99,9 +100,11 @@ public class DmaapTopicConsumer { // Distibute the body to all jobs for this type return Flux.fromIterable(this.jobs.getJobsForType(this.type)) // - .filter(job -> job.isFilterMatch(body)) // - .doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl())) // - .flatMap(job -> job.getConsumerRestClient().post("", body, MediaType.APPLICATION_JSON), CONCURRENCY) // + .map(job -> Tuples.of(job, job.filter(body))) // + .filter(t -> !t.getT2().isEmpty()) // + .doOnNext(touple -> logger.debug("Sending to consumer {}", touple.getT1().getCallbackUrl())) // + .flatMap(touple -> touple.getT1().getConsumerRestClient().post("", touple.getT2(), + MediaType.APPLICATION_JSON), CONCURRENCY) // .onErrorResume(this::handleConsumerErrorResponse); } } diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java index 2a16f47..e4675a5 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java @@ -108,7 +108,8 @@ public class KafkaJobDataConsumer { } private Flux getMessagesFromKafka(Flux input, Job job) { - Flux result = input.filter(job::isFilterMatch); + Flux result = input.map(job::filter) // + .filter(t -> !t.isEmpty()); // if (job.isBuffered()) { result = result.map(this::quote) // diff --git a/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java b/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java index ec3f2b2..6623bb8 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java @@ -149,7 +149,12 @@ public class ProducerRegstrationTask { } private Object jsonSchemaObject(InfoType type) throws IOException, ServiceException { - String schemaFile = type.isKafkaTopicDefined() ? "/typeSchemaKafka.json" : "/typeSchemaDmaap.json"; + String schemaFile; + if (type.getDataType() == InfoType.DataType.PM_DATA) { + schemaFile = type.isKafkaTopicDefined() ? "/typeSchemaPmDataKafka.json" : "/typeSchemaPmDataDmaap.json"; + } else { + schemaFile = type.isKafkaTopicDefined() ? "/typeSchemaKafka.json" : "/typeSchemaDmaap.json"; + } return jsonObject(readSchemaFile(schemaFile)); } diff --git a/src/main/resources/typeSchemaPmDataDmaap.json b/src/main/resources/typeSchemaPmDataDmaap.json new file mode 100644 index 0000000..8dd73e9 --- /dev/null +++ b/src/main/resources/typeSchemaPmDataDmaap.json @@ -0,0 +1,45 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + "type": "object", + "additionalProperties": false, + "properties": { + "pmFilter": { + "type": "object", + "additionalProperties": false, + "properties": { + "sourceNames": { + "type": "array", + "items": [ + { + "type": "string" + } + ] + }, + "measObjInstIds": { + "type": "array", + "items": [ + { + "type": "string" + } + ] + }, + "measTypes": { + "type": "array", + "items": [ + { + "type": "string" + } + ] + }, + "measuredEntityDns": { + "type": "array", + "items": [ + { + "type": "string" + } + ] + } + } + } + } +} \ No newline at end of file diff --git a/src/main/resources/typeSchemaPmDataKafka.json b/src/main/resources/typeSchemaPmDataKafka.json new file mode 100644 index 0000000..6690740 --- /dev/null +++ b/src/main/resources/typeSchemaPmDataKafka.json @@ -0,0 +1,68 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + "type": "object", + "additionalProperties": false, + "properties": { + "pmFilter": { + "type": "object", + "additionalProperties": false, + "properties": { + "sourceNames": { + "type": "array", + "items": [ + { + "type": "string" + } + ] + }, + "measObjInstIds": { + "type": "array", + "items": [ + { + "type": "string" + } + ] + }, + "measTypes": { + "type": "array", + "items": [ + { + "type": "string" + } + ] + }, + "measuredEntityDns": { + "type": "array", + "items": [ + { + "type": "string" + } + ] + } + } + }, + "maxConcurrency": { + "type": "integer", + "minimum": 1 + }, + "bufferTimeout": { + "type": "object", + "additionalProperties": false, + "properties": { + "maxSize": { + "type": "integer", + "minimum": 1 + }, + "maxTimeMiliseconds": { + "type": "integer", + "minimum": 0, + "maximum": 160000 + } + }, + "required": [ + "maxSize", + "maxTimeMiliseconds" + ] + } + } +} \ No newline at end of file diff --git a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java index 24603ec..2066645 100644 --- a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java +++ b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java @@ -29,7 +29,9 @@ import com.google.gson.JsonParser; import java.io.FileOutputStream; import java.io.IOException; import java.io.PrintStream; +import java.nio.charset.Charset; import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.Paths; import org.json.JSONObject; @@ -50,6 +52,7 @@ import org.oran.dmaapadapter.r1.ProducerJobInfo; import org.oran.dmaapadapter.repository.InfoTypes; import org.oran.dmaapadapter.repository.Job; import org.oran.dmaapadapter.repository.Jobs; +import org.oran.dmaapadapter.repository.filters.PmReportFilter; import org.oran.dmaapadapter.tasks.KafkaJobDataConsumer; import org.oran.dmaapadapter.tasks.KafkaTopicConsumers; import org.oran.dmaapadapter.tasks.ProducerRegstrationTask; @@ -194,6 +197,10 @@ class ApplicationTest { return consumerJobInfo("DmaapInformationType", "EI_JOB_ID"); } + private ConsumerJobInfo consumerPmJobInfo() { + return consumerJobInfo("PmInformationType", "EI_PM_JOB_ID"); + } + private Object jsonObject() { return jsonObject("{}"); } @@ -248,14 +255,14 @@ class ApplicationTest { } @Test - void testReceiveAndPostDataFromKafka() { + void testReceiveAndPostDataFromKafka() throws Exception { final String JOB_ID = "ID"; final String TYPE_ID = "KafkaInformationType"; await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); // Create a job - Job.Parameters param = new Job.Parameters("", new Job.BufferTimeout(123, 456), 1); + Job.Parameters param = new Job.Parameters("", new Job.BufferTimeout(123, 456), 1, null); String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL; ConsumerJobInfo kafkaJobInfo = new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", targetUri, ""); @@ -317,6 +324,72 @@ class ApplicationTest { await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); } + @Test + void testPmFiltering() throws Exception { + // Create a job + final String JOB_ID = "ID"; + + // Register producer, Register types + await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); + producerRegistrationTask.supervisionTask().block(); + + // Create a job with a PM filter + ConsumerJobInfo jobInfo = consumerPmJobInfo(); + PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); + filterData.getMeasTypes().add("succImmediateAssignProcs"); + filterData.getMeasObjInstIds().add("UtranCell=Gbg-997"); + filterData.getSourceNames().add("O-DU-1122"); + filterData.getMeasuredEntityDns().add("ManagedElement=RNC-Gbg-1"); + Job.Parameters param = new Job.Parameters(null, null, null, filterData); + String paramJson = gson.toJson(param); + jobInfo.jobDefinition = jsonObject(paramJson); + + this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient()); + await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); + + // Return one messagefrom DMAAP and verify that the job (consumer) receives a + // filtered PM message + String path = "./src/test/resources/pm_report.json"; + String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset()); + DmaapSimulatorController.dmaapPmResponses.add(pmReportJson); + + ConsumerController.TestResults consumer = this.consumerController.testResults; + await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1)); + String receivedFiltered = consumer.receivedBodies.get(0); + assertThat(receivedFiltered).contains("succImmediateAssignProcs").doesNotContain("\"p\":2").contains("\"p\":1"); + + // Delete the job + this.icsSimulatorController.deleteJob(JOB_ID, restClient()); + await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); + } + + @Test + void testPmFilteringKafka() throws Exception { + // Test that the schema for kafka and pm filtering is OK. + + // Create a job + final String JOB_ID = "ID"; + + // Register producer, Register types + await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); + producerRegistrationTask.supervisionTask().block(); + + // Create a job with a PM filter + ConsumerJobInfo jobInfo = consumerPmJobInfo(); + jobInfo.infoTypeId = "PmInformationTypeKafka"; + PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); + filterData.getMeasTypes().add("succImmediateAssignProcs"); + Job.Parameters param = new Job.Parameters(null, null, null, filterData); + String paramJson = gson.toJson(param); + jobInfo.jobDefinition = jsonObject(paramJson); + this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient()); + await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); + + // Delete the job + this.icsSimulatorController.deleteJob(JOB_ID, restClient()); + await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); + } + @Test void testReRegister() throws Exception { // Wait foir register types and producer @@ -330,8 +403,8 @@ class ApplicationTest { // Just clear the registerred types, should trigger a re-register icsSimulatorController.testResults.types.clear(); - await().untilAsserted( - () -> assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(2)); + await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds) + .hasSize(this.types.size())); } public static void testErrorCode(Mono request, HttpStatus expStatus, String responseContains) { diff --git a/src/test/java/org/oran/dmaapadapter/DmaapSimulatorController.java b/src/test/java/org/oran/dmaapadapter/DmaapSimulatorController.java index 5259ee1..db0bd22 100644 --- a/src/test/java/org/oran/dmaapadapter/DmaapSimulatorController.java +++ b/src/test/java/org/oran/dmaapadapter/DmaapSimulatorController.java @@ -49,9 +49,12 @@ public class DmaapSimulatorController { private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); public static final String DMAAP_TOPIC_URL = "/dmaap-topic-1"; + public static final String DMAAP_TOPIC_PM_URL = "/dmaap-topic-2"; public static List dmaapResponses = Collections.synchronizedList(new LinkedList()); + public static List dmaapPmResponses = Collections.synchronizedList(new LinkedList()); + @GetMapping(path = DMAAP_TOPIC_URL, produces = MediaType.APPLICATION_JSON_VALUE) @Operation(summary = "GET from topic", description = "The call is invoked to activate or to modify a data subscription. The endpoint is provided by the Information Producer.") @@ -70,4 +73,21 @@ public class DmaapSimulatorController { } + @GetMapping(path = DMAAP_TOPIC_PM_URL, produces = MediaType.APPLICATION_JSON_VALUE) + @Operation(summary = "GET from topic", + description = "The call is invoked to activate or to modify a data subscription. The endpoint is provided by the Information Producer.") + @ApiResponses(value = { // + @ApiResponse(responseCode = "200", description = "OK", // + content = @Content(schema = @Schema(implementation = VoidResponse.class))) // + }) + public ResponseEntity getFromPmTopic() { + if (dmaapPmResponses.isEmpty()) { + return ErrorResponse.create("", HttpStatus.NOT_FOUND); + } else { + String resp = dmaapPmResponses.remove(0); + return new ResponseEntity<>(resp, HttpStatus.OK); + } + + } + } diff --git a/src/test/java/org/oran/dmaapadapter/IcsSimulatorController.java b/src/test/java/org/oran/dmaapadapter/IcsSimulatorController.java index 790aafb..286712e 100644 --- a/src/test/java/org/oran/dmaapadapter/IcsSimulatorController.java +++ b/src/test/java/org/oran/dmaapadapter/IcsSimulatorController.java @@ -20,6 +20,7 @@ package org.oran.dmaapadapter; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.gson.Gson; import com.google.gson.GsonBuilder; @@ -29,7 +30,9 @@ import java.lang.invoke.MethodHandles; import java.util.HashMap; import java.util.Map; +import org.json.JSONObject; import org.oran.dmaapadapter.clients.AsyncRestClient; +import org.oran.dmaapadapter.exceptions.ServiceException; import org.oran.dmaapadapter.r1.ConsumerJobInfo; import org.oran.dmaapadapter.r1.ProducerInfoTypeInfo; import org.oran.dmaapadapter.r1.ProducerJobInfo; @@ -99,15 +102,35 @@ public class IcsSimulatorController { return new ResponseEntity<>(HttpStatus.OK); } - public void addJob(ConsumerJobInfo job, String jobId, AsyncRestClient restClient) { + public void addJob(ConsumerJobInfo job, String jobId, AsyncRestClient restClient) throws ServiceException { String url = this.testResults.registrationInfo.jobCallbackUrl; ProducerJobInfo request = new ProducerJobInfo(job.jobDefinition, jobId, job.infoTypeId, job.jobResultUri, job.owner, "TIMESTAMP"); String body = gson.toJson(request); + validateJsonObjectAgainstSchema(job.jobDefinition, testResults.types.get(job.infoTypeId).jobDataSchema); logger.info("ICS Simulator PUT job: {}", body); restClient.post(url, body, MediaType.APPLICATION_JSON).block(); } + private void validateJsonObjectAgainstSchema(Object object, Object schemaObj) throws ServiceException { + if (schemaObj != null) { // schema is optional for now + try { + ObjectMapper mapper = new ObjectMapper(); + + String schemaAsString = mapper.writeValueAsString(schemaObj); + JSONObject schemaJSON = new JSONObject(schemaAsString); + var schema = org.everit.json.schema.loader.SchemaLoader.load(schemaJSON); + + String objectAsString = object.toString(); + JSONObject json = new JSONObject(objectAsString); + schema.validate(json); + } catch (Exception e) { + logger.error("Json validation failure {}", e.toString()); + throw new ServiceException("Json validation failure " + e.toString(), HttpStatus.BAD_REQUEST); + } + } + } + public void deleteJob(String jobId, AsyncRestClient restClient) { String url = this.testResults.registrationInfo.jobCallbackUrl + "/" + jobId; logger.info("ICS Simulator DELETE job: {}", url); diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java index 559b144..62d4158 100644 --- a/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java +++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java @@ -214,7 +214,7 @@ class IntegrationWithIcs { await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue()); final String TYPE_ID = "KafkaInformationType"; - Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 456), 1); + Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 456), 1, null); ConsumerJobInfo jobInfo = new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", consumerUri(), ""); @@ -233,7 +233,7 @@ class IntegrationWithIcs { await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue()); final String TYPE_ID = "KafkaInformationType"; - Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 170 * 1000), 1); + Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 170 * 1000), 1, null); ConsumerJobInfo jobInfo = new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", consumerUri(), ""); diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java index f637917..5f19633 100644 --- a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java +++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java @@ -186,7 +186,7 @@ class IntegrationWithKafka { private static Object jobParametersAsJsonObject(String filter, long maxTimeMiliseconds, int maxSize, int maxConcurrency) { Job.Parameters param = - new Job.Parameters(filter, new Job.BufferTimeout(maxSize, maxTimeMiliseconds), maxConcurrency); + new Job.Parameters(filter, new Job.BufferTimeout(maxSize, maxTimeMiliseconds), maxConcurrency, null); String str = gson.toJson(param); return jsonObject(str); } @@ -254,7 +254,7 @@ class IntegrationWithKafka { } @Test - void simpleCase() throws InterruptedException { + void simpleCase() throws Exception { final String JOB_ID = "ID"; // Register producer, Register types @@ -307,7 +307,7 @@ class IntegrationWithKafka { } @Test - void kafkaIOverflow() throws InterruptedException { + void kafkaIOverflow() throws Exception { final String JOB_ID1 = "ID1"; final String JOB_ID2 = "ID2"; diff --git a/src/test/java/org/oran/dmaapadapter/repository/filters/PmReportFilterTest.java b/src/test/java/org/oran/dmaapadapter/repository/filters/PmReportFilterTest.java new file mode 100644 index 0000000..53da055 --- /dev/null +++ b/src/test/java/org/oran/dmaapadapter/repository/filters/PmReportFilterTest.java @@ -0,0 +1,118 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2022 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.repository.filters; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.Path; + +import org.junit.jupiter.api.Test; + +class PmReportFilterTest { + + @Test + void testPmFilterMeasTypes() throws Exception { + + String reportJson = loadReport(); + + PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); + filterData.measTypes.add("succImmediateAssignProcs"); + + PmReportFilter filter = new PmReportFilter(filterData); + String filtered = filter.filter(reportJson); + + assertThat(filtered).contains("succImmediateAssignProcs").doesNotContain("\"p\":2").contains("\"p\":1") + .contains("Gbg-997"); + + // Test that no report is returned if not meas types were found + filterData = new PmReportFilter.FilterData(); + filterData.measTypes.add("junk"); + filter = new PmReportFilter(filterData); + filtered = filter.filter(reportJson); + assertThat(filtered).isEmpty(); + } + + @Test + void testMeasObjInstIds() throws Exception { + PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); + filterData.measObjInstIds.add("junk"); + PmReportFilter filter = new PmReportFilter(filterData); + String filtered = filter.filter(loadReport()); + assertThat(filtered).isEmpty(); + + filterData = new PmReportFilter.FilterData(); + filterData.measObjInstIds.add("UtranCell=Gbg-997"); + filter = new PmReportFilter(filterData); + filtered = filter.filter(loadReport()); + assertThat(filtered).contains("Gbg-997").doesNotContain("Gbg-998"); + } + + @Test + void testSourceNames() throws Exception { + PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); + filterData.sourceNames.add("junk"); + PmReportFilter filter = new PmReportFilter(filterData); + String filtered = filter.filter(loadReport()); + assertThat(filtered).isEmpty(); + + filterData = new PmReportFilter.FilterData(); + filterData.sourceNames.add("O-DU-1122"); + filter = new PmReportFilter(filterData); + filtered = filter.filter(loadReport()); + assertThat(filtered).contains("O-DU-1122"); + } + + @Test + void testMeasuredEntityDns() throws Exception { + PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); + filterData.measuredEntityDns.add("junk"); + PmReportFilter filter = new PmReportFilter(filterData); + String filtered = filter.filter(loadReport()); + assertThat(filtered).isEmpty(); + + filterData = new PmReportFilter.FilterData(); + filterData.measuredEntityDns.add("ManagedElement=RNC-Gbg-1"); + filter = new PmReportFilter(filterData); + filtered = filter.filter(loadReport()); + assertThat(filtered).contains("RNC-Gbg-1"); // '=' is escaped to unicode by gson. OK + } + + @Test + void testParse() throws Exception { + com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create(); + PmReport report = gson.fromJson(loadReport(), PmReport.class); + + String dn = report.event.perf3gppFields.measDataCollection.measuredEntityDn; + String json = gson.toJson(report); + report = gson.fromJson(json, PmReport.class); + + // '=' is escaped to unicode by gson. but converted back + assertThat(report.event.perf3gppFields.measDataCollection.measuredEntityDn).isEqualTo(dn); + } + + private String loadReport() throws Exception { + String path = "./src/test/resources/pm_report.json"; + return Files.readString(Path.of(path), Charset.defaultCharset()); + } + +} diff --git a/src/test/resources/pm_report.json b/src/test/resources/pm_report.json new file mode 100644 index 0000000..1aa97c1 --- /dev/null +++ b/src/test/resources/pm_report.json @@ -0,0 +1,228 @@ +{ + "event": { + "commonEventHeader": { + "domain": "perf3gpp", + "eventId": "9efa1210-f285-455f-9c6a-3a659b1f1882", + "sequence": 0, + "eventName": "perf3gpp_gnb-Ericsson_pmMeasResult", + "sourceName": "O-DU-1122", + "reportingEntityName": "", + "priority": "Normal", + "startEpochMicrosec": 951912000000, + "lastEpochMicrosec": 951912900000, + "version": "4.0", + "vesEventListenerVersion": "7.1", + "timeZoneOffset": "+00:00" + }, + "perf3gppFields": { + "perf3gppFieldsVersion": "1.0", + "measDataCollection": { + "granularityPeriod": 900, + "measuredEntityUserName": "RNC Telecomville", + "measuredEntityDn": "SubNetwork=CountryNN,MeContext=MEC-Gbg-1,ManagedElement=RNC-Gbg-1", + "measuredEntitySoftwareVersion": "", + "measInfoList": [ + { + "measInfoId": { + "sMeasInfoId": "" + }, + "measTypes": { + "sMeasTypesList": [ + "attTCHSeizures", + "succTCHSeizures", + "attImmediateAssignProcs", + "succImmediateAssignProcs" + ] + }, + "measValuesList": [ + { + "measObjInstId": "RncFunction=RF-1,UtranCell=Gbg-997", + "suspectFlag": "false", + "measResults": [ + { + "p": 1, + "sValue": "813" + }, + { + "p": 2, + "sValue": "913" + }, + { + "p": 3, + "sValue": "1013" + }, + { + "p": 4, + "sValue": "1113" + } + ] + }, + { + "measObjInstId": "RncFunction=RF-1,UtranCell=Gbg-998", + "suspectFlag": "false", + "measResults": [ + { + "p": 1, + "sValue": "890" + }, + { + "p": 2, + "sValue": "901" + }, + { + "p": 3, + "sValue": "123" + }, + { + "p": 4, + "sValue": "234" + } + ] + }, + { + "measObjInstId": "RncFunction=RF-1,UtranCell=Gbg-999", + "suspectFlag": "true", + "measResults": [ + { + "p": 1, + "sValue": "456" + }, + { + "p": 2, + "sValue": "567" + }, + { + "p": 3, + "sValue": "678" + }, + { + "p": 4, + "sValue": "789" + } + ] + } + ] + }, + { + "measInfoId": { + "sMeasInfoId": "ENodeBFunction" + }, + "measTypes": { + "sMeasTypesList": [ + "attTCHSeizures1", + "succTCHSeizures2", + "attImmediateAssignProcs3", + "succImmediateAssignProcs4" + ] + }, + "measValuesList": [ + { + "measObjInstId": "ManagedElement=RNC-Gbg-1,ENodeBFunction=1", + "suspectFlag": "false", + "measResults": [ + { + "p": 1, + "sValue": "4" + }, + { + "p": 2, + "sValue": "86,87,2,6,77,96,75,33,24" + }, + { + "p": 3, + "sValue": "40" + }, + { + "p": 4, + "sValue": "90" + } + ] + } + ] + }, + { + "measInfoId": { + "sMeasInfoId": "" + }, + "measTypes": { + "sMeasTypesList": [ + "attTCHSeizures5", + "succTCHSeizures6", + "attImmediateAssignProcs7", + "succImmediateAssignProcs8" + ] + }, + "measValuesList": [ + { + "measObjInstId": "RncFunction=RF-1,UtranCell=Gbg-997", + "suspectFlag": "false", + "measResults": [ + { + "p": 1, + "sValue": "238" + }, + { + "p": 2, + "sValue": "344" + }, + { + "p": 3, + "sValue": "563" + }, + { + "p": 4, + "sValue": "787" + } + ] + }, + { + "measObjInstId": "RncFunction=RF-1,UtranCell=Gbg-998", + "suspectFlag": "false", + "measResults": [ + { + "p": 1, + "sValue": "898" + }, + { + "p": 2, + "sValue": "905" + }, + { + "p": 3, + "sValue": "127" + }, + { + "p": 4, + "sValue": "238" + } + ] + }, + { + "measObjInstId": "RncFunction=RF-1,UtranCell=Gbg-999", + "suspectFlag": "true", + "measResults": [ + { + "p": 1, + "sValue": "454" + }, + { + "p": 2, + "sValue": "569" + }, + { + "p": 3, + "sValue": "672" + }, + { + "p": 4, + "sValue": "785" + } + ] + } + ] + } + ] + } + } + } +} \ No newline at end of file diff --git a/src/test/resources/test_application_configuration.json b/src/test/resources/test_application_configuration.json index 32e6c32..8a89c1a 100644 --- a/src/test/resources/test_application_configuration.json +++ b/src/test/resources/test_application_configuration.json @@ -3,12 +3,25 @@ { "id": "DmaapInformationType", "dmaapTopicUrl": "/dmaap-topic-1", - "useHttpProxy": false + "useHttpProxy": false, + "dataType": "text" }, { "id": "KafkaInformationType", "kafkaInputTopic": "TutorialTopic", - "useHttpProxy": false + "useHttpProxy": false + }, + { + "id": "PmInformationType", + "dmaapTopicUrl": "/dmaap-topic-2", + "useHttpProxy": false, + "dataType": "PmData" + }, + { + "id": "PmInformationTypeKafka", + "kafkaInputTopic": "TutorialTopic", + "useHttpProxy": false, + "dataType": "PmData" } ] -} \ No newline at end of file +}