PM Filter 66/8066/4
authorPatrikBuhr <patrik.buhr@est.tech>
Fri, 8 Apr 2022 11:13:57 +0000 (13:13 +0200)
committerPatrikBuhr <patrik.buhr@est.tech>
Thu, 14 Apr 2022 07:19:32 +0000 (09:19 +0200)
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-743
Change-Id: I068887f91865df92d014052d9ecfdf38831938a7

24 files changed:
api/api.json
api/api.yaml
config/README
pom.xml
src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java
src/main/java/org/oran/dmaapadapter/repository/InfoType.java
src/main/java/org/oran/dmaapadapter/repository/Job.java
src/main/java/org/oran/dmaapadapter/repository/filters/Filter.java [new file with mode: 0644]
src/main/java/org/oran/dmaapadapter/repository/filters/PmReport.java [new file with mode: 0644]
src/main/java/org/oran/dmaapadapter/repository/filters/PmReportFilter.java [new file with mode: 0644]
src/main/java/org/oran/dmaapadapter/repository/filters/RegexpFilter.java [new file with mode: 0644]
src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java
src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java
src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java
src/main/resources/typeSchemaPmDataDmaap.json [new file with mode: 0644]
src/main/resources/typeSchemaPmDataKafka.json [new file with mode: 0644]
src/test/java/org/oran/dmaapadapter/ApplicationTest.java
src/test/java/org/oran/dmaapadapter/DmaapSimulatorController.java
src/test/java/org/oran/dmaapadapter/IcsSimulatorController.java
src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java
src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
src/test/java/org/oran/dmaapadapter/repository/filters/PmReportFilterTest.java [new file with mode: 0644]
src/test/resources/pm_report.json [new file with mode: 0644]
src/test/resources/test_application_configuration.json

index 88fed46..a58aced 100644 (file)
             }},
             "tags": ["Actuator"]
         }},
