+# ============LICENSE_START===============================================
+# Copyright (C) 2021-2022 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+
---
version: 2
--- /dev/null
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
"description": "Void/empty",
"type": "object"
},
+ "job_statistics": {
+ "description": "Statistics information for one job",
+ "type": "object",
+ "required": [
+ "jobId",
+ "noOfReceivedBytes",
+ "noOfReceivedObjects",
+ "noOfSentBytes",
+ "noOfSentObjects",
+ "typeId"
+ ],
+ "properties": {
+ "noOfSentObjects": {
+ "format": "int32",
+ "type": "integer"
+ },
+ "jobId": {"type": "string"},
+ "outputTopic": {"type": "string"},
+ "noOfSentBytes": {
+ "format": "int32",
+ "type": "integer"
+ },
+ "clientId": {"type": "string"},
+ "groupId": {"type": "string"},
+ "noOfReceivedBytes": {
+ "format": "int32",
+ "type": "integer"
+ },
+ "typeId": {"type": "string"},
+ "inputTopic": {"type": "string"},
+ "noOfReceivedObjects": {
+ "format": "int32",
+ "type": "integer"
+ }
+ }
+ },
+ "statistics_info": {
+ "description": "Statistics information",
+ "type": "object",
+ "properties": {"jobStatistics": {
+ "description": "Statistics per job",
+ "type": "array",
+ "items": {"$ref": "#/components/schemas/job_statistics"}
+ }}
+ },
"producer_registration_info": {
"description": "Information for an Information Producer",
"type": "object",
}],
"tags": ["Information Coordinator Service Simulator (exists only in test)"]
}},
+ "/statistics": {"get": {
+ "summary": "Returns statistics",
+ "operationId": "getStatistics",
+ "responses": {"200": {
+ "description": "OK",
+ "content": {"application/json": {"schema": {"$ref": "#/components/schemas/statistics_info"}}}
+ }},
+ "tags": ["Producer job control API"]
+ }},
"/generic_dataproducer/health_check": {"get": {
"summary": "Producer supervision",
"description": "The endpoint is provided by the Information Producer and is used for supervision of the producer.",
application/json:
schema:
type: object
+ /statistics:
+ get:
+ tags:
+ - Producer job control API
+ summary: Returns statistics
+ operationId: getStatistics
+ responses:
+ 200:
+ description: OK
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/statistics_info'
/generic_dataproducer/health_check:
get:
tags:
void:
type: object
description: Void/empty
+ job_statistics:
+ required:
+ - jobId
+ - noOfReceivedBytes
+ - noOfReceivedObjects
+ - noOfSentBytes
+ - noOfSentObjects
+ - typeId
+ type: object
+ properties:
+ noOfSentObjects:
+ type: integer
+ format: int32
+ jobId:
+ type: string
+ outputTopic:
+ type: string
+ noOfSentBytes:
+ type: integer
+ format: int32
+ clientId:
+ type: string
+ groupId:
+ type: string
+ noOfReceivedBytes:
+ type: integer
+ format: int32
+ typeId:
+ type: string
+ inputTopic:
+ type: string
+ noOfReceivedObjects:
+ type: integer
+ format: int32
+ description: Statistics information for one job
+ statistics_info:
+ type: object
+ properties:
+ jobStatistics:
+ type: array
+ description: Statistics per job
+ items:
+ $ref: '#/components/schemas/job_statistics'
+ description: Statistics information
producer_registration_info:
required:
- info_job_callback_url
1) Create a CA certificate and a private key:
openssl genrsa -des3 -out CA-key.pem 2048
-openssl req -new -key CA-key.pem -x509 -days 1000 -out CA-cert.pem
+openssl req -new -key CA-key.pem -x509 -days 3600 -out CA-cert.pem
2) Create a keystore with a private key entry that is signed by the CA:
+Note: your name must be "localhost" for the unittest.
+
keytool -genkeypair -alias policy_agent -keyalg RSA -keysize 2048 -keystore keystore.jks -validity 3650 -storepass policy_agent
keytool -certreq -alias policy_agent -file request.csr -keystore keystore.jks -ext san=dns:your.domain.com -storepass policy_agent
openssl x509 -req -days 365 -in request.csr -CA CA-cert.pem -CAkey CA-key.pem -CAcreateserial -out ca_signed-cert.pem
+# ============LICENSE_START===============================================
+# Copyright (C) 2021-2022 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+
spring:
profiles:
active: prod
logging:
# Configuration of logging
level:
- ROOT: ERROR
+ ROOT: WARN
+ org.apache.kafka: WARN
org.springframework: ERROR
org.springframework.data: ERROR
org.springframework.web.reactive.function.client.ExchangeFunctions: ERROR
org.oran.dmaapadapter: INFO
+
file:
name: /var/log/dmaap-adapter-service/application.log
server:
kafka:
bootstrap-servers: localhost:9092
# If the file name is empty, no authorization token is used
- auth-token-file:
\ No newline at end of file
+ auth-token-file:
+ pm-files-path: /tmp
"kafkaInputTopic": "TutorialTopic",
"useHttpProxy": false
},
- {
+ {
"id": "PmData",
"dmaapTopicUrl": "/events/PM_NOTIFICATION_OUTPUT/OpenDcae-c12/C12",
"useHttpProxy": true,
- "dataType" : "pmData"
- },
+ "dataType": "pmData"
+ }
]
-}
+}
\ No newline at end of file
+# ============LICENSE_START===============================================
+# Copyright (C) 2021-2022 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+
from docs_conf.conf import *
#branch configuration
For instance a value like "NRCellCU" will match "ManagedElement=seliitdus00487,GNBCUCPFunction=1,NRCellCU=32".
* measTypes selects the meas types to get
* measuredEntityDns partial match of meas entity DNs.
+* measObjClass matching of the class of the measObjInstId. The measObjInstId must follow the 3GPP naming conventions for Managed Objects (3GPP TS 32.106-8).
+ Example, for a distinguished name "ManagedElement=RNC-Gbg-1,ENodeBFunction=1", the MO class will be "ENodeBFunction".
All PM filter properties are optional and a non given will result in "match all".
The result of the filtering is still following the structure of a 3GPP PM report.
.. code-block:: javascript
{
- "filterType":"pmdata"
+ "filterType":"pmdata",
"filter": {
"sourceNames":[
"O-DU-1122"
],
"measTypes":[
"succImmediateAssignProcs"
- ],eparate call.
+ ],
"measuredEntityDns":[
"ManagedElement=RNC-Gbg-1"
]
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
- <version>2.5.8</version>
+ <version>2.5.14</version>
<relativePath />
</parent>
<groupId>org.o-ran-sc.nonrtric.plt</groupId>
<artifactId>dmaapadapter</artifactId>
- <version>1.1.0-SNAPSHOT</version>
+ <version>1.1.1-SNAPSHOT</version>
<licenses>
<license>
<name>The Apache Software License, Version 2.0</name>
<java.version>11</java.version>
<springfox.version>3.0.0</springfox.version>
<gson.version>2.9.0</gson.version>
- <swagger.version>2.1.6</swagger.version>
+ <swagger.version>2.2.1</swagger.version>
<json.version>20211205</json.version>
<maven-compiler-plugin.version>3.8.0</maven-compiler-plugin.version>
<formatter-maven-plugin.version>2.12.2</formatter-maven-plugin.version>
<exec.skip>true</exec.skip>
</properties>
<dependencies>
- <dependency>
- <groupId>org.springdoc</groupId>
- <artifactId>springdoc-openapi-ui</artifactId>
- <version>1.6.3</version>
- </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
- <version>${gson.version}</version>
- </dependency>
- <dependency>
- <groupId>org.json</groupId>
- <artifactId>json</artifactId>
- <version>${json.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
</dependency>
<!-- TEST -->
<!-- https://mvnrepository.com/artifact/com.github.erosb/everit-json-schema -->
+ <dependency>
+ <groupId>org.springdoc</groupId>
+ <artifactId>springdoc-openapi-ui</artifactId>
+ <version>1.6.3</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>com.github.erosb</groupId>
<artifactId>everit-json-schema</artifactId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>com.squareup.okhttp3</groupId>
- <artifactId>mockwebserver</artifactId>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
- <version>1.3.9</version>
+ <version>1.3.12</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
@Setter
private Path authTokenFilePath;
- public SecurityContext(@Value("${app.auth-token-file:\"\"}") String authTokenFilename) {
+ public SecurityContext(@Value("${app.auth-token-file:}") String authTokenFilename) {
if (!authTokenFilename.isEmpty()) {
this.authTokenFilePath = Path.of(authTokenFilename);
}
}
try {
long lastModified = authTokenFilePath.toFile().lastModified();
- if (lastModified != this.tokenTimestamp) {
+ if (tokenTimestamp == 0 || lastModified != this.tokenTimestamp) {
this.authToken = Files.readString(authTokenFilePath);
this.tokenTimestamp = lastModified;
}
@Value("${app.webclient.trust-store}")
private String sslTrustStore = "";
- @Value("${app.webclient.http.proxy-host:\"\"}")
+ @Value("${app.webclient.http.proxy-host:}")
private String httpProxyHost = "";
@Value("${app.webclient.http.proxy-port:0}")
@Value("${app.kafka.bootstrap-servers:}")
private String kafkaBootStrapServers;
+ @Getter
+ @Value("${app.pm-files-path:}")
+ private String pmFilesPath;
+
private WebClientConfig webClientConfig = null;
public WebClientConfig getWebClientConfig() {
public class ErrorResponse {
private static Gson gson = new GsonBuilder() //
+ .disableHtmlEscaping() //
.create(); //
// Returned as body for all failed REST calls
import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import org.oran.dmaapadapter.exceptions.ServiceException;
import org.oran.dmaapadapter.r1.ProducerJobInfo;
public static final String API_DESCRIPTION = "";
public static final String JOB_URL = "/generic_dataproducer/info_job";
public static final String SUPERVISION_URL = "/generic_dataproducer/health_check";
- private static Gson gson = new GsonBuilder().create();
+
+ public static final String STATISTICS_URL = "/statistics";
+
+ private static Gson gson = new GsonBuilder().disableHtmlEscaping().create();
private final Jobs jobs;
private final InfoTypes types;
return new ResponseEntity<>(HttpStatus.OK);
}
+ @Schema(name = "statistics_info", description = "Statistics information")
+ public class Statistics {
+
+ @Schema(description = "Statistics per job")
+ public final Collection<Job.Statistics> jobStatistics;
+
+ public Statistics(Collection<Job.Statistics> stats) {
+ this.jobStatistics = stats;
+ }
+
+ }
+
+ @GetMapping(path = STATISTICS_URL, produces = MediaType.APPLICATION_JSON_VALUE)
+ @Operation(summary = "Returns statistics", description = "")
+ @ApiResponses(value = { //
+ @ApiResponse(responseCode = "200", description = "OK", //
+ content = @Content(schema = @Schema(implementation = Statistics.class))) //
+ })
+ public ResponseEntity<Object> getStatistics() {
+ List<Job.Statistics> res = new ArrayList<>();
+ for (Job job : this.jobs.getAll()) {
+ res.add(job.getStatistics());
+ }
+
+ return new ResponseEntity<>(gson.toJson(new Statistics(res)), HttpStatus.OK);
+ }
+
}
* ========================LICENSE_END===================================
*/
-package org.oran.dmaapadapter.repository.filters;
+package org.oran.dmaapadapter.filter;
+
+import lombok.ToString;
+
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
public interface Filter {
- public String filter(String data);
+ public enum Type {
+ REGEXP, JSLT, JSON_PATH, PM_DATA, NONE
+ }
+
+ @ToString
+ public static class FilteredData {
+ public final String key;
+ public final String value;
+ private static final FilteredData emptyData = new FilteredData("", "");
+
+ public boolean isEmpty() {
+ return value.isEmpty() && key.isEmpty();
+ }
+
+ public FilteredData(String key, String value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ public static FilteredData empty() {
+ return emptyData;
+ }
+ }
+
+ public FilteredData filter(DataFromTopic data);
}
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2022 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.filter;
+
+import com.google.gson.GsonBuilder;
+
+import java.lang.invoke.MethodHandles;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FilterFactory {
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static com.google.gson.Gson gson = new GsonBuilder().disableHtmlEscaping().create();
+
+ private FilterFactory() {}
+
+ public static Filter create(Object filter, Filter.Type type) {
+ switch (type) {
+ case PM_DATA:
+ return new PmReportFilter(createPmFilterData(filter));
+ case REGEXP:
+ return new RegexpFilter(filter.toString());
+ case JSLT:
+ return new JsltFilter(filter.toString());
+ case JSON_PATH:
+ return new JsonPathFilter(filter.toString());
+ case NONE:
+ return null;
+ default:
+ logger.error("Not handeled filter type: {}", type);
+ return null;
+ }
+ }
+
+ private static PmReportFilter.FilterData createPmFilterData(Object filter) {
+ String str = gson.toJson(filter);
+ return gson.fromJson(str, PmReportFilter.FilterData.class);
+ }
+
+}
* ========================LICENSE_END===================================
*/
-package org.oran.dmaapadapter.repository.filters;
+package org.oran.dmaapadapter.filter;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.schibsted.spt.data.jslt.Expression;
import com.schibsted.spt.data.jslt.Parser;
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class JsltFilter implements Filter {
+class JsltFilter implements Filter {
private Expression expression;
private final ObjectMapper mapper = new ObjectMapper();
}
@Override
- public String filter(String jsonString) {
+ public FilteredData filter(DataFromTopic data) {
if (expression == null) {
- return jsonString;
+ return new FilteredData(data.key, data.value);
}
try {
JsonFactory factory = mapper.getFactory();
- JsonParser parser = factory.createParser(jsonString);
+ JsonParser parser = factory.createParser(data.value);
JsonNode actualObj = mapper.readTree(parser);
JsonNode filteredNode = expression.apply(actualObj);
if (filteredNode == NullNode.instance) {
- return "";
+ return FilteredData.empty();
}
- return mapper.writeValueAsString(filteredNode);
+ return new FilteredData(data.key, mapper.writeValueAsString(filteredNode));
} catch (Exception e) {
- return "";
+ return FilteredData.empty();
}
}
* ========================LICENSE_END===================================
*/
-package org.oran.dmaapadapter.repository.filters;
+package org.oran.dmaapadapter.filter;
import com.jayway.jsonpath.JsonPath;
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class JsonPathFilter implements Filter {
+class JsonPathFilter implements Filter {
private String expression;
private static final Logger logger = LoggerFactory.getLogger(JsonPathFilter.class);
- com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
+ com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
public JsonPathFilter(String exp) {
try {
}
@Override
- public String filter(String jsonString) {
+ public FilteredData filter(DataFromTopic data) {
try {
- Object o = JsonPath.parse(jsonString).read(this.expression, Object.class);
- return o == null ? "" : gson.toJson(o);
+ Object o = JsonPath.parse(data.value).read(this.expression, Object.class);
+ return o == null ? FilteredData.empty() : new FilteredData(data.key, gson.toJson(o));
} catch (Exception e) {
- return "";
+ return FilteredData.empty();
}
}
* ========================LICENSE_END===================================
*/
-package org.oran.dmaapadapter.repository.filters;
+package org.oran.dmaapadapter.filter;
+
+import com.google.gson.annotations.Expose;
import java.util.ArrayList;
import java.util.Collection;
public class PmReport {
+ @Expose
Event event = new Event();
public static class CommonEventHeader {
+ @Expose
String domain;
+
+ @Expose
String eventId;
+
+ @Expose
int sequence;
+
+ @Expose
String eventName;
+
+ @Expose
String sourceName;
+
+ @Expose
String reportingEntityName;
+
+ @Expose
String priority;
+
+ @Expose
long startEpochMicrosec;
+
+ @Expose
long lastEpochMicrosec;
+
+ @Expose
String version;
+
+ @Expose
String vesEventListenerVersion;
+
+ @Expose
String timeZoneOffset;
}
public static class MeasInfoId {
- String sMeasInfoId;
+ @Expose
+ String sMeasInfoId = "";
}
public static class MeasTypes {
return sMeasTypesList.get(pValue - 1);
}
+ @Expose
protected ArrayList<String> sMeasTypesList = new ArrayList<>();
}
public static class MeasResult {
+ @Expose
int p;
- String sValue;
+
+ @Expose
+ String sValue = "";
}
public static class MeasValuesList {
+ @Expose
String measObjInstId;
+
+ @Expose
String suspectFlag;
+
+ @Expose
Collection<MeasResult> measResults = new ArrayList<>();
public MeasValuesList shallowClone() {
}
public static class MeasInfoList {
+ @Expose
MeasInfoId measInfoId;
+
+ @Expose
MeasTypes measTypes;
+
+ @Expose
Collection<MeasValuesList> measValuesList = new ArrayList<>();
public MeasInfoList shallowClone() {
}
public static class MeasDataCollection {
+ @Expose
int granularityPeriod;
+
+ @Expose
String measuredEntityUserName;
+
+ @Expose
String measuredEntityDn;
+
+ @Expose
String measuredEntitySoftwareVersion;
+
+ @Expose
Collection<MeasInfoList> measInfoList = new ArrayList<>();
}
public static class Perf3gppFields {
+ @Expose
String perf3gppFieldsVersion;
+
+ @Expose
MeasDataCollection measDataCollection;
}
public static class Event {
+ @Expose
CommonEventHeader commonEventHeader;
+
+ @Expose
Perf3gppFields perf3gppFields;
}
* ========================LICENSE_END===================================
*/
-package org.oran.dmaapadapter.repository.filters;
-
+package org.oran.dmaapadapter.filter;
+import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import lombok.Getter;
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.thymeleaf.util.StringUtils;
public class PmReportFilter implements Filter {
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder() //
+ .disableHtmlEscaping() //
+ .excludeFieldsWithoutExposeAnnotation() //
+ .create();
+
+ // excludeFieldsWithoutExposeAnnotation is not needed when parsing and this is a
+ // bit quicker
+ private static com.google.gson.Gson gsonParse = new com.google.gson.GsonBuilder() //
+ .disableHtmlEscaping() //
+ .create();
- private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
private final FilterData filterData;
@Getter
public static class FilterData {
- Collection<String> sourceNames = new ArrayList<>();
- Collection<String> measObjInstIds = new ArrayList<>();
- Collection<String> measTypes = new ArrayList<>();
- Collection<String> measuredEntityDns = new ArrayList<>();
+ final Collection<String> sourceNames = new HashSet<>();
+ final Collection<String> measObjInstIds = new ArrayList<>();
+ final Collection<String> measTypes = new HashSet<>();
+ final Collection<String> measuredEntityDns = new ArrayList<>();
+ final Collection<String> measObjClass = new HashSet<>();
}
private static class MeasTypesIndexed extends PmReport.MeasTypes {
+
private Map<String, Integer> map = new HashMap<>();
public int addP(String measTypeName) {
if (p != null) {
return p;
} else {
- this.sMeasTypesList.add(measTypeName);
- this.map.put(measTypeName, this.sMeasTypesList.size());
- return this.sMeasTypesList.size();
+ sMeasTypesList.add(measTypeName);
+ this.map.put(measTypeName, sMeasTypesList.size());
+ return sMeasTypesList.size();
}
}
}
}
@Override
- public String filter(String data) {
- PmReport report = gson.fromJson(data, PmReport.class);
- if (!filter(report, this.filterData)) {
- return "";
+ public FilteredData filter(DataFromTopic data) {
+ try {
+ PmReport report = createPmReport(data);
+ if (report.event.perf3gppFields == null) {
+ logger.warn("Received PM report with no perf3gppFields, ignored. {}", data);
+ return FilteredData.empty();
+ }
+
+ if (!filter(report, this.filterData)) {
+ return FilteredData.empty();
+ }
+ return new FilteredData(data.key, gson.toJson(report));
+ } catch (Exception e) {
+ logger.warn("Could not parse PM data. {}, reason: {}", data, e.getMessage());
+ return FilteredData.empty();
}
- return gson.toJson(report);
+ }
+ @SuppressWarnings("java:S2445") // "data" is a method parameter, and should not be used for synchronization.
+ private PmReport createPmReport(DataFromTopic data) {
+ synchronized (data) {
+ if (data.getCachedPmReport() == null) {
+ data.setCachedPmReport(gsonParse.fromJson(data.value, PmReport.class));
+ }
+ return data.getCachedPmReport();
+ }
}
/**
return newMeasResults;
}
+ private boolean isMeasInstIdMatch(String measObjInstId, FilterData filter) {
+ return filter.measObjInstIds.isEmpty() || isContainedInAny(measObjInstId, filter.measObjInstIds);
+ }
+
+ private String managedObjectClass(String distinguishedName) {
+ int lastRdn = distinguishedName.lastIndexOf(",");
+ if (lastRdn == -1) {
+ return "";
+ }
+ int lastEqualChar = distinguishedName.indexOf("=", lastRdn);
+ if (lastEqualChar == -1) {
+ return "";
+ }
+ return distinguishedName.substring(lastRdn + 1, lastEqualChar);
+ }
+
+ private boolean isMeasInstClassMatch(String measObjInstId, FilterData filter) {
+ if (filter.measObjClass.isEmpty()) {
+ return true;
+ }
+
+ String measObjClass = managedObjectClass(measObjInstId);
+ return filter.measObjClass.contains(measObjClass);
+ }
+
private PmReport.MeasValuesList createMeasValuesList(PmReport.MeasValuesList oldMeasValues,
PmReport.MeasTypes measTypes, FilterData filter) {
PmReport.MeasValuesList newMeasValuesList = oldMeasValues.shallowClone();
- if (isContainedInAny(oldMeasValues.measObjInstId, filter.measObjInstIds) || filter.measObjInstIds.isEmpty()) {
+ if (isMeasInstIdMatch(oldMeasValues.measObjInstId, filter)
+ && isMeasInstClassMatch(oldMeasValues.measObjInstId, filter)) {
newMeasValuesList.measResults = createMeasResults(oldMeasValues.measResults, measTypes, filter);
}
return newMeasValuesList;
* ========================LICENSE_END===================================
*/
-package org.oran.dmaapadapter.repository.filters;
+package org.oran.dmaapadapter.filter;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class RegexpFilter implements Filter {
+class RegexpFilter implements Filter {
private static final Logger logger = LoggerFactory.getLogger(RegexpFilter.class);
private Pattern regexp;
}
@Override
- public String filter(String data) {
+ public FilteredData filter(DataFromTopic data) {
if (regexp == null) {
- return data;
+ return new FilteredData(data.key, data.value);
}
- Matcher matcher = regexp.matcher(data);
+ Matcher matcher = regexp.matcher(data.value);
boolean match = matcher.find();
if (match) {
- return data;
+ return new FilteredData(data.key, data.value);
} else {
- return "";
+ return FilteredData.empty();
}
}
package org.oran.dmaapadapter.repository;
+import lombok.Builder;
import lombok.Getter;
import lombok.ToString;
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.springframework.util.StringUtils;
@ToString
+@Builder
public class InfoType {
@Getter
- private final String id;
+ private String id;
@Getter
- private final String dmaapTopicUrl;
+ private String dmaapTopicUrl;
@Getter
- private final boolean useHttpProxy;
+ @Builder.Default
+ private boolean useHttpProxy = false;
@Getter
- private final String kafkaInputTopic;
+ private String kafkaInputTopic;
- private final String dataType;
+ private String dataType;
@Getter
+ @Builder.Default
private boolean isJson = false;
- public InfoType(String id, String dmaapTopicUrl, boolean useHttpProxy, String kafkaInputTopic, String dataType,
- boolean isJson) {
- this.id = id;
- this.dmaapTopicUrl = dmaapTopicUrl;
- this.useHttpProxy = useHttpProxy;
- this.kafkaInputTopic = kafkaInputTopic;
- this.dataType = dataType;
- this.isJson = isJson;
- }
-
public boolean isKafkaTopicDefined() {
return StringUtils.hasLength(kafkaInputTopic);
}
return DataType.PM_DATA;
}
return DataType.OTHER;
+ }
+
+ public String getKafkaGroupId() {
+ return this.kafkaInputTopic == null ? null : "osc-dmaap-adapter-" + getId();
+ }
+
+ public String getKafkaClientId(ApplicationConfig appConfig) {
+ return this.kafkaInputTopic == null ? null : getId() + "_" + appConfig.getSelfUrl();
}
}
package org.oran.dmaapadapter.repository;
-import com.google.gson.GsonBuilder;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import io.swagger.v3.oas.annotations.media.Schema;
import java.lang.invoke.MethodHandles;
import java.time.Duration;
+import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.oran.dmaapadapter.clients.AsyncRestClient;
-import org.oran.dmaapadapter.repository.filters.Filter;
-import org.oran.dmaapadapter.repository.filters.JsltFilter;
-import org.oran.dmaapadapter.repository.filters.JsonPathFilter;
-import org.oran.dmaapadapter.repository.filters.PmReportFilter;
-import org.oran.dmaapadapter.repository.filters.RegexpFilter;
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
+import org.oran.dmaapadapter.filter.Filter;
+import org.oran.dmaapadapter.filter.FilterFactory;
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ToString
public class Job {
-
- private static com.google.gson.Gson gson = new GsonBuilder().create();
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ @Builder
+ @Schema(name = "job_statistics", description = "Statistics information for one job")
+ public static class Statistics {
+
+ // @Schema(name = "jobId", description = "jobId", required = true)
+ // @SerializedName("jobId")
+ @JsonProperty(value = "jobId", required = true)
+ String jobId;
+
+ @JsonProperty(value = "typeId", required = true)
+ String typeId;
+
+ @JsonProperty(value = "inputTopic", required = false)
+ String inputTopic;
+
+ @JsonProperty(value = "outputTopic", required = false)
+ String outputTopic;
+
+ @JsonProperty(value = "groupId", required = false)
+ String groupId;
+
+ @JsonProperty(value = "clientId", required = false)
+ String clientId;
+
+ @JsonProperty(value = "noOfReceivedObjects", required = true)
+ @Builder.Default
+ int noOfReceivedObjects = 0;
+
+ @JsonProperty(value = "noOfReceivedBytes", required = true)
+ @Builder.Default
+ int noOfReceivedBytes = 0;
+
+ @JsonProperty(value = "noOfSentObjects", required = true)
+ @Builder.Default
+ int noOfSentObjects = 0;
+
+ @JsonProperty(value = "noOfSentBytes", required = true)
+ @Builder.Default
+ int noOfSentBytes = 0;
+
+ public void received(String str) {
+ noOfReceivedBytes += str.length();
+ noOfReceivedObjects += 1;
+
+ }
+
+ public void filtered(String str) {
+ noOfSentBytes += str.length();
+ noOfSentObjects += 1;
+ }
+
+ }
+
public static class Parameters {
public static final String REGEXP_TYPE = "regexp";
public static final String PM_FILTER_TYPE = "pmdata";
@Setter
private String filterType = REGEXP_TYPE;
+ @Getter
private Object filter;
@Getter
private BufferTimeout bufferTimeout;
return maxConcurrency == null || maxConcurrency == 0 ? 1 : maxConcurrency;
}
- public String getFilterAsString() {
- return this.filter.toString();
- }
-
- public PmReportFilter.FilterData getPmFilter() {
- String str = gson.toJson(this.filter);
- return gson.fromJson(str, PmReportFilter.FilterData.class);
- }
-
- public enum FilterType {
- REGEXP, JSLT, JSON_PATH, PM_DATA, NONE
- }
-
- public FilterType getFilterType() {
+ public Filter.Type getFilterType() {
if (filter == null || filterType == null) {
- return FilterType.NONE;
+ return Filter.Type.NONE;
} else if (filterType.equalsIgnoreCase(JSLT_FILTER_TYPE)) {
- return FilterType.JSLT;
+ return Filter.Type.JSLT;
} else if (filterType.equalsIgnoreCase(JSON_PATH_FILTER_TYPE)) {
- return FilterType.JSON_PATH;
+ return Filter.Type.JSON_PATH;
} else if (filterType.equalsIgnoreCase(REGEXP_TYPE)) {
- return FilterType.REGEXP;
+ return Filter.Type.REGEXP;
} else if (filterType.equalsIgnoreCase(PM_FILTER_TYPE)) {
- return FilterType.PM_DATA;
+ return Filter.Type.PM_DATA;
} else {
logger.warn("Unsupported filter type: {}", this.filterType);
- return FilterType.NONE;
+ return Filter.Type.NONE;
}
}
}
private final Filter filter;
+ @Getter
+ private final Statistics statistics;
+
@Getter
private final AsyncRestClient consumerRestClient;
public Job(String id, String callbackUrl, InfoType type, String owner, String lastUpdated, Parameters parameters,
- AsyncRestClient consumerRestClient) {
+ AsyncRestClient consumerRestClient, ApplicationConfig appConfig) {
this.id = id;
this.callbackUrl = callbackUrl;
this.type = type;
this.owner = owner;
this.lastUpdated = lastUpdated;
this.parameters = parameters;
- filter = createFilter(parameters);
+ filter = parameters.filter == null ? null
+ : FilterFactory.create(parameters.getFilter(), parameters.getFilterType());
this.consumerRestClient = consumerRestClient;
- }
-
- private static Filter createFilter(Parameters parameters) {
- if (parameters.filter == null) {
- return null;
- }
+ statistics = Statistics.builder() //
+ .groupId(type.getKafkaGroupId()) //
+ .inputTopic(type.getKafkaInputTopic()) //
+ .jobId(id) //
+ .outputTopic(parameters.getKafkaOutputTopic()) //
+ .typeId(type.getId()) //
+ .clientId(type.getKafkaClientId(appConfig)) //
+ .build();
- switch (parameters.getFilterType()) {
- case PM_DATA:
- return new PmReportFilter(parameters.getPmFilter());
- case REGEXP:
- return new RegexpFilter(parameters.getFilterAsString());
- case JSLT:
- return new JsltFilter(parameters.getFilterAsString());
- case JSON_PATH:
- return new JsonPathFilter(parameters.getFilterAsString());
- case NONE:
- return null;
- default:
- logger.error("Not handeled filter type: {}", parameters.getFilterType());
- return null;
- }
}
- public String filter(String data) {
+ public Filter.FilteredData filter(DataFromTopic data) {
if (filter == null) {
logger.debug("No filter used");
- return data;
+ return new Filter.FilteredData(data.key, data.value);
}
return filter.filter(data);
}
private MultiMap<Job> jobsByType = new MultiMap<>();
private final AsyncRestClientFactory restclientFactory;
private final List<Observer> observers = new ArrayList<>();
+ private final ApplicationConfig appConfig;
- public Jobs(@Autowired ApplicationConfig applicationConfig, @Autowired SecurityContext securityContext) {
+ public Jobs(@Autowired ApplicationConfig applicationConfig, @Autowired SecurityContext securityContext,
+ @Autowired ApplicationConfig appConfig) {
restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig(), securityContext);
+ this.appConfig = appConfig;
}
public synchronized Job getJob(String id) throws ServiceException {
AsyncRestClient consumerRestClient = type.isUseHttpProxy() //
? restclientFactory.createRestClientUseHttpProxy(callbackUrl) //
: restclientFactory.createRestClientNoHttpProxy(callbackUrl);
- Job job = new Job(id, callbackUrl, type, owner, lastUpdated, parameters, consumerRestClient);
+ Job job = new Job(id, callbackUrl, type, owner, lastUpdated, parameters, consumerRestClient, this.appConfig);
this.put(job);
synchronized (observers) {
this.observers.forEach(obs -> obs.onJobbAdded(job));
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import reactor.core.publisher.Sinks;
-import reactor.core.publisher.Sinks.Many;
/**
* The class fetches incoming requests from DMAAP and sends them further to the
private final AsyncRestClient dmaapRestClient;
private final ApplicationConfig applicationConfig;
private final InfoType type;
- private final com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
- private Many<Output> output;
- private Disposable topicReceiverTask;
+ private final com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
+ private Flux<DataFromTopic> dataFromDmaap;
public DmaapTopicListener(ApplicationConfig applicationConfig, InfoType type, SecurityContext securityContext) {
AsyncRestClientFactory restclientFactory =
}
@Override
- public Many<Output> getOutput() {
- return this.output;
- }
-
- @Override
- public void start() {
- stop();
-
- final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 1024 * 10;
- this.output = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE);
-
- topicReceiverTask = Flux.range(0, Integer.MAX_VALUE) //
- .flatMap(notUsed -> getFromMessageRouter(getDmaapUrl()), 1) //
- .doOnNext(this::onReceivedData) //
- .subscribe(//
- null, //
- throwable -> logger.error("DmaapMessageConsumer error: {}", throwable.getMessage()), //
- this::onComplete); //
- }
-
- @Override
- public void stop() {
- if (topicReceiverTask != null) {
- topicReceiverTask.dispose();
- topicReceiverTask = null;
+ public Flux<DataFromTopic> getFlux() {
+ if (this.dataFromDmaap == null) {
+ this.dataFromDmaap = startFetchFromDmaap();
}
+ return this.dataFromDmaap;
}
- private void onComplete() {
- logger.warn("DmaapMessageConsumer completed {}", type.getId());
- start();
- }
-
- private void onReceivedData(String input) {
- logger.debug("Received from DMAAP topic: {} :{}", this.type.getDmaapTopicUrl(), input);
- output.emitNext(new Output("", input), Sinks.EmitFailureHandler.FAIL_FAST);
+ private Flux<DataFromTopic> startFetchFromDmaap() {
+ return Flux.range(0, Integer.MAX_VALUE) //
+ .flatMap(notUsed -> getFromMessageRouter(getDmaapUrl()), 1) //
+ .doOnNext(input -> logger.debug("Received from DMaap: {} :{}", this.type.getDmaapTopicUrl(), input)) //
+ .doOnError(t -> logger.error("DmaapTopicListener error: {}", t.getMessage())) //
+ .doFinally(sig -> logger.error("DmaapTopicListener stopped, reason: {}", sig)) //
+ .publish() //
+ .autoConnect() //
+ .map(input -> new DataFromTopic("", input)); //
}
private String getDmaapUrl() {
return dmaapRestClient.get(topicUrl) //
.filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away.
.flatMapMany(this::splitJsonArray) //
- .doOnNext(message -> logger.debug("Message from DMAAP topic: {} : {}", topicUrl, message)) //
+ .doOnNext(message -> logger.debug("Message from DMaaP topic: {} : {}", topicUrl, message)) //
.onErrorResume(this::handleDmaapErrorResponse); //
}
package org.oran.dmaapadapter.tasks;
+import org.oran.dmaapadapter.filter.Filter;
import org.oran.dmaapadapter.repository.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* owner via REST calls.
*/
@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
-public class HttpDataConsumer extends DataConsumer {
- private static final Logger logger = LoggerFactory.getLogger(HttpDataConsumer.class);
+public class HttpJobDataDistributor extends JobDataDistributor {
+ private static final Logger logger = LoggerFactory.getLogger(HttpJobDataDistributor.class);
- public HttpDataConsumer(Job job) {
+ public HttpJobDataDistributor(Job job) {
super(job);
}
@Override
- protected Mono<String> sendToClient(TopicListener.Output output) {
+ protected Mono<String> sendToClient(Filter.FilteredData output) {
Job job = this.getJob();
logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), output);
MediaType contentType = job.isBuffered() || job.getType().isJson() ? MediaType.APPLICATION_JSON : null;
import lombok.Getter;
+import org.oran.dmaapadapter.filter.Filter;
import org.oran.dmaapadapter.repository.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* owner via REST calls.
*/
@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
-public abstract class DataConsumer {
- private static final Logger logger = LoggerFactory.getLogger(DataConsumer.class);
+public abstract class JobDataDistributor {
+ private static final Logger logger = LoggerFactory.getLogger(JobDataDistributor.class);
@Getter
private final Job job;
private Disposable subscription;
}
}
- protected DataConsumer(Job job) {
+ protected JobDataDistributor(Job job) {
this.job = job;
}
- public synchronized void start(Flux<TopicListener.Output> input) {
+ public synchronized void start(Flux<TopicListener.DataFromTopic> input) {
stop();
this.errorStats.resetIrrecoverableErrors();
- this.subscription = handleReceivedMessage(input, job) //
+ this.subscription = filterAndBuffer(input, this.job) //
.flatMap(this::sendToClient, job.getParameters().getMaxConcurrency()) //
.onErrorResume(this::handleError) //
- .subscribe(this::handleConsumerSentOk, //
+ .subscribe(this::handleSentOk, //
this::handleExceptionInStream, //
() -> logger.warn("HttpDataConsumer stopped jobId: {}", job.getId()));
}
stop();
}
- protected abstract Mono<String> sendToClient(TopicListener.Output output);
+ protected abstract Mono<String> sendToClient(Filter.FilteredData output);
public synchronized void stop() {
if (this.subscription != null) {
return this.subscription != null;
}
- private Flux<TopicListener.Output> handleReceivedMessage(Flux<TopicListener.Output> inputFlux, Job job) {
- Flux<TopicListener.Output> result =
- inputFlux.map(input -> new TopicListener.Output(input.key, job.filter(input.value))) //
- .filter(t -> !t.value.isEmpty()); //
+ private Flux<Filter.FilteredData> filterAndBuffer(Flux<TopicListener.DataFromTopic> inputFlux, Job job) {
+ Flux<Filter.FilteredData> filtered = //
+ inputFlux.doOnNext(data -> job.getStatistics().received(data.value)) //
+ .map(job::filter) //
+ .filter(f -> !f.isEmpty()) //
+ .doOnNext(f -> job.getStatistics().filtered(f.value)); //
if (job.isBuffered()) {
- result = result.map(input -> quoteNonJson(input.value, job)) //
+ filtered = filtered.map(input -> quoteNonJson(input.value, job)) //
.bufferTimeout( //
job.getParameters().getBufferTimeout().getMaxSize(), //
job.getParameters().getBufferTimeout().getMaxTime()) //
- .map(buffered -> new TopicListener.Output("", buffered.toString()));
+ .map(buffered -> new Filter.FilteredData("", buffered.toString()));
}
- return result;
+ return filtered;
}
private String quoteNonJson(String str, Job job) {
}
}
- private void handleConsumerSentOk(String data) {
+ private void handleSentOk(String data) {
this.errorStats.handleOkFromConsumer();
}
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
+import org.oran.dmaapadapter.filter.Filter;
import org.oran.dmaapadapter.repository.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* owner via REST calls.
*/
@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
-public class KafkaDataConsumer extends DataConsumer {
- private static final Logger logger = LoggerFactory.getLogger(KafkaDataConsumer.class);
+public class KafkaJobDataDistributor extends JobDataDistributor {
+ private static final Logger logger = LoggerFactory.getLogger(KafkaJobDataDistributor.class);
private KafkaSender<String, String> sender;
private final ApplicationConfig appConfig;
- public KafkaDataConsumer(Job job, ApplicationConfig appConfig) {
+ public KafkaJobDataDistributor(Job job, ApplicationConfig appConfig) {
super(job);
this.appConfig = appConfig;
}
@Override
- protected Mono<String> sendToClient(TopicListener.Output data) {
+ protected Mono<String> sendToClient(Filter.FilteredData data) {
Job job = this.getJob();
logger.debug("Sending data '{}' to Kafka topic: {}", data, this.getJob().getParameters().getKafkaOutputTopic());
}
@Override
- public synchronized void start(Flux<TopicListener.Output> input) {
+ public synchronized void start(Flux<TopicListener.DataFromTopic> input) {
super.start(input);
SenderOptions<String, String> senderOptions = senderOptions(appConfig);
this.sender = KafkaSender.create(senderOptions);
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producerx");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return SenderOptions.create(props);
}
- private SenderRecord<String, String, Integer> senderRecord(TopicListener.Output output, Job infoJob) {
+ private SenderRecord<String, String, Integer> senderRecord(Filter.FilteredData output, Job infoJob) {
int correlationMetadata = 2;
String topic = infoJob.getParameters().getKafkaOutputTopic();
return SenderRecord.create(new ProducerRecord<>(topic, output.key, output.value), correlationMetadata);
package org.oran.dmaapadapter.tasks;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.ToString;
+
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.repository.InfoType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import reactor.core.Disposable;
-import reactor.core.publisher.Sinks;
-import reactor.core.publisher.Sinks.Many;
+import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
private static final Logger logger = LoggerFactory.getLogger(KafkaTopicListener.class);
private final ApplicationConfig applicationConfig;
private final InfoType type;
- private Many<Output> output;
- private Disposable topicReceiverTask;
+ private Flux<DataFromTopic> dataFromTopic;
+
+ private static Gson gson = new GsonBuilder() //
+ .disableHtmlEscaping() //
+ .create(); //
+
+ @ToString
+ @Builder
+ public static class NewFileEvent {
+ @Getter
+ private String filename;
+ }
public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type) {
this.applicationConfig = applicationConfig;
}
@Override
- public Many<Output> getOutput() {
- return this.output;
+ public Flux<DataFromTopic> getFlux() {
+ if (this.dataFromTopic == null) {
+ this.dataFromTopic = startReceiveFromTopic(this.type.getKafkaClientId(this.applicationConfig));
+ }
+ return this.dataFromTopic;
}
- @Override
- public void start() {
- stop();
- final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 1024 * 10;
- this.output = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE);
+ private Flux<DataFromTopic> startReceiveFromTopic(String clientId) {
logger.debug("Listening to kafka topic: {} type :{}", this.type.getKafkaInputTopic(), type.getId());
- topicReceiverTask = KafkaReceiver.create(kafkaInputProperties()) //
- .receive() //
- .doOnNext(this::onReceivedData) //
- .subscribe(null, //
- this::onReceivedError, //
- () -> logger.warn("KafkaTopicReceiver stopped"));
- }
- @Override
- public void stop() {
- if (topicReceiverTask != null) {
- topicReceiverTask.dispose();
- topicReceiverTask = null;
- }
+ return KafkaReceiver.create(kafkaInputProperties(clientId)) //
+ .receiveAutoAck() //
+ .concatMap(consumerRecord -> consumerRecord) //
+ .doOnNext(input -> logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(),
+ input.value())) //
+ .doOnError(t -> logger.error("KafkaTopicReceiver error: {}", t.getMessage())) //
+ .doFinally(sig -> logger.error("KafkaTopicReceiver stopped, reason: {}", sig)) //
+ .filter(t -> !t.value().isEmpty() || !t.key().isEmpty()) //
+ .map(input -> new DataFromTopic(input.key(), input.value())) //
+ .map(this::getDataFromFileIfNewPmFileEvent) //
+ .publish() //
+ .autoConnect(1);
}
- private void onReceivedData(ConsumerRecord<String, String> input) {
- logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(), input.value());
- output.emitNext(new Output(input.key(), input.value()), Sinks.EmitFailureHandler.FAIL_FAST);
- }
+ private DataFromTopic getDataFromFileIfNewPmFileEvent(DataFromTopic data) {
- private void onReceivedError(Throwable t) {
- logger.error("KafkaTopicReceiver error: {}", t.getMessage());
+ if (!applicationConfig.getPmFilesPath().isEmpty() //
+ && this.type.getDataType() == InfoType.DataType.PM_DATA //
+ && data.value.length() < 1000) {
+ try {
+ NewFileEvent ev = gson.fromJson(data.value, NewFileEvent.class);
+ Path path = Path.of(this.applicationConfig.getPmFilesPath(), ev.getFilename());
+ String pmReportJson = Files.readString(path, Charset.defaultCharset());
+ return new DataFromTopic(data.key, pmReportJson);
+ } catch (Exception e) {
+ return data;
+ }
+ } else {
+ return data;
+ }
}
- private ReceiverOptions<String, String> kafkaInputProperties() {
+ private ReceiverOptions<String, String> kafkaInputProperties(String clientId) {
Map<String, Object> consumerProps = new HashMap<>();
if (this.applicationConfig.getKafkaBootStrapServers().isEmpty()) {
logger.error("No kafka boostrap server is setup");
}
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers());
- consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "osc-dmaap-adapter");
+ consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, this.type.getKafkaGroupId());
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
+ consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
return ReceiverOptions.<String, String>create(consumerProps)
.subscription(Collections.singleton(this.type.getKafkaInputTopic()));
private final AsyncRestClient restClient;
private final ApplicationConfig applicationConfig;
private final InfoTypes types;
- private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
+ private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
- private static final String PRODUCER_ID = "DmaapGenericInfoProducer";
@Getter
private boolean isRegisteredInIcs = false;
private static final int REGISTRATION_SUPERVISION_INTERVAL_MS = 1000 * 10;
logger.warn("Registration of producer failed {}", t.getMessage());
}
+ private String producerRegistrationUrl() {
+ final String producerId = this.applicationConfig.getSelfUrl().replace("/", "_");
+ return applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-producers/" + producerId;
+ }
+
// Returns TRUE if registration is correct
private Mono<Boolean> checkRegistration() {
- final String url = applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
- return restClient.get(url) //
+ return restClient.get(producerRegistrationUrl()) //
.flatMap(this::isRegisterredInfoCorrect) //
.onErrorResume(t -> Mono.just(Boolean.FALSE));
}
private Mono<String> registerTypesAndProducer() {
final int CONCURRENCY = 20;
- final String producerUrl =
- applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
return Flux.fromIterable(this.types.getAll()) //
.doOnNext(type -> logger.info("Registering type {}", type.getId())) //
CONCURRENCY) //
.collectList() //
.doOnNext(type -> logger.info("Registering producer")) //
- .flatMap(resp -> restClient.put(producerUrl, gson.toJson(producerRegistrationInfo())));
+ .flatMap(resp -> restClient.put(producerRegistrationUrl(), gson.toJson(producerRegistrationInfo())));
}
private Object typeSpecifcInfoObject() {
package org.oran.dmaapadapter.tasks;
+import lombok.Getter;
+import lombok.Setter;
import lombok.ToString;
-import reactor.core.publisher.Sinks.Many;
+
+import org.oran.dmaapadapter.filter.PmReport;
+import reactor.core.publisher.Flux;
public interface TopicListener {
@ToString
- public static class Output {
+ public static class DataFromTopic {
public final String key;
public final String value;
- public Output(String key, String value) {
+ @Getter
+ @Setter
+ private PmReport cachedPmReport;
+
+ public DataFromTopic(String key, String value) {
this.key = key;
this.value = value;
}
}
- public void start();
-
- public void stop();
-
- public Many<Output> getOutput();
+ public Flux<DataFromTopic> getFlux();
}
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
-import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
private final Map<String, TopicListener> dmaapTopicListeners = new HashMap<>(); // Key is typeId
@Getter
- private final MultiMap<DataConsumer> dataConsumers = new MultiMap<>(); // Key is typeId, jobId
+ private final MultiMap<JobDataDistributor> dataDistributors = new MultiMap<>(); // Key is typeId, jobId
private final ApplicationConfig appConfig;
- private static final int CONSUMER_SUPERVISION_INTERVAL_MS = 1000 * 60 * 3;
-
public TopicListeners(@Autowired ApplicationConfig appConfig, @Autowired InfoTypes types, @Autowired Jobs jobs,
@Autowired SecurityContext securityContext) {
this.appConfig = appConfig;
removeJob(job);
logger.debug("Job added {}", job.getId());
if (job.getType().isKafkaTopicDefined()) {
- addConsumer(job, dataConsumers, kafkaTopicListeners);
+ addConsumer(job, dataDistributors, kafkaTopicListeners);
}
if (job.getType().isDmaapTopicDefined()) {
- addConsumer(job, dataConsumers, dmaapTopicListeners);
+ addConsumer(job, dataDistributors, dmaapTopicListeners);
}
}
- private DataConsumer createConsumer(Job job) {
- return !Strings.isEmpty(job.getParameters().getKafkaOutputTopic()) ? new KafkaDataConsumer(job, appConfig)
- : new HttpDataConsumer(job);
+ private JobDataDistributor createConsumer(Job job) {
+ return !Strings.isEmpty(job.getParameters().getKafkaOutputTopic()) ? new KafkaJobDataDistributor(job, appConfig)
+ : new HttpJobDataDistributor(job);
}
- private void addConsumer(Job job, MultiMap<DataConsumer> consumers, Map<String, TopicListener> topicListeners) {
+ private void addConsumer(Job job, MultiMap<JobDataDistributor> distributors,
+ Map<String, TopicListener> topicListeners) {
TopicListener topicListener = topicListeners.get(job.getType().getId());
- if (consumers.get(job.getType().getId()).isEmpty()) {
- topicListener.start();
- }
- DataConsumer consumer = createConsumer(job);
- consumer.start(topicListener.getOutput().asFlux());
- consumers.put(job.getType().getId(), job.getId(), consumer);
+ JobDataDistributor distributor = createConsumer(job);
+ distributor.start(topicListener.getFlux());
+ distributors.put(job.getType().getId(), job.getId(), distributor);
}
public synchronized void removeJob(Job job) {
- removeJob(job, dataConsumers);
+ removeJob(job, dataDistributors);
}
- private static void removeJob(Job job, MultiMap<DataConsumer> consumers) {
- DataConsumer consumer = consumers.remove(job.getType().getId(), job.getId());
- if (consumer != null) {
+ private static void removeJob(Job job, MultiMap<JobDataDistributor> distributors) {
+ JobDataDistributor distributor = distributors.remove(job.getType().getId(), job.getId());
+ if (distributor != null) {
logger.debug("Job removed {}", job.getId());
- consumer.stop();
- }
- }
-
- @Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS)
- public synchronized void restartNonRunningKafkaTopics() {
- for (DataConsumer consumer : this.dataConsumers.values()) {
- if (!consumer.isRunning()) {
- restartTopicAndConsumers(this.kafkaTopicListeners, this.dataConsumers, consumer);
- }
+ distributor.stop();
}
-
}
- private static void restartTopicAndConsumers(Map<String, TopicListener> topicListeners,
- MultiMap<DataConsumer> consumers, DataConsumer consumer) {
- InfoType type = consumer.getJob().getType();
- TopicListener topic = topicListeners.get(type.getId());
- topic.start();
- restartConsumersOfType(consumers, topic, type);
- }
-
- private static void restartConsumersOfType(MultiMap<DataConsumer> consumers, TopicListener topic, InfoType type) {
- consumers.get(type.getId()).forEach(consumer -> consumer.start(topic.getOutput().asFlux()));
- }
}
}
]
},
+ "measObjClass": {
+ "type": "array",
+ "items": [
+ {
+ "type": "string"
+ }
+ ]
+ },
"measTypes": {
"type": "array",
"items": [
import org.oran.dmaapadapter.configuration.WebClientConfig;
import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig;
import org.oran.dmaapadapter.controllers.ProducerCallbacksController;
+import org.oran.dmaapadapter.exceptions.ServiceException;
+import org.oran.dmaapadapter.filter.PmReport;
+import org.oran.dmaapadapter.filter.PmReportFilter;
import org.oran.dmaapadapter.r1.ConsumerJobInfo;
import org.oran.dmaapadapter.r1.ProducerJobInfo;
import org.oran.dmaapadapter.repository.InfoTypes;
import org.oran.dmaapadapter.repository.Job;
import org.oran.dmaapadapter.repository.Jobs;
-import org.oran.dmaapadapter.repository.filters.PmReport;
-import org.oran.dmaapadapter.repository.filters.PmReportFilter;
-import org.oran.dmaapadapter.tasks.DataConsumer;
+import org.oran.dmaapadapter.tasks.JobDataDistributor;
import org.oran.dmaapadapter.tasks.ProducerRegstrationTask;
import org.oran.dmaapadapter.tasks.TopicListener;
import org.oran.dmaapadapter.tasks.TopicListeners;
}
@BeforeEach
- void setPort() {
+ void init() {
this.applicationConfig.setLocalServerHttpPort(this.localServerHttpPort);
+ assertThat(this.jobs.size()).isZero();
+ assertThat(this.consumerController.testResults.receivedBodies).isEmpty();
+ assertThat(this.consumerController.testResults.receivedHeaders).isEmpty();
}
@AfterEach
void reset() {
+ for (Job job : this.jobs.getAll()) {
+ this.icsSimulatorController.deleteJob(job.getId(), restClient());
+ }
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+
this.consumerController.testResults.reset();
this.icsSimulatorController.testResults.reset();
- this.jobs.clear();
+
}
private AsyncRestClient restClient(boolean useTrustValidation) {
}
}
+ @Test
+ void testTrustValidation() throws IOException {
+
+ String url = "https://localhost:" + applicationConfig.getLocalServerHttpPort() + "/v3/api-docs";
+ ResponseEntity<String> resp = restClient(true).getForEntity(url).block();
+ assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK);
+ }
+
@Test
void testResponseCodes() throws Exception {
String supervisionUrl = baseUrl() + ProducerCallbacksController.SUPERVISION_URL;
this.icsSimulatorController.addJob(kafkaJobInfo, JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
- DataConsumer kafkaConsumer = this.topicListeners.getDataConsumers().get(TYPE_ID, JOB_ID);
+ JobDataDistributor kafkaConsumer = this.topicListeners.getDataDistributors().get(TYPE_ID, JOB_ID);
// Handle received data from Kafka, check that it has been posted to the
// consumer
- kafkaConsumer.start(Flux.just(new TopicListener.Output("key", "data")));
+ kafkaConsumer.start(Flux.just(new TopicListener.DataFromTopic("key", "data")));
ConsumerController.TestResults consumer = this.consumerController.testResults;
await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
assertThat(consumer.receivedBodies.get(0)).isEqualTo("[\"data\"]");
assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "application/json");
-
- // Test send an exception
- kafkaConsumer.start(Flux.error(new NullPointerException()));
-
- // Test regular restart of stopped
- kafkaConsumer.stop();
- this.topicListeners.restartNonRunningKafkaTopics();
- await().untilAsserted(() -> assertThat(kafkaConsumer.isRunning()).isTrue());
}
@Test
// Return two messages from DMAAP and verify that these are sent to the owner of
// the job (consumer)
- DmaapSimulatorController.addResponse("[\"DmaapResponse1\", \"DmaapResponse2\"]");
+ DmaapSimulatorController.addResponse("[\"DmaapResponse123\", \"DmaapResponse223\"]");
ConsumerController.TestResults consumer = this.consumerController.testResults;
await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
- assertThat(consumer.receivedBodies.get(0)).isEqualTo("[\"DmaapResponse1\", \"DmaapResponse2\"]");
+ assertThat(consumer.receivedBodies.get(0)).isEqualTo("[\"DmaapResponse123\", \"DmaapResponse223\"]");
assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "application/json");
String jobUrl = baseUrl() + ProducerCallbacksController.JOB_URL;
String jobs = restClient().get(jobUrl).block();
assertThat(jobs).contains(JOB_ID);
-
- // Delete the job
- this.icsSimulatorController.deleteJob(JOB_ID, restClient());
- await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
}
@Test
// Return two messages from DMAAP and verify that these are sent to the owner of
// the job (consumer)
- DmaapSimulatorController.addResponse("[\"DmaapResponse1\", \"DmaapResponse2\"]");
+ DmaapSimulatorController.addResponse("[\"DmaapResponse11\", \"DmaapResponse22\"]");
ConsumerController.TestResults consumer = this.consumerController.testResults;
await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(2));
- assertThat(consumer.receivedBodies.get(0)).isEqualTo("DmaapResponse1");
- assertThat(consumer.receivedBodies.get(1)).isEqualTo("DmaapResponse2");
+ assertThat(consumer.receivedBodies.get(0)).isEqualTo("DmaapResponse11");
+ assertThat(consumer.receivedBodies.get(1)).isEqualTo("DmaapResponse22");
assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "text/plain;charset=UTF-8");
// Delete the job
this.icsSimulatorController.deleteJob(JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+
+ // Test that deleting the the last job did not terminate the DmaapTopicListener
+ this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+ DmaapSimulatorController.addResponse("[\"DmaapResponse77\", \"DmaapResponse88\"]");
+ await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(4));
}
static class PmReportArray extends ArrayList<PmReport> {
// filtered PM message
String path = "./src/test/resources/pm_report.json";
String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset());
+ DmaapSimulatorController.addPmResponse("{}"); // This should just be discarded
+
DmaapSimulatorController.addPmResponse(pmReportJson);
ConsumerController.TestResults consumer = this.consumerController.testResults;
PmReportArray reportsParsed = gson.fromJson(receivedFiltered, PmReportArray.class);
assertThat(reportsParsed).hasSize(1);
-
}
@Test
DmaapSimulatorController.addResponse("[\"Hello\"]");
ConsumerController.TestResults consumer = this.consumerController.testResults;
- await().untilAsserted(() -> assertThat(consumer.receivedHeaders).hasSize(1));
+ await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
String received = consumer.receivedBodies.get(0);
assertThat(received).isEqualTo("Hello");
- // This is the only time it is verified that mime type is plaintext when isJson
- // is false and buffering is not used
- assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "text/plain;charset=UTF-8");
// Check that the auth token was received by the consumer
assertThat(consumer.receivedHeaders).hasSize(1);
Map<String, String> headers = consumer.receivedHeaders.get(0);
assertThat(headers).containsEntry("authorization", "Bearer " + AUTH_TOKEN);
+
+ // This is the only time it is verified that mime type is plaintext when isJson
+ // is false and buffering is not used
+ assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "text/plain;charset=UTF-8");
+
Files.delete(authFile);
+ this.securityContext.setAuthTokenFilePath(null);
}
@Test
.hasSize(this.types.size()));
}
+ @Test
+ void testStatistics() throws ServiceException {
+ // Register producer, Register types
+ waitForRegistration();
+ final String JOB_ID = "testStatistics";
+ ConsumerJobInfo jobInfo = consumerJobInfo("DmaapInformationType", JOB_ID, jsonObjectRegexp());
+
+ this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+ String targetUri = baseUrl() + ProducerCallbacksController.STATISTICS_URL;
+ String stats = restClient().get(targetUri).block();
+
+ assertThat(stats).contains(JOB_ID, "DmaapInformationType");
+
+ }
+
public static void testErrorCode(Mono<?> request, HttpStatus expStatus, String responseContains) {
testErrorCode(request, expStatus, responseContains, true);
}
public class IcsSimulatorController {
private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private final static Gson gson = new GsonBuilder().create();
+ private final static Gson gson = new GsonBuilder().disableHtmlEscaping().create();
public static class TestResults {
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
import org.oran.dmaapadapter.clients.AsyncRestClient;
import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
import org.oran.dmaapadapter.clients.SecurityContext;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.test.context.TestPropertySource;
-import org.springframework.test.context.junit.jupiter.SpringExtension;
@SuppressWarnings("java:S3577") // Rename class
-@ExtendWith(SpringExtension.class)
@SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
@TestPropertySource(properties = { //
"server.ssl.key-store=./config/keystore.jks", //
@Autowired
private SecurityContext securityContext;
- private static Gson gson = new GsonBuilder().create();
+ private static Gson gson = new GsonBuilder().disableHtmlEscaping().create();
static class TestApplicationConfig extends ApplicationConfig {
import com.google.gson.JsonParser;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.oran.dmaapadapter.clients.AsyncRestClient;
import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.configuration.WebClientConfig;
import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig;
+import org.oran.dmaapadapter.controllers.ProducerCallbacksController;
import org.oran.dmaapadapter.exceptions.ServiceException;
+import org.oran.dmaapadapter.filter.PmReportFilter;
import org.oran.dmaapadapter.r1.ConsumerJobInfo;
import org.oran.dmaapadapter.repository.InfoType;
import org.oran.dmaapadapter.repository.InfoTypes;
import org.oran.dmaapadapter.repository.Job;
import org.oran.dmaapadapter.repository.Jobs;
-import org.oran.dmaapadapter.tasks.DataConsumer;
import org.oran.dmaapadapter.tasks.KafkaTopicListener;
import org.oran.dmaapadapter.tasks.TopicListener;
import org.oran.dmaapadapter.tasks.TopicListeners;
import org.springframework.context.annotation.Bean;
import org.springframework.test.context.TestPropertySource;
-import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
@TestPropertySource(properties = { //
"server.ssl.key-store=./config/keystore.jks", //
"app.webclient.trust-store=./config/truststore.jks", //
- "app.configuration-filepath=./src/test/resources/test_application_configuration.json"//
-})
+ "app.configuration-filepath=./src/test/resources/test_application_configuration.json", //
+ "app.pm-files-path=./src/test/resources/"}) //
class IntegrationWithKafka {
final String TYPE_ID = "KafkaInformationType";
+ final String PM_TYPE_ID = "PmInformationTypeKafka";
@Autowired
private ApplicationConfig applicationConfig;
@Autowired
private SecurityContext securityContext;
- private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
+ private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
private final Logger logger = LoggerFactory.getLogger(IntegrationWithKafka.class);
}
}
+ private static class KafkaReceiver {
+ public final String OUTPUT_TOPIC;
+ private TopicListener.DataFromTopic receivedKafkaOutput;
+ private final Logger logger = LoggerFactory.getLogger(IntegrationWithKafka.class);
+
+ int count = 0;
+
+ public KafkaReceiver(ApplicationConfig applicationConfig, String outputTopic) {
+ this.OUTPUT_TOPIC = outputTopic;
+
+ // Create a listener to the output topic. The KafkaTopicListener happens to be
+ // suitable for that,
+ InfoType type =
+ InfoType.builder().id("TestReceiver").kafkaInputTopic(OUTPUT_TOPIC).dataType("dataType").build();
+
+ KafkaTopicListener topicListener = new KafkaTopicListener(applicationConfig, type);
+
+ topicListener.getFlux() //
+ .doOnNext(this::set) //
+ .doFinally(sig -> logger.info("Finally " + sig)) //
+ .subscribe();
+ }
+
+ private void set(TopicListener.DataFromTopic receivedKafkaOutput) {
+ this.receivedKafkaOutput = receivedKafkaOutput;
+ this.count++;
+ logger.debug("*** received {}, {}", OUTPUT_TOPIC, receivedKafkaOutput);
+ }
+
+ synchronized String lastKey() {
+ return this.receivedKafkaOutput.key;
+ }
+
+ synchronized String lastValue() {
+ return this.receivedKafkaOutput.value;
+ }
+
+ void reset() {
+ count = 0;
+ this.receivedKafkaOutput = new TopicListener.DataFromTopic("", "");
+ }
+ }
+
+ private static KafkaReceiver kafkaReceiver;
+ private static KafkaReceiver kafkaReceiver2;
+
+ @BeforeEach
+ void init() {
+ if (kafkaReceiver == null) {
+ kafkaReceiver = new KafkaReceiver(this.applicationConfig, "ouputTopic");
+ kafkaReceiver2 = new KafkaReceiver(this.applicationConfig, "ouputTopic2");
+ }
+ kafkaReceiver.reset();
+ kafkaReceiver2.reset();
+ }
+
@AfterEach
void reset() {
+ for (Job job : this.jobs.getAll()) {
+ this.icsSimulatorController.deleteJob(job.getId(), restClient());
+ }
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+ await().untilAsserted(() -> assertThat(this.topicListeners.getDataDistributors().keySet()).isEmpty());
+
this.consumerController.testResults.reset();
this.icsSimulatorController.testResults.reset();
- this.jobs.clear();
}
private AsyncRestClient restClient(boolean useTrustValidation) {
}
}
+ ConsumerJobInfo consumerJobInfoKafka(String topic, PmReportFilter.FilterData filterData) {
+ try {
+ Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, null, 1, topic);
+ String str = gson.toJson(param);
+ Object parametersObj = jsonObject(str);
+
+ return new ConsumerJobInfo(PM_TYPE_ID, parametersObj, "owner", null, "");
+ } catch (Exception e) {
+ return null;
+ }
+ }
+
ConsumerJobInfo consumerJobInfoKafka(String topic) {
try {
Job.Parameters param = new Job.Parameters(null, null, null, 1, topic);
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producerx");
+ // props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producerx");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return SenderOptions.create(props);
}
- private SenderRecord<String, String, Integer> senderRecord(String data) {
- return senderRecord(data, "");
- }
-
- private SenderRecord<String, String, Integer> senderRecord(String data, String key) {
- final InfoType infoType = this.types.get(TYPE_ID);
+ private SenderRecord<String, String, Integer> senderRecord(String data, String key, String typeId) {
+ final InfoType infoType = this.types.get(typeId);
int correlationMetadata = 2;
return SenderRecord.create(new ProducerRecord<>(infoType.getKafkaInputTopic(), key, data), correlationMetadata);
}
}
}
+ private void verifiedReceivedByConsumerLast(String s) {
+ ConsumerController.TestResults consumer = this.consumerController.testResults;
+
+ await().untilAsserted(() -> assertThat(last(consumer.receivedBodies)).isEqualTo(s));
+ }
+
+ private String last(List<String> l) {
+ return l.isEmpty() ? "" : l.get(l.size() - 1);
+ }
+
@SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
- private static void sleep(long millis) throws InterruptedException {
- Thread.sleep(millis);
+ private static void waitForKafkaListener() throws InterruptedException {
+ Thread.sleep(4000);
}
@Test
this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+ waitForKafkaListener();
- sleep(4000);
- var dataToSend = Flux.just(senderRecord("Message", ""));
+ var dataToSend = Flux.just(senderRecord("Message", "", TYPE_ID));
sendDataToStream(dataToSend);
verifiedReceivedByConsumer("Message");
+ }
+
+ @Test
+ void kafkaIntegrationTest() throws Exception {
+ final String JOB_ID1 = "ID1";
+ final String JOB_ID2 = "ID2";
- this.icsSimulatorController.deleteJob(JOB_ID, restClient());
+ // Register producer, Register types
+ await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
- await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
- await().untilAsserted(() -> assertThat(this.topicListeners.getDataConsumers().keySet()).isEmpty());
- }
+ // Create two jobs. One buffering and one with a filter
+ this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 10, 20), JOB_ID1,
+ restClient());
+ this.icsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0, 1), JOB_ID2, restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
+ waitForKafkaListener();
- TopicListener.Output receivedKafkaOutput = new TopicListener.Output("", "");
+ var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_" + i, "", TYPE_ID)); // Message_1,
+ // Message_2
+ // etc.
+ sendDataToStream(dataToSend);
+
+ verifiedReceivedByConsumer("Message_1", "[\"Message_1\", \"Message_2\", \"Message_3\"]");
+ }
@Test
void sendToKafkaConsumer() throws ServiceException, InterruptedException {
await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
- final String OUTPUT_TOPIC = "outputTopic";
+ this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver.OUTPUT_TOPIC), JOB_ID, restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+ waitForKafkaListener();
+
+ String sendString = "testData " + Instant.now();
+ String sendKey = "key " + Instant.now();
+ var dataToSend = Flux.just(senderRecord(sendString, sendKey, TYPE_ID));
+ sendDataToStream(dataToSend);
+
+ await().untilAsserted(() -> assertThat(kafkaReceiver.lastValue()).isEqualTo(sendString));
+ assertThat(kafkaReceiver.lastKey()).isEqualTo(sendKey);
+
+ printStatistics();
+ }
+
+ private void printStatistics() {
+ String targetUri = baseUrl() + ProducerCallbacksController.STATISTICS_URL;
+ String stats = restClient().get(targetUri).block();
+ logger.info("Stats : {}", stats);
+ }
+
+ @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
+ @Test
+ void kafkaCharacteristics() throws Exception {
+ final String JOB_ID = "kafkaCharacteristics";
+
+ // Register producer, Register types
+ await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
- this.icsSimulatorController.addJob(consumerJobInfoKafka(OUTPUT_TOPIC), JOB_ID, restClient());
+ this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver.OUTPUT_TOPIC), JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+ waitForKafkaListener();
- // Create a listener to the output topic. The KafkaTopicListener happens to be
- // suitable for that,
- InfoType type = new InfoType("id", null, false, OUTPUT_TOPIC, "dataType", false);
- KafkaTopicListener receiver = new KafkaTopicListener(this.applicationConfig, type);
- receiver.start();
+ final int NO_OF_OBJECTS = 100000;
- Disposable disponsable = receiver.getOutput().asFlux() //
- .doOnNext(output -> {
- receivedKafkaOutput = output;
- logger.info("*** recived {}, {}", OUTPUT_TOPIC, output);
- }) //
- .doFinally(sig -> logger.info("Finally " + sig)) //
- .subscribe();
+ Instant startTime = Instant.now();
- String sendString = "testData " + Instant.now();
- String sendKey = "key " + Instant.now();
- var dataToSend = Flux.just(senderRecord(sendString, sendKey));
- sleep(4000);
+ var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> senderRecord("Message_" + i, "", TYPE_ID)); // Message_1,
+ // etc.
sendDataToStream(dataToSend);
- await().untilAsserted(() -> assertThat(this.receivedKafkaOutput.value).isEqualTo(sendString));
- assertThat(this.receivedKafkaOutput.key).isEqualTo(sendKey);
+ while (!kafkaReceiver.lastValue().equals("Message_" + NO_OF_OBJECTS)) {
+ logger.info("sleeping {}", kafkaReceiver.lastValue());
+ Thread.sleep(1000 * 1);
+ }
- disponsable.dispose();
- receiver.stop();
+ final long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond();
+ logger.info("*** Duration :" + durationSeconds + ", objects/second: " + NO_OF_OBJECTS / durationSeconds);
}
+ @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
@Test
- void kafkaIntegrationTest() throws Exception {
- final String JOB_ID1 = "ID1";
- final String JOB_ID2 = "ID2";
+ void kafkaCharacteristics_pmFilter() throws Exception {
+ // Filter PM reports and sent to two jobs over Kafka
+
+ final String JOB_ID = "kafkaCharacteristics";
+ final String JOB_ID2 = "kafkaCharacteristics2";
// Register producer, Register types
await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
- // Create two jobs. One buffering and one with a filter
- this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 10, 20), JOB_ID1,
+ PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
+ filterData.getMeasTypes().add("succImmediateAssignProcs");
+ filterData.getMeasObjClass().add("UtranCell");
+
+ this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver.OUTPUT_TOPIC, filterData), JOB_ID,
+ restClient());
+ this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver2.OUTPUT_TOPIC, filterData), JOB_ID2,
restClient());
- this.icsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0, 1), JOB_ID2, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
+ waitForKafkaListener();
+
+ final int NO_OF_OBJECTS = 100000;
- sleep(2000);
- var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc.
+ Instant startTime = Instant.now();
+
+ KafkaTopicListener.NewFileEvent event =
+ KafkaTopicListener.NewFileEvent.builder().filename("pm_report.json").build();
+ String eventAsString = gson.toJson(event);
+
+ String path = "./src/test/resources/pm_report.json";
+ String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset());
+
+ var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> senderRecord(eventAsString, "key", PM_TYPE_ID));
sendDataToStream(dataToSend);
- verifiedReceivedByConsumer("Message_1", "[\"Message_1\", \"Message_2\", \"Message_3\"]");
+ while (kafkaReceiver.count != NO_OF_OBJECTS) {
+ logger.info("sleeping {}", kafkaReceiver.count);
+ Thread.sleep(1000 * 1);
+ }
- // Delete the jobs
- this.icsSimulatorController.deleteJob(JOB_ID1, restClient());
- this.icsSimulatorController.deleteJob(JOB_ID2, restClient());
+ // System.out.println(kafkaReceiver.receivedKafkaOutput.value);
- await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
- await().untilAsserted(() -> assertThat(this.topicListeners.getDataConsumers().keySet()).isEmpty());
+ final long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond();
+ logger.info("*** Duration :" + durationSeconds + ", objects/second: " + NO_OF_OBJECTS / durationSeconds);
+ logger.info("*** kafkaReceiver2 :" + kafkaReceiver.count);
+
+ printStatistics();
}
@Test
- void kafkaIOverflow() throws Exception {
+ void kafkaDeleteJobShouldNotStopListener() throws Exception {
final String JOB_ID1 = "ID1";
final String JOB_ID2 = "ID2";
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
- var dataToSend = Flux.range(1, 1000000).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc.
- sendDataToStream(dataToSend); // this should overflow
-
- DataConsumer consumer = topicListeners.getDataConsumers().get(TYPE_ID).iterator().next();
- await().untilAsserted(() -> assertThat(consumer.isRunning()).isFalse());
- this.consumerController.testResults.reset();
-
- this.icsSimulatorController.deleteJob(JOB_ID2, restClient()); // Delete one job
- topicListeners.restartNonRunningKafkaTopics();
- sleep(1000); // Restarting the input seems to take some asynch time
-
- dataToSend = Flux.just(senderRecord("Howdy\""));
- sendDataToStream(dataToSend);
-
- verifiedReceivedByConsumer("[\"Howdy\\\"\"]");
+ var dataToSend = Flux.range(1, 100).map(i -> senderRecord("Message_" + i, "", TYPE_ID)); // Message_1,
+ // Message_2
+ // etc.
+ sendDataToStream(dataToSend); // this should not overflow
- // Delete the jobs
+ // Delete jobs, recreate one
this.icsSimulatorController.deleteJob(JOB_ID1, restClient());
this.icsSimulatorController.deleteJob(JOB_ID2, restClient());
-
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
- await().untilAsserted(() -> assertThat(this.topicListeners.getDataConsumers().keySet()).isEmpty());
+ this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID2, restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+ dataToSend = Flux.just(senderRecord("Howdy", "", TYPE_ID));
+ sendDataToStream(dataToSend);
+
+ verifiedReceivedByConsumerLast("Howdy");
}
}
* ========================LICENSE_END===================================
*/
-package org.oran.dmaapadapter.repository.filters;
+package org.oran.dmaapadapter.filter;
import static org.assertj.core.api.Assertions.assertThat;
import java.nio.file.Path;
import org.junit.jupiter.api.Test;
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
class JsltFilterTest {
+ private String filterReport(JsltFilter filter) throws Exception {
+ return filter.filter(new DataFromTopic("", loadReport())).value;
+ }
+
@Test
void testPickOneValue() throws Exception {
String expresssion = "if(.event.commonEventHeader.sourceName == 'O-DU-1122')" //
+ ".event.perf3gppFields.measDataCollection.measInfoList[0].measValuesList[0].measResults[0].sValue";
JsltFilter filter = new JsltFilter(reQuote(expresssion));
- String res = filter.filter(loadReport());
+ String res = filterReport(filter);
assertThat(res).isEqualTo(reQuote("'813'"));
}
+ ".";
JsltFilter filter = new JsltFilter(reQuote(expresssion));
- String res = filter.filter(loadReport());
+ String res = filterReport(filter);
assertThat(res).contains("event");
}
String expresssion = "if(.event.commonEventHeader.sourceName == 'JUNK')" //
+ ".";
JsltFilter filter = new JsltFilter(reQuote(expresssion));
- String res = filter.filter(loadReport());
+ String res = filterReport(filter);
assertThat(res).isEmpty();
}
"}"; //
JsltFilter filter = new JsltFilter(reQuote(expresssion));
- String res = filter.filter(loadReport());
+ String res = filterReport(filter);
String expected =
"{'array':['RncFunction=RF-1,UtranCell=Gbg-997','RncFunction=RF-1,UtranCell=Gbg-998','RncFunction=RF-1,UtranCell=Gbg-999'],'size':3}";
* ========================LICENSE_END===================================
*/
-package org.oran.dmaapadapter.repository.filters;
+package org.oran.dmaapadapter.filter;
import static org.assertj.core.api.Assertions.assertThat;
import java.nio.file.Path;
import org.junit.jupiter.api.Test;
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
class JsonPathFilterTest {
void testJsonPath() throws Exception {
String exp = ("$.event.perf3gppFields.measDataCollection.measInfoList[0].measTypes.sMeasTypesList[0]");
JsonPathFilter filter = new JsonPathFilter(exp);
- String res = filter.filter(loadReport());
+ String res = filter.filter(new DataFromTopic("", loadReport())).value;
assertThat(res).isEqualTo("\"attTCHSeizures\"");
}
* ========================LICENSE_END===================================
*/
-package org.oran.dmaapadapter.repository.filters;
+package org.oran.dmaapadapter.filter;
import static org.assertj.core.api.Assertions.assertThat;
import java.nio.file.Path;
import org.junit.jupiter.api.Test;
+import org.oran.dmaapadapter.tasks.TopicListener;
class PmReportFilterTest {
+ private String filterReport(PmReportFilter filter) throws Exception {
+ return filter.filter(new TopicListener.DataFromTopic("", loadReport())).value;
+ }
+
@Test
void testPmFilterMeasTypes() throws Exception {
- String reportJson = loadReport();
-
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
filterData.measTypes.add("succImmediateAssignProcs");
PmReportFilter filter = new PmReportFilter(filterData);
- String filtered = filter.filter(reportJson);
+ String filtered = filterReport(filter);
assertThat(filtered).contains("succImmediateAssignProcs").doesNotContain("\"p\":2").contains("\"p\":1")
.contains("Gbg-997");
filterData = new PmReportFilter.FilterData();
filterData.measTypes.add("junk");
filter = new PmReportFilter(filterData);
- filtered = filter.filter(reportJson);
+ filtered = filterReport(filter);
assertThat(filtered).isEmpty();
}
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
filterData.measObjInstIds.add("junk");
PmReportFilter filter = new PmReportFilter(filterData);
- String filtered = filter.filter(loadReport());
+ String filtered = filterReport(filter);
assertThat(filtered).isEmpty();
filterData = new PmReportFilter.FilterData();
filterData.measObjInstIds.add("UtranCell=Gbg-997");
filter = new PmReportFilter(filterData);
- filtered = filter.filter(loadReport());
+ filtered = filterReport(filter);
assertThat(filtered).contains("Gbg-997").doesNotContain("Gbg-998");
}
+ @Test
+ void testMeasObjClass() throws Exception {
+ PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
+ filterData.measObjClass.add("junk");
+ PmReportFilter filter = new PmReportFilter(filterData);
+ String filtered = filterReport(filter);
+ assertThat(filtered).isEmpty();
+
+ filterData = new PmReportFilter.FilterData();
+ filterData.measObjClass.add("ENodeBFunction");
+ filter = new PmReportFilter(filterData);
+ filtered = filterReport(filter);
+ assertThat(filtered).contains("ENodeBFunction").doesNotContain("UtranCell");
+ }
+
@Test
void testSourceNames() throws Exception {
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
filterData.sourceNames.add("junk");
PmReportFilter filter = new PmReportFilter(filterData);
- String filtered = filter.filter(loadReport());
+ String filtered = filterReport(filter);
assertThat(filtered).isEmpty();
filterData = new PmReportFilter.FilterData();
filterData.sourceNames.add("O-DU-1122");
filter = new PmReportFilter(filterData);
- filtered = filter.filter(loadReport());
+ filtered = filterReport(filter);
assertThat(filtered).contains("O-DU-1122");
}
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
filterData.measuredEntityDns.add("junk");
PmReportFilter filter = new PmReportFilter(filterData);
- String filtered = filter.filter(loadReport());
+ String filtered = filterReport(filter);
assertThat(filtered).isEmpty();
filterData = new PmReportFilter.FilterData();
filterData.measuredEntityDns.add("ManagedElement=RNC-Gbg-1");
filter = new PmReportFilter(filterData);
- filtered = filter.filter(loadReport());
- assertThat(filtered).contains("RNC-Gbg-1"); // '=' is escaped to unicode by gson. OK
+ filtered = filterReport(filter);
+ assertThat(filtered).contains("ManagedElement=RNC-Gbg-1");
+ }
+
+ @Test
+ void testCrapInput() {
+ PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
+ PmReportFilter filter = new PmReportFilter(filterData);
+
+ String filtered = filter.filter(new TopicListener.DataFromTopic("", "junk")).value;
+ assertThat(filtered).isEmpty();
+
+ filtered = filter.filter(new TopicListener.DataFromTopic("", reQuote("{'msg': 'test'}"))).value;
+ assertThat(filtered).isEmpty();
+
+ }
+
+ private String reQuote(String str) {
+ return str.replaceAll("'", "\\\"");
}
@Test
void testParse() throws Exception {
- com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
+ com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
PmReport report = gson.fromJson(loadReport(), PmReport.class);
String dn = report.event.perf3gppFields.measDataCollection.measuredEntityDn;
},
{
"id": "KafkaInformationType",
- "kafkaInputTopic": "TutorialTopic",
+ "kafkaInputTopic": "KafkaInput",
"useHttpProxy": false
},
{
},
{
"id": "PmInformationTypeKafka",
- "kafkaInputTopic": "TutorialTopic",
+ "kafkaInputTopic": "PmFileData",
"useHttpProxy": false,
"dataType": "PmData",
"isJson": true