+        "/dmaap-topic-2": {"get": {
+            "summary": "GET from topic",
+            "description": "The call is invoked to activate or to modify a data subscription. The endpoint is provided by the Information Producer.",
+            "operationId": "getFromPmTopic",
+            "responses": {"200": {
+                "description": "OK",
+                "content": {"application/json": {"schema": {"$ref": "#/components/schemas/void"}}}
+            }},
+            "tags": ["DMAAP Simulator (exists only in test)"]
+        }},
         "/consumer": {"post": {
             "summary": "Consume data",
             "requestBody": {
index f6eb1f7..02697ee 100644 (file)
@@ -334,6 +334,21 @@ paths:
             '*/*':
               schema:
                 type: object
+  /dmaap-topic-2:
+    get:
+      tags:
+      - DMAAP Simulator (exists only in test)
+      summary: GET from topic
+      description: The call is invoked to activate or to modify a data subscription.
+        The endpoint is provided by the Information Producer.
+      operationId: getFromPmTopic
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/void'
   /consumer:
     post:
       tags:
index 140927f..a2137b5 100644 (file)
@@ -26,7 +26,7 @@ keytool -list -v -keystore truststore.jks -storepass policy_agent
 
 ## License
 
-Copyright (C) 2020 Nordix Foundation. All rights reserved.
+Copyright (C) 2022 Nordix Foundation. All rights reserved.
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
 You may obtain a copy of the License at
diff --git a/pom.xml b/pom.xml
index 18fde28..53515c3 100644 (file)
--- a/pom.xml
+++ b/pom.xml
             <optional>true</optional>
         </dependency>
         <!-- TEST -->
+        <!-- https://mvnrepository.com/artifact/com.github.erosb/everit-json-schema -->
+        <dependency>
+            <groupId>com.github.erosb</groupId>
+            <artifactId>everit-json-schema</artifactId>
+            <version>1.12.1</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-test</artifactId>
         <system>JIRA</system>
         <url>https://jira.o-ran-sc.org/</url>
     </issueManagement>
-</project>
+</project>
\ No newline at end of file
index 3ea64e7..7b97486 100644 (file)
@@ -124,7 +124,6 @@ public class ApplicationConfig {
 
     public Collection<InfoType> getTypes() {
         com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
-
         try {
             String configJson = Files.readString(Path.of(getLocalConfigurationFilePath()), Charset.defaultCharset());
             ConfigFile configData = gson.fromJson(configJson, ConfigFile.class);
index 27b527d..3a78d6c 100644 (file)
@@ -38,11 +38,14 @@ public class InfoType {
     @Getter
     private final String kafkaInputTopic;
 
-    public InfoType(String id, String dmaapTopicUrl, boolean useHttpProxy, String kafkaInputTopic) {
+    private final String dataType;
+
+    public InfoType(String id, String dmaapTopicUrl, boolean useHttpProxy, String kafkaInputTopic, String dataType) {
         this.id = id;
         this.dmaapTopicUrl = dmaapTopicUrl;
         this.useHttpProxy = useHttpProxy;
         this.kafkaInputTopic = kafkaInputTopic;
+        this.dataType = dataType;
     }
 
     public boolean isKafkaTopicDefined() {
@@ -53,4 +56,14 @@ public class InfoType {
         return StringUtils.hasLength(dmaapTopicUrl);
     }
 
+    public enum DataType {
+        PM_DATA, TEXT
+    }
+
+    public DataType getDataType() {
+        if (dataType != null && dataType.equalsIgnoreCase("pmData")) {
+            return DataType.PM_DATA;
+        }
+        return DataType.TEXT;
+    }
 }
index 5f7521c..5a2e373 100644 (file)
 
 package org.oran.dmaapadapter.repository;
 
+import com.google.common.base.Strings;
+
 import java.time.Duration;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import lombok.Getter;
 
 import org.immutables.gson.Gson;
 import org.oran.dmaapadapter.clients.AsyncRestClient;
+import org.oran.dmaapadapter.repository.filters.Filter;
+import org.oran.dmaapadapter.repository.filters.PmReportFilter;
+import org.oran.dmaapadapter.repository.filters.RegexpFilter;
 
 public class Job {
 
@@ -38,18 +41,23 @@ public class Job {
         @Getter
         private BufferTimeout bufferTimeout;
 
-        private int maxConcurrency;
+        private Integer maxConcurrency;
+
+        @Getter
+        private PmReportFilter.FilterData pmFilter;
 
         public Parameters() {}
 
-        public Parameters(String filter, BufferTimeout bufferTimeout, int maxConcurrency) {
+        public Parameters(String filter, BufferTimeout bufferTimeout, Integer maxConcurrency,
+                PmReportFilter.FilterData pmFilter) {
             this.filter = filter;
             this.bufferTimeout = bufferTimeout;
             this.maxConcurrency = maxConcurrency;
+            this.pmFilter = pmFilter;
         }
 
         public int getMaxConcurrency() {
-            return maxConcurrency == 0 ? 1 : maxConcurrency;
+            return maxConcurrency == null || maxConcurrency == 0 ? 1 : maxConcurrency;
         }
     }
 
@@ -90,7 +98,7 @@ public class Job {
     @Getter
     private final String lastUpdated;
 
-    private final Pattern jobDataFilter;
+    private final Filter filter;
 
     @Getter
     private final AsyncRestClient consumerRestClient;
@@ -103,20 +111,22 @@ public class Job {
         this.owner = owner;
         this.lastUpdated = lastUpdated;
         this.parameters = parameters;
-        if (parameters != null && parameters.filter != null) {
-            jobDataFilter = Pattern.compile(parameters.filter);
+        if (parameters != null && !Strings.isNullOrEmpty(parameters.filter)) {
+            filter = new RegexpFilter(parameters.filter);
+        } else if (parameters != null && parameters.pmFilter != null) {
+            filter = new PmReportFilter(parameters.pmFilter);
         } else {
-            jobDataFilter = null;
+            filter = null;
         }
         this.consumerRestClient = consumerRestClient;
+
     }
 
-    public boolean isFilterMatch(String data) {
-        if (jobDataFilter == null) {
-            return true;
+    public String filter(String data) {
+        if (filter == null) {
+            return data;
         }
-        Matcher matcher = jobDataFilter.matcher(data);
-        return matcher.find();
+        return filter.filter(data);
     }
 
     public boolean isBuffered() {
diff --git a/src/main/java/org/oran/dmaapadapter/repository/filters/Filter.java b/src/main/java/org/oran/dmaapadapter/repository/filters/Filter.java
new file mode 100644 (file)
index 0000000..e878387
--- /dev/null
@@ -0,0 +1,26 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2022 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.repository.filters;
+
+public interface Filter {
+    public String filter(String data);
+
+}
diff --git a/src/main/java/org/oran/dmaapadapter/repository/filters/PmReport.java b/src/main/java/org/oran/dmaapadapter/repository/filters/PmReport.java
new file mode 100644 (file)
index 0000000..6f3300d
--- /dev/null
@@ -0,0 +1,121 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2022 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.repository.filters;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.immutables.gson.Gson;
+
+@Gson.TypeAdapters
+class PmReport {
+
+    Event event = new Event();
+
+    @Gson.TypeAdapters
+    public static class CommonEventHeader {
+        String domain;
+        String eventId;
+        int sequence;
+        String eventName;
+        String sourceName;
+        String reportingEntityName;
+        String priority;
+        long startEpochMicrosec;
+        long lastEpochMicrosec;
+        String version;
+        String vesEventListenerVersion;
+        String timeZoneOffset;
+    }
+
+    @Gson.TypeAdapters
+    public static class MeasInfoId {
+        String sMeasInfoId;
+    }
+
+    @Gson.TypeAdapters
+    public static class MeasTypes {
+        public String getMeasType(int pValue) {
+            if (pValue > sMeasTypesList.size()) {
+                return "MeasTypeIndexOutOfBounds:" + pValue;
+            }
+            return sMeasTypesList.get(pValue - 1);
+        }
+
+        protected ArrayList<String> sMeasTypesList = new ArrayList<>();
+    }
+
+    @Gson.TypeAdapters
+    public static class MeasResult {
+        int p;
+        String sValue;
+    }
+
+    @Gson.TypeAdapters
+    public static class MeasValuesList {
+        String measObjInstId;
+        String suspectFlag;
+        Collection<MeasResult> measResults = new ArrayList<>();
+
+        public MeasValuesList shallowClone() {
+            MeasValuesList n = new MeasValuesList();
+            n.measObjInstId = this.measObjInstId;
+            n.suspectFlag = this.suspectFlag;
+            return n;
+        }
+    }
+
+    @Gson.TypeAdapters
+    public static class MeasInfoList {
+        MeasInfoId measInfoId;
+        MeasTypes measTypes;
+        Collection<MeasValuesList> measValuesList = new ArrayList<>();
+
+        public MeasInfoList shallowClone() {
+            MeasInfoList n = new MeasInfoList();
+            n.measInfoId = this.measInfoId;
+            n.measTypes = new MeasTypes();
+            return n;
+        }
+    }
+
+    @Gson.TypeAdapters
+    public static class MeasDataCollection {
+        int granularityPeriod;
+        String measuredEntityUserName;
+        String measuredEntityDn;
+        String measuredEntitySoftwareVersion;
+        Collection<MeasInfoList> measInfoList = new ArrayList<>();
+    }
+
+    @Gson.TypeAdapters
+    public static class Perf3gppFields {
+        String perf3gppFieldsVersion;
+        MeasDataCollection measDataCollection;
+    }
+
+    @Gson.TypeAdapters
+    public static class Event {
+        CommonEventHeader commonEventHeader;
+        Perf3gppFields perf3gppFields;
+    }
+
+}
diff --git a/src/main/java/org/oran/dmaapadapter/repository/filters/PmReportFilter.java b/src/main/java/org/oran/dmaapadapter/repository/filters/PmReportFilter.java
new file mode 100644 (file)
index 0000000..9565d90
--- /dev/null
@@ -0,0 +1,177 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.repository.filters;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import lombok.Getter;
+
+import org.immutables.gson.Gson;
+import org.thymeleaf.util.StringUtils;
+
+public class PmReportFilter implements Filter {
+
+    private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
+    private final FilterData filterData;
+
+    @Gson.TypeAdapters
+    @Getter
+    public static class FilterData {
+        Collection<String> sourceNames = new ArrayList<>();
+        Collection<String> measObjInstIds = new ArrayList<>();
+        Collection<String> measTypes = new ArrayList<>();
+        Collection<String> measuredEntityDns = new ArrayList<>();
+    }
+
+    private static class MeasTypesIndexed extends PmReport.MeasTypes {
+        private Map<String, Integer> map = new HashMap<>();
+
+        public int addP(String measTypeName) {
+            Integer p = map.get(measTypeName);
+            if (p != null) {
+                return p;
+            } else {
+                this.sMeasTypesList.add(measTypeName);
+                this.map.put(measTypeName, this.sMeasTypesList.size());
+                return this.sMeasTypesList.size();
+            }
+        }
+    }
+
+    public PmReportFilter(FilterData filterData) {
+        this.filterData = filterData;
+    }
+
+    @Override
+    public String filter(String data) {
+        PmReport report = gson.fromJson(data, PmReport.class);
+        if (!filter(report, this.filterData)) {
+            return "";
+        }
+        return gson.toJson(report);
+
+    }
+
+    /**
+     * Updates the report based on the filter data.
+     *
+     * @param report
+     * @param filterData
+     * @return true if there is anything left in the report
+     */
+    private boolean filter(PmReport report, FilterData filterData) {
+        if (!matchSourceNames(report, filterData.sourceNames)) {
+            return false;
+        }
+        Collection<PmReport.MeasInfoList> filtered = createMeasObjInstIds(report, filterData);
+        report.event.perf3gppFields.measDataCollection.measInfoList = filtered;
+        return !filtered.isEmpty();
+    }
+
+    private boolean isContainedInAny(String aString, Collection<String> collection) {
+        for (String s : collection) {
+            if (StringUtils.contains(aString, s) == Boolean.TRUE) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private boolean isMeasResultMatch(PmReport.MeasResult measResult, PmReport.MeasTypes measTypes, FilterData filter) {
+        String measType = measTypes.getMeasType(measResult.p);
+        return filter.measTypes.isEmpty() || filter.measTypes.contains(measType);
+    }
+
+    private Collection<PmReport.MeasResult> createMeasResults(Collection<PmReport.MeasResult> oldMeasResults,
+            PmReport.MeasTypes measTypes, FilterData filter) {
+        Collection<PmReport.MeasResult> newMeasResults = new ArrayList<>();
+
+        for (PmReport.MeasResult measResult : oldMeasResults) {
+            if (isMeasResultMatch(measResult, measTypes, filter)) {
+                newMeasResults.add(measResult);
+            }
+        }
+        return newMeasResults;
+    }
+
+    private PmReport.MeasValuesList createMeasValuesList(PmReport.MeasValuesList oldMeasValues,
+            PmReport.MeasTypes measTypes, FilterData filter) {
+
+        PmReport.MeasValuesList newMeasValuesList = oldMeasValues.shallowClone();
+
+        if (isContainedInAny(oldMeasValues.measObjInstId, filter.measObjInstIds) || filter.measObjInstIds.isEmpty()) {
+            newMeasValuesList.measResults = createMeasResults(oldMeasValues.measResults, measTypes, filter);
+        }
+        return newMeasValuesList;
+    }
+
+    private PmReport.MeasTypes createMeasTypes(Collection<PmReport.MeasValuesList> measValues,
+            PmReport.MeasTypes oldMMeasTypes) {
+        MeasTypesIndexed newMeasTypes = new MeasTypesIndexed();
+        for (PmReport.MeasValuesList l : measValues) {
+            for (PmReport.MeasResult r : l.measResults) {
+                String measTypeName = oldMMeasTypes.getMeasType(r.p);
+                r.p = newMeasTypes.addP(measTypeName);
+            }
+        }
+        return newMeasTypes;
+    }
+
+    private PmReport.MeasInfoList createMeasInfoList(PmReport.MeasInfoList oldMeasInfoList, FilterData filter) {
+        PmReport.MeasInfoList newMeasInfoList = oldMeasInfoList.shallowClone();
+
+        for (PmReport.MeasValuesList oldValues : oldMeasInfoList.measValuesList) {
+            PmReport.MeasValuesList newMeasValues = createMeasValuesList(oldValues, oldMeasInfoList.measTypes, filter);
+            if (!newMeasValues.measResults.isEmpty()) {
+                newMeasInfoList.measValuesList.add(newMeasValues);
+            }
+        }
+        newMeasInfoList.measTypes = createMeasTypes(newMeasInfoList.measValuesList, oldMeasInfoList.measTypes);
+        return newMeasInfoList;
+    }
+
+    private boolean matchMeasuredEntityDns(PmReport report, FilterData filter) {
+        return filter.measuredEntityDns.isEmpty() || this.isContainedInAny(
+                report.event.perf3gppFields.measDataCollection.measuredEntityDn, filter.measuredEntityDns);
+    }
+
+    private Collection<PmReport.MeasInfoList> createMeasObjInstIds(PmReport report, FilterData filter) {
+        Collection<PmReport.MeasInfoList> newList = new ArrayList<>();
+        if (!matchMeasuredEntityDns(report, filter)) {
+            return newList;
+        }
+        for (PmReport.MeasInfoList oldMeasInfoList : report.event.perf3gppFields.measDataCollection.measInfoList) {
+            PmReport.MeasInfoList l = createMeasInfoList(oldMeasInfoList, filter);
+            if (!l.measValuesList.isEmpty()) {
+                newList.add(l);
+            }
+        }
+        return newList;
+    }
+
+    private boolean matchSourceNames(PmReport report, Collection<String> sourceNames) {
+        return sourceNames.isEmpty() || sourceNames.contains(report.event.commonEventHeader.sourceName);
+    }
+
+}
diff --git a/src/main/java/org/oran/dmaapadapter/repository/filters/RegexpFilter.java b/src/main/java/org/oran/dmaapadapter/repository/filters/RegexpFilter.java
new file mode 100644 (file)
index 0000000..aac808c
--- /dev/null
@@ -0,0 +1,54 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.repository.filters;
+
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+public class RegexpFilter implements Filter {
+
+    private final Pattern regexp;
+
+    public RegexpFilter(String exp) {
+        if (exp != null) {
+            regexp = Pattern.compile(exp);
+        } else {
+            regexp = null;
+        }
+    }
+
+    public String filter(String data) {
+
+        if (regexp == null) {
+            return data;
+        }
+        Matcher matcher = regexp.matcher(data);
+        boolean match = matcher.find();
+        if (match) {
+            return data;
+        } else {
+            return "";
+        }
+    }
+
+}
index fe7ec8b..dfa6d07 100644 (file)
@@ -33,6 +33,7 @@ import org.springframework.http.MediaType;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.util.function.Tuples;
 
 /**
  * The class fetches incoming requests from DMAAP and sends them further to the
@@ -99,9 +100,11 @@ public class DmaapTopicConsumer {
 
         // Distibute the body to all jobs for this type
         return Flux.fromIterable(this.jobs.getJobsForType(this.type)) //
-                .filter(job -> job.isFilterMatch(body)) //
-                .doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl())) //
-                .flatMap(job -> job.getConsumerRestClient().post("", body, MediaType.APPLICATION_JSON), CONCURRENCY) //
+                .map(job -> Tuples.of(job, job.filter(body))) //
+                .filter(t -> !t.getT2().isEmpty()) //
+                .doOnNext(touple -> logger.debug("Sending to consumer {}", touple.getT1().getCallbackUrl())) //
+                .flatMap(touple -> touple.getT1().getConsumerRestClient().post("", touple.getT2(),
+                        MediaType.APPLICATION_JSON), CONCURRENCY) //
                 .onErrorResume(this::handleConsumerErrorResponse);
     }
 }
index 2a16f47..e4675a5 100644 (file)
@@ -108,7 +108,8 @@ public class KafkaJobDataConsumer {
     }
 
     private Flux<String> getMessagesFromKafka(Flux<String> input, Job job) {
-        Flux<String> result = input.filter(job::isFilterMatch);
+        Flux<String> result = input.map(job::filter) //
+                .filter(t -> !t.isEmpty()); //
 
         if (job.isBuffered()) {
             result = result.map(this::quote) //
index ec3f2b2..6623bb8 100644 (file)
@@ -149,7 +149,12 @@ public class ProducerRegstrationTask {
     }
 
     private Object jsonSchemaObject(InfoType type) throws IOException, ServiceException {
-        String schemaFile = type.isKafkaTopicDefined() ? "/typeSchemaKafka.json" : "/typeSchemaDmaap.json";
+        String schemaFile;
+        if (type.getDataType() == InfoType.DataType.PM_DATA) {
+            schemaFile = type.isKafkaTopicDefined() ? "/typeSchemaPmDataKafka.json" : "/typeSchemaPmDataDmaap.json";
+        } else {
+            schemaFile = type.isKafkaTopicDefined() ? "/typeSchemaKafka.json" : "/typeSchemaDmaap.json";
+        }
         return jsonObject(readSchemaFile(schemaFile));
     }
 
diff --git a/src/main/resources/typeSchemaPmDataDmaap.json b/src/main/resources/typeSchemaPmDataDmaap.json
new file mode 100644 (file)
index 0000000..8dd73e9
--- /dev/null
@@ -0,0 +1,45 @@
+{
+   "$schema": "http://json-schema.org/draft-04/schema#",
+   "type": "object",
+   "additionalProperties": false,
+   "properties": {
+      "pmFilter": {
+         "type": "object",
+         "additionalProperties": false,
+         "properties": {
+            "sourceNames": {
+               "type": "array",
+               "items": [
+                  {
+                     "type": "string"
+                  }
+               ]
+            },
+            "measObjInstIds": {
+               "type": "array",
+               "items": [
+                  {
+                     "type": "string"
+                  }
+               ]
+            },
+            "measTypes": {
+               "type": "array",
+               "items": [
+                  {
+                     "type": "string"
+                  }
+               ]
+            },
+            "measuredEntityDns": {
+               "type": "array",
+               "items": [
+                  {
+                     "type": "string"
+                  }
+               ]
+            }
+         }
+      }
+   }
+}
\ No newline at end of file
diff --git a/src/main/resources/typeSchemaPmDataKafka.json b/src/main/resources/typeSchemaPmDataKafka.json
new file mode 100644 (file)
index 0000000..6690740
--- /dev/null
@@ -0,0 +1,68 @@
+{
+   "$schema": "http://json-schema.org/draft-04/schema#",
+   "type": "object",
+   "additionalProperties": false,
+   "properties": {
+      "pmFilter": {
+         "type": "object",
+         "additionalProperties": false,
+         "properties": {
+            "sourceNames": {
+               "type": "array",
+               "items": [
+                  {
+                     "type": "string"
+                  }
+               ]
+            },
+            "measObjInstIds": {
+               "type": "array",
+               "items": [
+                  {
+                     "type": "string"
+                  }
+               ]
+            },
+            "measTypes": {
+               "type": "array",
+               "items": [
+                  {
+                     "type": "string"
+                  }
+               ]
+            },
+            "measuredEntityDns": {
+               "type": "array",
+               "items": [
+                  {
+                     "type": "string"
+                  }
+               ]
+            }
+         }
+      },
+      "maxConcurrency": {
+         "type": "integer",
+         "minimum": 1
+      },
+      "bufferTimeout": {
+         "type": "object",
+         "additionalProperties": false,
+         "properties": {
+            "maxSize": {
+               "type": "integer",
+               "minimum": 1
+            },
+            "maxTimeMiliseconds": {
+               "type": "integer",
+               "minimum": 0,
+               "maximum": 160000
+            }
+         },
+         "required": [
+            "maxSize",
+            "maxTimeMiliseconds"
+         ]
+      }
+   }
+}
\ No newline at end of file
index 24603ec..2066645 100644 (file)
@@ -29,7 +29,9 @@ import com.google.gson.JsonParser;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.nio.charset.Charset;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 
 import org.json.JSONObject;
@@ -50,6 +52,7 @@ import org.oran.dmaapadapter.r1.ProducerJobInfo;
 import org.oran.dmaapadapter.repository.InfoTypes;
 import org.oran.dmaapadapter.repository.Job;
 import org.oran.dmaapadapter.repository.Jobs;
+import org.oran.dmaapadapter.repository.filters.PmReportFilter;
 import org.oran.dmaapadapter.tasks.KafkaJobDataConsumer;
 import org.oran.dmaapadapter.tasks.KafkaTopicConsumers;
 import org.oran.dmaapadapter.tasks.ProducerRegstrationTask;
@@ -194,6 +197,10 @@ class ApplicationTest {
         return consumerJobInfo("DmaapInformationType", "EI_JOB_ID");
     }
 
+    private ConsumerJobInfo consumerPmJobInfo() {
+        return consumerJobInfo("PmInformationType", "EI_PM_JOB_ID");
+    }
+
     private Object jsonObject() {
         return jsonObject("{}");
     }
@@ -248,14 +255,14 @@ class ApplicationTest {
     }
 
     @Test
-    void testReceiveAndPostDataFromKafka() {
+    void testReceiveAndPostDataFromKafka() throws Exception {
         final String JOB_ID = "ID";
         final String TYPE_ID = "KafkaInformationType";
         await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
         assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
         // Create a job
-        Job.Parameters param = new Job.Parameters("", new Job.BufferTimeout(123, 456), 1);
+        Job.Parameters param = new Job.Parameters("", new Job.BufferTimeout(123, 456), 1, null);
         String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL;
         ConsumerJobInfo kafkaJobInfo =
                 new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", targetUri, "");
@@ -317,6 +324,72 @@ class ApplicationTest {
         await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
     }
 
+    @Test
+    void testPmFiltering() throws Exception {
+        // Create a job
+        final String JOB_ID = "ID";
+
+        // Register producer, Register types
+        await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+        producerRegistrationTask.supervisionTask().block();
+
+        // Create a job with a PM filter
+        ConsumerJobInfo jobInfo = consumerPmJobInfo();
+        PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
+        filterData.getMeasTypes().add("succImmediateAssignProcs");
+        filterData.getMeasObjInstIds().add("UtranCell=Gbg-997");
+        filterData.getSourceNames().add("O-DU-1122");
+        filterData.getMeasuredEntityDns().add("ManagedElement=RNC-Gbg-1");
+        Job.Parameters param = new Job.Parameters(null, null, null, filterData);
+        String paramJson = gson.toJson(param);
+        jobInfo.jobDefinition = jsonObject(paramJson);
+
+        this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+        // Return one messagefrom DMAAP and verify that the job (consumer) receives a
+        // filtered PM message
+        String path = "./src/test/resources/pm_report.json";
+        String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset());
+        DmaapSimulatorController.dmaapPmResponses.add(pmReportJson);
+
+        ConsumerController.TestResults consumer = this.consumerController.testResults;
+        await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
+        String receivedFiltered = consumer.receivedBodies.get(0);
+        assertThat(receivedFiltered).contains("succImmediateAssignProcs").doesNotContain("\"p\":2").contains("\"p\":1");
+
+        // Delete the job
+        this.icsSimulatorController.deleteJob(JOB_ID, restClient());
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+    }
+
+    @Test
+    void testPmFilteringKafka() throws Exception {
+        // Test that the schema for kafka and pm filtering is OK.
+
+        // Create a job
+        final String JOB_ID = "ID";
+
+        // Register producer, Register types
+        await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+        producerRegistrationTask.supervisionTask().block();
+
+        // Create a job with a PM filter
+        ConsumerJobInfo jobInfo = consumerPmJobInfo();
+        jobInfo.infoTypeId = "PmInformationTypeKafka";
+        PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
+        filterData.getMeasTypes().add("succImmediateAssignProcs");
+        Job.Parameters param = new Job.Parameters(null, null, null, filterData);
+        String paramJson = gson.toJson(param);
+        jobInfo.jobDefinition = jsonObject(paramJson);
+        this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+        // Delete the job
+        this.icsSimulatorController.deleteJob(JOB_ID, restClient());
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+    }
+
     @Test
     void testReRegister() throws Exception {
         // Wait foir register types and producer
@@ -330,8 +403,8 @@ class ApplicationTest {
 
         // Just clear the registerred types, should trigger a re-register
         icsSimulatorController.testResults.types.clear();
-        await().untilAsserted(
-                () -> assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(2));
+        await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds)
+                .hasSize(this.types.size()));
     }
 
     public static void testErrorCode(Mono<?> request, HttpStatus expStatus, String responseContains) {
index 5259ee1..db0bd22 100644 (file)
@@ -49,9 +49,12 @@ public class DmaapSimulatorController {
     private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
     public static final String DMAAP_TOPIC_URL = "/dmaap-topic-1";
+    public static final String DMAAP_TOPIC_PM_URL = "/dmaap-topic-2";
 
     public static List<String> dmaapResponses = Collections.synchronizedList(new LinkedList<String>());
 
+    public static List<String> dmaapPmResponses = Collections.synchronizedList(new LinkedList<String>());
+
     @GetMapping(path = DMAAP_TOPIC_URL, produces = MediaType.APPLICATION_JSON_VALUE)
     @Operation(summary = "GET from topic",
             description = "The call is invoked to activate or to modify a data subscription. The endpoint is provided by the Information Producer.")
@@ -70,4 +73,21 @@ public class DmaapSimulatorController {
 
     }
 
+    @GetMapping(path = DMAAP_TOPIC_PM_URL, produces = MediaType.APPLICATION_JSON_VALUE)
+    @Operation(summary = "GET from topic",
+            description = "The call is invoked to activate or to modify a data subscription. The endpoint is provided by the Information Producer.")
+    @ApiResponses(value = { //
+            @ApiResponse(responseCode = "200", description = "OK", //
+                    content = @Content(schema = @Schema(implementation = VoidResponse.class))) //
+    })
+    public ResponseEntity<Object> getFromPmTopic() {
+        if (dmaapPmResponses.isEmpty()) {
+            return ErrorResponse.create("", HttpStatus.NOT_FOUND);
+        } else {
+            String resp = dmaapPmResponses.remove(0);
+            return new ResponseEntity<>(resp, HttpStatus.OK);
+        }
+
+    }
+
 }
index 790aafb..286712e 100644 (file)
@@ -20,6 +20,7 @@
 
 package org.oran.dmaapadapter;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 
@@ -29,7 +30,9 @@ import java.lang.invoke.MethodHandles;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.json.JSONObject;
 import org.oran.dmaapadapter.clients.AsyncRestClient;
+import org.oran.dmaapadapter.exceptions.ServiceException;
 import org.oran.dmaapadapter.r1.ConsumerJobInfo;
 import org.oran.dmaapadapter.r1.ProducerInfoTypeInfo;
 import org.oran.dmaapadapter.r1.ProducerJobInfo;
@@ -99,15 +102,35 @@ public class IcsSimulatorController {
         return new ResponseEntity<>(HttpStatus.OK);
     }
 
-    public void addJob(ConsumerJobInfo job, String jobId, AsyncRestClient restClient) {
+    public void addJob(ConsumerJobInfo job, String jobId, AsyncRestClient restClient) throws ServiceException {
         String url = this.testResults.registrationInfo.jobCallbackUrl;
         ProducerJobInfo request =
                 new ProducerJobInfo(job.jobDefinition, jobId, job.infoTypeId, job.jobResultUri, job.owner, "TIMESTAMP");
         String body = gson.toJson(request);
+        validateJsonObjectAgainstSchema(job.jobDefinition, testResults.types.get(job.infoTypeId).jobDataSchema);
         logger.info("ICS Simulator PUT job: {}", body);
         restClient.post(url, body, MediaType.APPLICATION_JSON).block();
     }
 
+    private void validateJsonObjectAgainstSchema(Object object, Object schemaObj) throws ServiceException {
+        if (schemaObj != null) { // schema is optional for now
+            try {
+                ObjectMapper mapper = new ObjectMapper();
+
+                String schemaAsString = mapper.writeValueAsString(schemaObj);
+                JSONObject schemaJSON = new JSONObject(schemaAsString);
+                var schema = org.everit.json.schema.loader.SchemaLoader.load(schemaJSON);
+
+                String objectAsString = object.toString();
+                JSONObject json = new JSONObject(objectAsString);
+                schema.validate(json);
+            } catch (Exception e) {
+                logger.error("Json validation failure {}", e.toString());
+                throw new ServiceException("Json validation failure " + e.toString(), HttpStatus.BAD_REQUEST);
+            }
+        }
+    }
+
     public void deleteJob(String jobId, AsyncRestClient restClient) {
         String url = this.testResults.registrationInfo.jobCallbackUrl + "/" + jobId;
         logger.info("ICS Simulator DELETE job: {}", url);
index 559b144..62d4158 100644 (file)
@@ -214,7 +214,7 @@ class IntegrationWithIcs {
         await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue());
         final String TYPE_ID = "KafkaInformationType";
 
-        Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 456), 1);
+        Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 456), 1, null);
 
         ConsumerJobInfo jobInfo =
                 new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", consumerUri(), "");
@@ -233,7 +233,7 @@ class IntegrationWithIcs {
         await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue());
         final String TYPE_ID = "KafkaInformationType";
 
-        Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 170 * 1000), 1);
+        Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 170 * 1000), 1, null);
 
         ConsumerJobInfo jobInfo =
                 new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", consumerUri(), "");
index f637917..5f19633 100644 (file)
@@ -186,7 +186,7 @@ class IntegrationWithKafka {
     private static Object jobParametersAsJsonObject(String filter, long maxTimeMiliseconds, int maxSize,
             int maxConcurrency) {
         Job.Parameters param =
-                new Job.Parameters(filter, new Job.BufferTimeout(maxSize, maxTimeMiliseconds), maxConcurrency);
+                new Job.Parameters(filter, new Job.BufferTimeout(maxSize, maxTimeMiliseconds), maxConcurrency, null);
         String str = gson.toJson(param);
         return jsonObject(str);
     }
@@ -254,7 +254,7 @@ class IntegrationWithKafka {
     }
 
     @Test
-    void simpleCase() throws InterruptedException {
+    void simpleCase() throws Exception {
         final String JOB_ID = "ID";
 
         // Register producer, Register types
@@ -307,7 +307,7 @@ class IntegrationWithKafka {
     }
 
     @Test
-    void kafkaIOverflow() throws InterruptedException {
+    void kafkaIOverflow() throws Exception {
         final String JOB_ID1 = "ID1";
         final String JOB_ID2 = "ID2";
 
diff --git a/src/test/java/org/oran/dmaapadapter/repository/filters/PmReportFilterTest.java b/src/test/java/org/oran/dmaapadapter/repository/filters/PmReportFilterTest.java
new file mode 100644 (file)
index 0000000..53da055
--- /dev/null
@@ -0,0 +1,118 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2022 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.repository.filters;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import org.junit.jupiter.api.Test;
+
+class PmReportFilterTest {
+
+    @Test
+    void testPmFilterMeasTypes() throws Exception {
+
+        String reportJson = loadReport();
+
+        PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
+        filterData.measTypes.add("succImmediateAssignProcs");
+
+        PmReportFilter filter = new PmReportFilter(filterData);
+        String filtered = filter.filter(reportJson);
+
+        assertThat(filtered).contains("succImmediateAssignProcs").doesNotContain("\"p\":2").contains("\"p\":1")
+                .contains("Gbg-997");
+
+        // Test that no report is returned if not meas types were found
+        filterData = new PmReportFilter.FilterData();
+        filterData.measTypes.add("junk");
+        filter = new PmReportFilter(filterData);
+        filtered = filter.filter(reportJson);
+        assertThat(filtered).isEmpty();
+    }
+
+    @Test
+    void testMeasObjInstIds() throws Exception {
+        PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
+        filterData.measObjInstIds.add("junk");
+        PmReportFilter filter = new PmReportFilter(filterData);
+        String filtered = filter.filter(loadReport());
+        assertThat(filtered).isEmpty();
+
+        filterData = new PmReportFilter.FilterData();
+        filterData.measObjInstIds.add("UtranCell=Gbg-997");
+        filter = new PmReportFilter(filterData);
+        filtered = filter.filter(loadReport());
+        assertThat(filtered).contains("Gbg-997").doesNotContain("Gbg-998");
+    }
+
+    @Test
+    void testSourceNames() throws Exception {
+        PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
+        filterData.sourceNames.add("junk");
+        PmReportFilter filter = new PmReportFilter(filterData);
+        String filtered = filter.filter(loadReport());
+        assertThat(filtered).isEmpty();
+
+        filterData = new PmReportFilter.FilterData();
+        filterData.sourceNames.add("O-DU-1122");
+        filter = new PmReportFilter(filterData);
+        filtered = filter.filter(loadReport());
+        assertThat(filtered).contains("O-DU-1122");
+    }
+
+    @Test
+    void testMeasuredEntityDns() throws Exception {
+        PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
+        filterData.measuredEntityDns.add("junk");
+        PmReportFilter filter = new PmReportFilter(filterData);
+        String filtered = filter.filter(loadReport());
+        assertThat(filtered).isEmpty();
+
+        filterData = new PmReportFilter.FilterData();
+        filterData.measuredEntityDns.add("ManagedElement=RNC-Gbg-1");
+        filter = new PmReportFilter(filterData);
+        filtered = filter.filter(loadReport());
+        assertThat(filtered).contains("RNC-Gbg-1"); // '=' is escaped to unicode by gson. OK
+    }
+
+    @Test
+    void testParse() throws Exception {
+        com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
+        PmReport report = gson.fromJson(loadReport(), PmReport.class);
+
+        String dn = report.event.perf3gppFields.measDataCollection.measuredEntityDn;
+        String json = gson.toJson(report);
+        report = gson.fromJson(json, PmReport.class);
+
+        // '=' is escaped to unicode by gson. but converted back
+        assertThat(report.event.perf3gppFields.measDataCollection.measuredEntityDn).isEqualTo(dn);
+    }
+
+    private String loadReport() throws Exception {
+        String path = "./src/test/resources/pm_report.json";
+        return Files.readString(Path.of(path), Charset.defaultCharset());
+    }
+
+}
diff --git a/src/test/resources/pm_report.json b/src/test/resources/pm_report.json
new file mode 100644 (file)
index 0000000..1aa97c1
--- /dev/null
@@ -0,0 +1,228 @@
+{
+   "event": {
+      "commonEventHeader": {
+         "domain": "perf3gpp",
+         "eventId": "9efa1210-f285-455f-9c6a-3a659b1f1882",
+         "sequence": 0,
+         "eventName": "perf3gpp_gnb-Ericsson_pmMeasResult",
+         "sourceName": "O-DU-1122",
+         "reportingEntityName": "",
+         "priority": "Normal",
+         "startEpochMicrosec": 951912000000,
+         "lastEpochMicrosec": 951912900000,
+         "version": "4.0",
+         "vesEventListenerVersion": "7.1",
+         "timeZoneOffset": "+00:00"
+      },
+      "perf3gppFields": {
+         "perf3gppFieldsVersion": "1.0",
+         "measDataCollection": {
+            "granularityPeriod": 900,
+            "measuredEntityUserName": "RNC Telecomville",
+            "measuredEntityDn": "SubNetwork=CountryNN,MeContext=MEC-Gbg-1,ManagedElement=RNC-Gbg-1",
+            "measuredEntitySoftwareVersion": "",
+            "measInfoList": [
+               {
+                  "measInfoId": {
+                     "sMeasInfoId": ""
+                  },
+                  "measTypes": {
+                     "sMeasTypesList": [
+                        "attTCHSeizures",
+                        "succTCHSeizures",
+                        "attImmediateAssignProcs",
+                        "succImmediateAssignProcs"
+                     ]
+                  },
+                  "measValuesList": [
+                     {
+                        "measObjInstId": "RncFunction=RF-1,UtranCell=Gbg-997",
+                        "suspectFlag": "false",
+                        "measResults": [
+                           {
+                              "p": 1,
+                              "sValue": "813"
+                           },
+                           {
+                              "p": 2,
+                              "sValue": "913"
+                           },
+                           {
+                              "p": 3,
+                              "sValue": "1013"
+                           },
+                           {
+                              "p": 4,
+                              "sValue": "1113"
+                           }
+                        ]
+                     },
+                     {
+                        "measObjInstId": "RncFunction=RF-1,UtranCell=Gbg-998",
+                        "suspectFlag": "false",
+                        "measResults": [
+                           {
+                              "p": 1,
+                              "sValue": "890"
+                           },
+                           {
+                              "p": 2,
+                              "sValue": "901"
+                           },
+                           {
+                              "p": 3,
+                              "sValue": "123"
+                           },
+                           {
+                              "p": 4,
+                              "sValue": "234"
+                           }
+                        ]
+                     },
+                     {
+                        "measObjInstId": "RncFunction=RF-1,UtranCell=Gbg-999",
+                        "suspectFlag": "true",
+                        "measResults": [
+                           {
+                              "p": 1,
+                              "sValue": "456"
+                           },
+                           {
+                              "p": 2,
+                              "sValue": "567"
+                           },
+                           {
+                              "p": 3,
+                              "sValue": "678"
+                           },
+                           {
+                              "p": 4,
+                              "sValue": "789"
+                           }
+                        ]
+                     }
+                  ]
+               },
+               {
+                  "measInfoId": {
+                     "sMeasInfoId": "ENodeBFunction"
+                  },
+                  "measTypes": {
+                     "sMeasTypesList": [
+                        "attTCHSeizures1",
+                        "succTCHSeizures2",
+                        "attImmediateAssignProcs3",
+                        "succImmediateAssignProcs4"
+                     ]
+                  },
+                  "measValuesList": [
+                     {
+                        "measObjInstId": "ManagedElement=RNC-Gbg-1,ENodeBFunction=1",
+                        "suspectFlag": "false",
+                        "measResults": [
+                           {
+                              "p": 1,
+                              "sValue": "4"
+                           },
+                           {
+                              "p": 2,
+                              "sValue": "86,87,2,6,77,96,75,33,24"
+                           },
+                           {
+                              "p": 3,
+                              "sValue": "40"
+                           },
+                           {
+                              "p": 4,
+                              "sValue": "90"
+                           }
+                        ]
+                     }
+                  ]
+               },
+               {
+                  "measInfoId": {
+                     "sMeasInfoId": ""
+                  },
+                  "measTypes": {
+                     "sMeasTypesList": [
+                        "attTCHSeizures5",
+                        "succTCHSeizures6",
+                        "attImmediateAssignProcs7",
+                        "succImmediateAssignProcs8"
+                     ]
+                  },
+                  "measValuesList": [
+                     {
+                        "measObjInstId": "RncFunction=RF-1,UtranCell=Gbg-997",
+                        "suspectFlag": "false",
+                        "measResults": [
+                           {
+                              "p": 1,
+                              "sValue": "238"
+                           },
+                           {
+                              "p": 2,
+                              "sValue": "344"
+                           },
+                           {
+                              "p": 3,
+                              "sValue": "563"
+                           },
+                           {
+                              "p": 4,
+                              "sValue": "787"
+                           }
+                        ]
+                     },
+                     {
+                        "measObjInstId": "RncFunction=RF-1,UtranCell=Gbg-998",
+                        "suspectFlag": "false",
+                        "measResults": [
+                           {
+                              "p": 1,
+                              "sValue": "898"
+                           },
+                           {
+                              "p": 2,
+                              "sValue": "905"
+                           },
+                           {
+                              "p": 3,
+                              "sValue": "127"
+                           },
+                           {
+                              "p": 4,
+                              "sValue": "238"
+                           }
+                        ]
+                     },
+                     {
+                        "measObjInstId": "RncFunction=RF-1,UtranCell=Gbg-999",
+                        "suspectFlag": "true",
+                        "measResults": [
+                           {
+                              "p": 1,
+                              "sValue": "454"
+                           },
+                           {
+                              "p": 2,
+                              "sValue": "569"
+                           },
+                           {
+                              "p": 3,
+                              "sValue": "672"
+                           },
+                           {
+                              "p": 4,
+                              "sValue": "785"
+                           }
+                        ]
+                     }
+                  ]
+               }
+            ]
+         }
+      }
+   }
+}
\ No newline at end of file
index 32e6c32..8a89c1a 100644 (file)
@@ -3,12 +3,25 @@
       {
          "id": "DmaapInformationType",
          "dmaapTopicUrl": "/dmaap-topic-1",
-         "useHttpProxy": false
+         "useHttpProxy": false,
+         "dataType": "text"
       },
       {
          "id": "KafkaInformationType",
          "kafkaInputTopic": "TutorialTopic",
-         "useHttpProxy": false
+         "useHttpProxy": false       
+      },
+      {
+         "id": "PmInformationType",
+         "dmaapTopicUrl": "/dmaap-topic-2",
+         "useHttpProxy": false,
+         "dataType": "PmData"
+      },
+      {
+         "id": "PmInformationTypeKafka",
+         "kafkaInputTopic": "TutorialTopic",
+         "useHttpProxy": false,
+         "dataType": "PmData"
       }
    ]
-}
\ No newline at end of file
+}