NONRTRIC - dmaap cherry-picked from master 30/8930/2 1.1.1
author PatrikBuhr <patrik.buhr@est.tech>
Mon, 4 Jul 2022 13:28:05 +0000 (15:28 +0200)
committer PatrikBuhr <patrik.buhr@est.tech>
Thu, 25 Aug 2022 05:51:41 +0000 (07:51 +0200)
14 commits

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: I65b198fe2860c51d14bd0ff1fa65f2f97a5062c2

NONRTRIC - dmaap adapter characteristic improvement

Fixed issues with backpressure.
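
For illustration, a minimal self-contained sketch of the bounded-backpressure
pattern in Reactor (not the adapter's actual code; buffer size, concurrency and
names are assumed for the example):

    import reactor.core.publisher.Flux;

    // Sketch: bound the buffer and cap concurrency so that a slow consumer
    // cannot make the pipeline queue data without limit.
    public class BackpressureSketch {
        public static void main(String[] args) {
            Flux.range(1, 10_000)                               // stand-in for a topic Flux
                    .onBackpressureBuffer(1_000,                // bounded buffer, not unbounded
                            dropped -> System.err.println("Dropped: " + dropped))
                    .flatMap(n -> Flux.just("item " + n), 4)    // limited concurrency
                    .blockLast();                               // drain for the demo
        }
    }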

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: I5d9a1cb7c741110010e3dd116a5c115061fb59dd

NONRTRIC - dmaap adapter characteristic improvement

Minor changes. Added a testcase.

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: Ib45f40763f3124c5e8c32d66e23a7b4a1252e428

NONRTRIC - dmaap adapter characteristic improvement

Minor changes, aesthetics.

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: I7dc76691f45d30555be66511e1b78c6e5231d01f

NONRTRIC - dmaap adapter characteristic improvement

Minor changes, renamed some classes.

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: I1de5f8d88877b7a8f1693576ac52ca3bf5b5be51

NONRTRIC - dmaap adapter characteristic improvement

Added support for PM filtering on the MO class of the measured object.
Changed the ICS producer ID so that each instance will have a unique ID (using the callback URL).
Changed the Kafka group ID so that each type will have its own group ID.
Set the Kafka client ID (using the callback URL).
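
The key additions, excerpted from InfoType.java and PmReportFilter.java in the
diff below (explanatory comments added here):

    public String getKafkaGroupId() {
        // one consumer group per type
        return this.kafkaInputTopic == null ? null : "osc-dmaap-adapter-" + getId();
    }

    public String getKafkaClientId(ApplicationConfig appConfig) {
        // one client ID per instance, derived from the instance's own URL
        return this.kafkaInputTopic == null ? null : getId() + "_" + appConfig.getSelfUrl();
    }

    private String managedObjectClass(String distinguishedName) {
        // the MO class is the name of the last RDN, e.g. "ENodeBFunction" for
        // "ManagedElement=RNC-Gbg-1,ENodeBFunction=1"
        int lastRdn = distinguishedName.lastIndexOf(",");
        if (lastRdn == -1) {
            return "";
        }
        int lastEqualChar = distinguishedName.indexOf("=", lastRdn);
        if (lastEqualChar == -1) {
            return "";
        }
        return distinguishedName.substring(lastRdn + 1, lastEqualChar);
    }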

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: I1031802469beb146039ed089e4c80f0ca83d4dd9

NONRTRIC - dmaap adapter characteristic improvement

Minor refactoring

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: Ibedc361673aad2a9362e31651d11853bd282df85

NONRTRIC - bugfix

If the auth-token-file parameter in the file application.yaml was missing, it did not default to an empty file name.
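
The fix, excerpted from SecurityContext.java in the diff below: with the old
default "${app.auth-token-file:\"\"}", a missing parameter produced the literal
two-character string "" rather than an empty string, so the isEmpty() check
never triggered:

    // "${app.auth-token-file:}" defaults to an empty string when the
    // parameter is absent from application.yaml
    public SecurityContext(@Value("${app.auth-token-file:}") String authTokenFilename) {
        if (!authTokenFilename.isEmpty()) {
            this.authTokenFilePath = Path.of(authTokenFilename);
        }
    }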

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: Icb357e7a44968df07d42299650700f1003d30abd

NONRTRIC - bugfix

Taking care of incorrect/malformed input to the PM parsing.
Removing "map" output from the PM filter that should not be there.

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: Ic18d8f7ee3176ea39b91c850bec79ce36582c7ef

NONRTRIC - new certs, updated Spring Boot version

Created new certs.
Updated the Spring Boot version.

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: Ie9f798cdb147873fcdc4fd4c9bb911d19abc4e49

NONRTRIC - optimization of PM filtering

If there are many PM jobs, the PM parsing is done once instead of once per job.
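
The mechanism, excerpted from PmReportFilter.java in the diff below: the parsed
report is cached on the shared input object, so jobs that receive the same data
reuse one parse:

    private PmReport createPmReport(DataFromTopic data) {
        synchronized (data) {
            if (data.getCachedPmReport() == null) {
                data.setCachedPmReport(gsonParse.fromJson(data.value, PmReport.class));
            }
            return data.getCachedPmReport();
        }
    }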

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: I64dc8332f3efd1bd3a284f6896be7bd9a0dd9bf7

NONRTRIC - bugfix

Discarding empty output from the filtering.
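
The empty-result representation, excerpted from Filter.java in the diff below;
a reasonable reading of the change is that distributors check isEmpty() and
skip sending such results (the distributor code itself is not shown in this
diff):

    public boolean isEmpty() {
        return value.isEmpty() && key.isEmpty();
    }

    public static FilteredData empty() {
        return emptyData;
    }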

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: I141ed9aea0cb6d893f014ce49bb1d70a0dbaa8f0

NONRTRIC - bugfix

Bugfix: reverted back to Spring Boot 2.5.8, since 2.6.6 works badly with Kafka.

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: I20a78bdd0fa5368fa00933bea1293340bffcfbeb

Update license references

Patch set 2: Additional licensing
Patch set 3: Review licensing.
Patch set 4: Rebase and resolve conflict.

Issue-ID: NONRTRIC-779
Change-Id: I856e0943d11f2b00f33250786fed6a92d4f11923
Signed-off-by: halil.cakal <halil.cakal@est.tech>

NONRTRIC - Statistics

Added a feature for getting statistics:
   GET "/statistics"
Updated to the latest 2.5 version of Spring Boot.
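
A usage sketch with the JDK HTTP client (the host and port are assumptions; the
response follows the statistics_info schema added to api.yaml below):

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    // Fetch the new statistics endpoint and print the JSON body, e.g.
    // {"jobStatistics":[{"jobId":"...","typeId":"...","noOfReceivedObjects":0,...}]}
    public class StatisticsClient {
        public static void main(String[] args) throws Exception {
            HttpClient client = HttpClient.newHttpClient();
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("http://localhost:8080/statistics")) // assumed address
                    .GET()
                    .build();
            HttpResponse<String> response =
                    client.send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.body());
        }
    }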

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: Ifc0844ca20cab00d3ca99e5b58b2f56721a5e9c0
Change-Id: I747c917dcb25a3f740ad6e7dde301dac0d8c5fd8

43 files changed:
.readthedocs.yaml
LICENSE.txt [new file with mode: 0644]
api/api.json
api/api.yaml
config/README
config/application.yaml
config/application_configuration.json
config/keystore.jks
config/truststore.jks
docs/conf.py
docs/overview.rst
pom.xml
src/main/java/org/oran/dmaapadapter/clients/SecurityContext.java
src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java
src/main/java/org/oran/dmaapadapter/controllers/ErrorResponse.java
src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java
src/main/java/org/oran/dmaapadapter/filter/Filter.java [moved from src/main/java/org/oran/dmaapadapter/repository/filters/Filter.java with 50% similarity]
src/main/java/org/oran/dmaapadapter/filter/FilterFactory.java [new file with mode: 0644]
src/main/java/org/oran/dmaapadapter/filter/JsltFilter.java [moved from src/main/java/org/oran/dmaapadapter/repository/filters/JsltFilter.java with 80% similarity]
src/main/java/org/oran/dmaapadapter/filter/JsonPathFilter.java [moved from src/main/java/org/oran/dmaapadapter/repository/filters/JsonPathFilter.java with 76% similarity]
src/main/java/org/oran/dmaapadapter/filter/PmReport.java [moved from src/main/java/org/oran/dmaapadapter/repository/filters/PmReport.java with 81% similarity]
src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java [moved from src/main/java/org/oran/dmaapadapter/repository/filters/PmReportFilter.java with 64% similarity]
src/main/java/org/oran/dmaapadapter/filter/RegexpFilter.java [moved from src/main/java/org/oran/dmaapadapter/repository/filters/RegexpFilter.java with 77% similarity]
src/main/java/org/oran/dmaapadapter/repository/InfoType.java
src/main/java/org/oran/dmaapadapter/repository/Job.java
src/main/java/org/oran/dmaapadapter/repository/Jobs.java
src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java
src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java [moved from src/main/java/org/oran/dmaapadapter/tasks/HttpDataConsumer.java with 86% similarity]
src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java [moved from src/main/java/org/oran/dmaapadapter/tasks/DataConsumer.java with 78% similarity]
src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java [moved from src/main/java/org/oran/dmaapadapter/tasks/KafkaDataConsumer.java with 87% similarity]
src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java
src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java
src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java
src/main/resources/typeSchemaPmData.json
src/test/java/org/oran/dmaapadapter/ApplicationTest.java
src/test/java/org/oran/dmaapadapter/IcsSimulatorController.java
src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java
src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
src/test/java/org/oran/dmaapadapter/filter/JsltFilterTest.java [moved from src/test/java/org/oran/dmaapadapter/repository/filters/JsltFilterTest.java with 87% similarity]
src/test/java/org/oran/dmaapadapter/filter/JsonPathFilterTest.java [moved from src/test/java/org/oran/dmaapadapter/repository/filters/JsonPathFilterTest.java with 89% similarity]
src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java [moved from src/test/java/org/oran/dmaapadapter/repository/filters/PmReportFilterTest.java with 67% similarity]
src/test/resources/test_application_configuration.json

index 095222a..fd96bc1 100644 (file)
@@ -1,3 +1,20 @@
+#  ============LICENSE_START===============================================
+#  Copyright (C) 2021-2022 Nordix Foundation. All rights reserved.
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+#
+
 ---
 version: 2
 
diff --git a/LICENSE.txt b/LICENSE.txt
new file mode 100644 (file)
index 0000000..96589bf
--- /dev/null
@@ -0,0 +1,201 @@
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License. 
index a58aced..e8ea0c8 100644 (file)
             "description": "Void/empty",
             "type": "object"
         },
+        "job_statistics": {
+            "description": "Statistics information for one job",
+            "type": "object",
+            "required": [
+                "jobId",
+                "noOfReceivedBytes",
+                "noOfReceivedObjects",
+                "noOfSentBytes",
+                "noOfSentObjects",
+                "typeId"
+            ],
+            "properties": {
+                "noOfSentObjects": {
+                    "format": "int32",
+                    "type": "integer"
+                },
+                "jobId": {"type": "string"},
+                "outputTopic": {"type": "string"},
+                "noOfSentBytes": {
+                    "format": "int32",
+                    "type": "integer"
+                },
+                "clientId": {"type": "string"},
+                "groupId": {"type": "string"},
+                "noOfReceivedBytes": {
+                    "format": "int32",
+                    "type": "integer"
+                },
+                "typeId": {"type": "string"},
+                "inputTopic": {"type": "string"},
+                "noOfReceivedObjects": {
+                    "format": "int32",
+                    "type": "integer"
+                }
+            }
+        },
+        "statistics_info": {
+            "description": "Statistics information",
+            "type": "object",
+            "properties": {"jobStatistics": {
+                "description": "Statistics per job",
+                "type": "array",
+                "items": {"$ref": "#/components/schemas/job_statistics"}
+            }}
+        },
         "producer_registration_info": {
             "description": "Information for an Information Producer",
             "type": "object",
             }],
             "tags": ["Information Coordinator Service Simulator (exists only in test)"]
         }},
+        "/statistics": {"get": {
+            "summary": "Returns statistics",
+            "operationId": "getStatistics",
+            "responses": {"200": {
+                "description": "OK",
+                "content": {"application/json": {"schema": {"$ref": "#/components/schemas/statistics_info"}}}
+            }},
+            "tags": ["Producer job control API"]
+        }},
         "/generic_dataproducer/health_check": {"get": {
             "summary": "Producer supervision",
             "description": "The endpoint is provided by the Information Producer and is used for supervision of the producer.",
index 02697ee..bc10472 100644 (file)
@@ -72,6 +72,19 @@ paths:
             application/json:
               schema:
                 type: object
+  /statistics:
+    get:
+      tags:
+      - Producer job control API
+      summary: Returns statistics
+      operationId: getStatistics
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/statistics_info'
   /generic_dataproducer/health_check:
     get:
       tags:
@@ -455,6 +468,50 @@ components:
     void:
       type: object
       description: Void/empty
+    job_statistics:
+      required:
+      - jobId
+      - noOfReceivedBytes
+      - noOfReceivedObjects
+      - noOfSentBytes
+      - noOfSentObjects
+      - typeId
+      type: object
+      properties:
+        noOfSentObjects:
+          type: integer
+          format: int32
+        jobId:
+          type: string
+        outputTopic:
+          type: string
+        noOfSentBytes:
+          type: integer
+          format: int32
+        clientId:
+          type: string
+        groupId:
+          type: string
+        noOfReceivedBytes:
+          type: integer
+          format: int32
+        typeId:
+          type: string
+        inputTopic:
+          type: string
+        noOfReceivedObjects:
+          type: integer
+          format: int32
+      description: Statistics information for one job
+    statistics_info:
+      type: object
+      properties:
+        jobStatistics:
+          type: array
+          description: Statistics per job
+          items:
+            $ref: '#/components/schemas/job_statistics'
+      description: Statistics information
     producer_registration_info:
       required:
       - info_job_callback_url
index a2137b5..46f0c4a 100644 (file)
@@ -3,10 +3,12 @@ The keystore.jks and truststore.jks files are created by using the following com
 1) Create a CA certificate and a private key:
 
 openssl genrsa -des3 -out CA-key.pem 2048
-openssl req -new -key CA-key.pem -x509 -days 1000 -out CA-cert.pem
+openssl req -new -key CA-key.pem -x509 -days 3600 -out CA-cert.pem
 
 2) Create a keystore with a private key entry that is signed by the CA:
 
+Note: the certificate name (CN, keytool's "first and last name" prompt) must be "localhost" for the unit tests.
+
 keytool -genkeypair -alias policy_agent -keyalg RSA -keysize 2048 -keystore keystore.jks -validity 3650 -storepass policy_agent
 keytool -certreq -alias policy_agent -file request.csr -keystore keystore.jks -ext san=dns:your.domain.com -storepass policy_agent
 openssl x509 -req -days 365 -in request.csr -CA CA-cert.pem -CAkey CA-key.pem -CAcreateserial -out ca_signed-cert.pem
index f6cd665..0bd063c 100644 (file)
@@ -1,3 +1,20 @@
+#  ============LICENSE_START===============================================
+#  Copyright (C) 2021-2022 Nordix Foundation. All rights reserved.
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+#
+
 spring:
   profiles:
     active: prod
@@ -16,11 +33,13 @@ springdoc:
 logging:
   # Configuration of logging
   level:
-    ROOT: ERROR
+    ROOT: WARN
+    org.apache.kafka: WARN
     org.springframework: ERROR
     org.springframework.data: ERROR
     org.springframework.web.reactive.function.client.ExchangeFunctions: ERROR
     org.oran.dmaapadapter: INFO
+
   file:
     name: /var/log/dmaap-adapter-service/application.log
 server:
@@ -58,4 +77,5 @@ app:
   kafka:
     bootstrap-servers: localhost:9092
   # If the file name is empty, no authorization token is used
-  auth-token-file:
\ No newline at end of file
+  auth-token-file:
+  pm-files-path: /tmp
index 881da34..fbe6e6b 100644 (file)
          "kafkaInputTopic": "TutorialTopic",
          "useHttpProxy": false
       },
-       {
+      {
          "id": "PmData",
          "dmaapTopicUrl": "/events/PM_NOTIFICATION_OUTPUT/OpenDcae-c12/C12",
          "useHttpProxy": true,
-         "dataType" : "pmData"
-      },
+         "dataType": "pmData"
+      }
    ]
-}
+}
\ No newline at end of file
index 675785b..563c67b 100644 (file)
Binary files a/config/keystore.jks and b/config/keystore.jks differ
index e883cd6..50a0f9e 100644 (file)
Binary files a/config/truststore.jks and b/config/truststore.jks differ
index eece28a..757aa74 100644 (file)
@@ -1,3 +1,20 @@
+#  ============LICENSE_START===============================================
+#  Copyright (C) 2021-2022 Nordix Foundation. All rights reserved.
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+#
+
 from docs_conf.conf import *
 
 #branch configuration
index e0d6008..1174e44 100644 (file)
@@ -186,6 +186,8 @@ The filterType parameter is extended to allow value "pmdata" which can be used f
   For instance a value like "NRCellCU" will match "ManagedElement=seliitdus00487,GNBCUCPFunction=1,NRCellCU=32".
 * measTypes selects the meas types to get
 * measuredEntityDns partial match of meas entity DNs.
+* measObjClass matching of the class of the measObjInstId. The measObjInstId must follow the 3GPP naming conventions for Managed Objects (3GPP TS 32.106-8).
+  Example, for a distinguished name "ManagedElement=RNC-Gbg-1,ENodeBFunction=1", the MO class will be "ENodeBFunction".
 
 All PM filter properties are optional and a non given will result in "match all".
 The result of the filtering is still following the structure of a 3GPP PM report.
@@ -195,7 +197,7 @@ Below follows an example of a PM filter.
 .. code-block:: javascript
 
     {
-      "filterType":"pmdata"
+      "filterType":"pmdata",
       "filter": {
         "sourceNames":[
            "O-DU-1122"
@@ -205,7 +207,7 @@ Below follows an example of a PM filter.
         ],
         "measTypes":[
            "succImmediateAssignProcs"
-        ],eparate call.
+        ],
         "measuredEntityDns":[
            "ManagedElement=RNC-Gbg-1"
         ]
diff --git a/pom.xml b/pom.xml
index d3f0df0..c7a2962 100644 (file)
--- a/pom.xml
+++ b/pom.xml
     <parent>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-parent</artifactId>
-        <version>2.5.8</version>
+        <version>2.5.14</version>
         <relativePath />
     </parent>
     <groupId>org.o-ran-sc.nonrtric.plt</groupId>
     <artifactId>dmaapadapter</artifactId>
-    <version>1.1.0-SNAPSHOT</version>
+    <version>1.1.1-SNAPSHOT</version>
     <licenses>
         <license>
             <name>The Apache Software License, Version 2.0</name>
@@ -49,7 +49,7 @@
         <java.version>11</java.version>
         <springfox.version>3.0.0</springfox.version>
         <gson.version>2.9.0</gson.version>
-        <swagger.version>2.1.6</swagger.version>
+        <swagger.version>2.2.1</swagger.version>
         <json.version>20211205</json.version>
         <maven-compiler-plugin.version>3.8.0</maven-compiler-plugin.version>
         <formatter-maven-plugin.version>2.12.2</formatter-maven-plugin.version>
         <exec.skip>true</exec.skip>
     </properties>
     <dependencies>
-        <dependency>
-            <groupId>org.springdoc</groupId>
-            <artifactId>springdoc-openapi-ui</artifactId>
-            <version>1.6.3</version>
-        </dependency>
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-web</artifactId>
         <dependency>
             <groupId>com.google.code.gson</groupId>
             <artifactId>gson</artifactId>
-            <version>${gson.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.json</groupId>
-            <artifactId>json</artifactId>
-            <version>${json.version}</version>
         </dependency>
         <dependency>
             <groupId>org.projectlombok</groupId>
         </dependency>
         <!-- TEST -->
         <!-- https://mvnrepository.com/artifact/com.github.erosb/everit-json-schema -->
+        <dependency>
+            <groupId>org.springdoc</groupId>
+            <artifactId>springdoc-openapi-ui</artifactId>
+            <version>1.6.3</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>com.github.erosb</groupId>
             <artifactId>everit-json-schema</artifactId>
             <artifactId>mockito-core</artifactId>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>com.squareup.okhttp3</groupId>
-            <artifactId>mockwebserver</artifactId>
-            <scope>test</scope>
-        </dependency>
         <dependency>
             <groupId>io.projectreactor.kafka</groupId>
             <artifactId>reactor-kafka</artifactId>
-            <version>1.3.9</version>
+            <version>1.3.12</version>
         </dependency>
         <dependency>
             <groupId>com.google.guava</groupId>
index 83c0c42..85a18a9 100644 (file)
@@ -47,7 +47,7 @@ public class SecurityContext {
     @Setter
     private Path authTokenFilePath;
 
-    public SecurityContext(@Value("${app.auth-token-file:\"\"}") String authTokenFilename) {
+    public SecurityContext(@Value("${app.auth-token-file:}") String authTokenFilename) {
         if (!authTokenFilename.isEmpty()) {
             this.authTokenFilePath = Path.of(authTokenFilename);
         }
@@ -63,7 +63,7 @@ public class SecurityContext {
         }
         try {
             long lastModified = authTokenFilePath.toFile().lastModified();
-            if (lastModified != this.tokenTimestamp) {
+            if (tokenTimestamp == 0 || lastModified != this.tokenTimestamp) {
                 this.authToken = Files.readString(authTokenFilePath);
                 this.tokenTimestamp = lastModified;
             }
index e84889d..8c8e995 100644 (file)
@@ -69,7 +69,7 @@ public class ApplicationConfig {
     @Value("${app.webclient.trust-store}")
     private String sslTrustStore = "";
 
-    @Value("${app.webclient.http.proxy-host:\"\"}")
+    @Value("${app.webclient.http.proxy-host:}")
     private String httpProxyHost = "";
 
     @Value("${app.webclient.http.proxy-port:0}")
@@ -96,6 +96,10 @@ public class ApplicationConfig {
     @Value("${app.kafka.bootstrap-servers:}")
     private String kafkaBootStrapServers;
 
+    @Getter
+    @Value("${app.pm-files-path:}")
+    private String pmFilesPath;
+
     private WebClientConfig webClientConfig = null;
 
     public WebClientConfig getWebClientConfig() {
index 39f62fb..6ce5473 100644 (file)
@@ -35,6 +35,7 @@ import reactor.core.publisher.Mono;
 
 public class ErrorResponse {
     private static Gson gson = new GsonBuilder() //
+            .disableHtmlEscaping() //
             .create(); //
 
     // Returned as body for all failed REST calls
index 32ecd73..4967626 100644 (file)
@@ -33,6 +33,7 @@ import io.swagger.v3.oas.annotations.tags.Tag;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 
 import org.oran.dmaapadapter.exceptions.ServiceException;
 import org.oran.dmaapadapter.r1.ProducerJobInfo;
@@ -61,7 +62,10 @@ public class ProducerCallbacksController {
     public static final String API_DESCRIPTION = "";
     public static final String JOB_URL = "/generic_dataproducer/info_job";
     public static final String SUPERVISION_URL = "/generic_dataproducer/health_check";
-    private static Gson gson = new GsonBuilder().create();
+
+    public static final String STATISTICS_URL = "/statistics";
+
+    private static Gson gson = new GsonBuilder().disableHtmlEscaping().create();
     private final Jobs jobs;
     private final InfoTypes types;
 
@@ -145,4 +149,31 @@ public class ProducerCallbacksController {
         return new ResponseEntity<>(HttpStatus.OK);
     }
 
+    @Schema(name = "statistics_info", description = "Statistics information")
+    public class Statistics {
+
+        @Schema(description = "Statistics per job")
+        public final Collection<Job.Statistics> jobStatistics;
+
+        public Statistics(Collection<Job.Statistics> stats) {
+            this.jobStatistics = stats;
+        }
+
+    }
+
+    @GetMapping(path = STATISTICS_URL, produces = MediaType.APPLICATION_JSON_VALUE)
+    @Operation(summary = "Returns statistics", description = "")
+    @ApiResponses(value = { //
+            @ApiResponse(responseCode = "200", description = "OK", //
+                    content = @Content(schema = @Schema(implementation = Statistics.class))) //
+    })
+    public ResponseEntity<Object> getStatistics() {
+        List<Job.Statistics> res = new ArrayList<>();
+        for (Job job : this.jobs.getAll()) {
+            res.add(job.getStatistics());
+        }
+
+        return new ResponseEntity<>(gson.toJson(new Statistics(res)), HttpStatus.OK);
+    }
+
 }
  * ========================LICENSE_END===================================
  */
 
-package org.oran.dmaapadapter.repository.filters;
+package org.oran.dmaapadapter.filter;
+
+import lombok.ToString;
+
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
 
 public interface Filter {
-    public String filter(String data);
 
+    public enum Type {
+        REGEXP, JSLT, JSON_PATH, PM_DATA, NONE
+    }
+
+    @ToString
+    public static class FilteredData {
+        public final String key;
+        public final String value;
+        private static final FilteredData emptyData = new FilteredData("", "");
+
+        public boolean isEmpty() {
+            return value.isEmpty() && key.isEmpty();
+        }
+
+        public FilteredData(String key, String value) {
+            this.key = key;
+            this.value = value;
+        }
+
+        public static FilteredData empty() {
+            return emptyData;
+        }
+    }
+
+    public FilteredData filter(DataFromTopic data);
 }
diff --git a/src/main/java/org/oran/dmaapadapter/filter/FilterFactory.java b/src/main/java/org/oran/dmaapadapter/filter/FilterFactory.java
new file mode 100644 (file)
index 0000000..8d51727
--- /dev/null
@@ -0,0 +1,59 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2022 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.filter;
+
+import com.google.gson.GsonBuilder;
+
+import java.lang.invoke.MethodHandles;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FilterFactory {
+    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+    private static com.google.gson.Gson gson = new GsonBuilder().disableHtmlEscaping().create();
+
+    private FilterFactory() {}
+
+    public static Filter create(Object filter, Filter.Type type) {
+        switch (type) {
+            case PM_DATA:
+                return new PmReportFilter(createPmFilterData(filter));
+            case REGEXP:
+                return new RegexpFilter(filter.toString());
+            case JSLT:
+                return new JsltFilter(filter.toString());
+            case JSON_PATH:
+                return new JsonPathFilter(filter.toString());
+            case NONE:
+                return null;
+            default:
+                logger.error("Not handeled filter type: {}", type);
+                return null;
+        }
+    }
+
+    private static PmReportFilter.FilterData createPmFilterData(Object filter) {
+        String str = gson.toJson(filter);
+        return gson.fromJson(str, PmReportFilter.FilterData.class);
+    }
+
+}
@@ -18,7 +18,7 @@
  * ========================LICENSE_END===================================
  */
 
-package org.oran.dmaapadapter.repository.filters;
+package org.oran.dmaapadapter.filter;
 
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParser;
@@ -28,10 +28,11 @@ import com.fasterxml.jackson.databind.node.NullNode;
 import com.schibsted.spt.data.jslt.Expression;
 import com.schibsted.spt.data.jslt.Parser;
 
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class JsltFilter implements Filter {
+class JsltFilter implements Filter {
 
     private Expression expression;
     private final ObjectMapper mapper = new ObjectMapper();
@@ -46,22 +47,22 @@ public class JsltFilter implements Filter {
     }
 
     @Override
-    public String filter(String jsonString) {
+    public FilteredData filter(DataFromTopic data) {
         if (expression == null) {
-            return jsonString;
+            return new FilteredData(data.key, data.value);
         }
         try {
             JsonFactory factory = mapper.getFactory();
-            JsonParser parser = factory.createParser(jsonString);
+            JsonParser parser = factory.createParser(data.value);
             JsonNode actualObj = mapper.readTree(parser);
 
             JsonNode filteredNode = expression.apply(actualObj);
             if (filteredNode == NullNode.instance) {
-                return "";
+                return FilteredData.empty();
             }
-            return mapper.writeValueAsString(filteredNode);
+            return new FilteredData(data.key, mapper.writeValueAsString(filteredNode));
         } catch (Exception e) {
-            return "";
+            return FilteredData.empty();
         }
     }
 
  * ========================LICENSE_END===================================
  */
 
-package org.oran.dmaapadapter.repository.filters;
+package org.oran.dmaapadapter.filter;
 
 import com.jayway.jsonpath.JsonPath;
 
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class JsonPathFilter implements Filter {
+class JsonPathFilter implements Filter {
 
     private String expression;
     private static final Logger logger = LoggerFactory.getLogger(JsonPathFilter.class);
-    com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
+    com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
 
     public JsonPathFilter(String exp) {
         try {
@@ -40,12 +41,12 @@ public class JsonPathFilter implements Filter {
     }
 
     @Override
-    public String filter(String jsonString) {
+    public FilteredData filter(DataFromTopic data) {
         try {
-            Object o = JsonPath.parse(jsonString).read(this.expression, Object.class);
-            return o == null ? "" : gson.toJson(o);
+            Object o = JsonPath.parse(data.value).read(this.expression, Object.class);
+            return o == null ? FilteredData.empty() : new FilteredData(data.key, gson.toJson(o));
         } catch (Exception e) {
-            return "";
+            return FilteredData.empty();
         }
 
     }
  * ========================LICENSE_END===================================
  */
 
-package org.oran.dmaapadapter.repository.filters;
+package org.oran.dmaapadapter.filter;
+
+import com.google.gson.annotations.Expose;
 
 import java.util.ArrayList;
 import java.util.Collection;
 
 public class PmReport {
 
+    @Expose
     Event event = new Event();
 
     public static class CommonEventHeader {
+        @Expose
         String domain;
+
+        @Expose
         String eventId;
+
+        @Expose
         int sequence;
+
+        @Expose
         String eventName;
+
+        @Expose
         String sourceName;
+
+        @Expose
         String reportingEntityName;
+
+        @Expose
         String priority;
+
+        @Expose
         long startEpochMicrosec;
+
+        @Expose
         long lastEpochMicrosec;
+
+        @Expose
         String version;
+
+        @Expose
         String vesEventListenerVersion;
+
+        @Expose
         String timeZoneOffset;
     }
 
     public static class MeasInfoId {
-        String sMeasInfoId;
+        @Expose
+        String sMeasInfoId = "";
     }
 
     public static class MeasTypes {
@@ -54,17 +81,26 @@ public class PmReport {
             return sMeasTypesList.get(pValue - 1);
         }
 
+        @Expose
         protected ArrayList<String> sMeasTypesList = new ArrayList<>();
     }
 
     public static class MeasResult {
+        @Expose
         int p;
-        String sValue;
+
+        @Expose
+        String sValue = "";
     }
 
     public static class MeasValuesList {
+        @Expose
         String measObjInstId;
+
+        @Expose
         String suspectFlag;
+
+        @Expose
         Collection<MeasResult> measResults = new ArrayList<>();
 
         public MeasValuesList shallowClone() {
@@ -76,8 +112,13 @@ public class PmReport {
     }
 
     public static class MeasInfoList {
+        @Expose
         MeasInfoId measInfoId;
+
+        @Expose
         MeasTypes measTypes;
+
+        @Expose
         Collection<MeasValuesList> measValuesList = new ArrayList<>();
 
         public MeasInfoList shallowClone() {
@@ -89,20 +130,35 @@ public class PmReport {
     }
 
     public static class MeasDataCollection {
+        @Expose
         int granularityPeriod;
+
+        @Expose
         String measuredEntityUserName;
+
+        @Expose
         String measuredEntityDn;
+
+        @Expose
         String measuredEntitySoftwareVersion;
+
+        @Expose
         Collection<MeasInfoList> measInfoList = new ArrayList<>();
     }
 
     public static class Perf3gppFields {
+        @Expose
         String perf3gppFieldsVersion;
+
+        @Expose
         MeasDataCollection measDataCollection;
     }
 
     public static class Event {
+        @Expose
         CommonEventHeader commonEventHeader;
+
+        @Expose
         Perf3gppFields perf3gppFields;
     }
 
  * ========================LICENSE_END===================================
  */
 
-package org.oran.dmaapadapter.repository.filters;
-
+package org.oran.dmaapadapter.filter;
 
+import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 
 import lombok.Getter;
 
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.thymeleaf.util.StringUtils;
 
 public class PmReportFilter implements Filter {
+    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+    private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder() //
+            .disableHtmlEscaping() //
+            .excludeFieldsWithoutExposeAnnotation() //
+            .create();
+
+    // excludeFieldsWithoutExposeAnnotation is not needed when parsing and this is a
+    // bit quicker
+    private static com.google.gson.Gson gsonParse = new com.google.gson.GsonBuilder() //
+            .disableHtmlEscaping() //
+            .create();
 
-    private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
     private final FilterData filterData;
 
     @Getter
     public static class FilterData {
-        Collection<String> sourceNames = new ArrayList<>();
-        Collection<String> measObjInstIds = new ArrayList<>();
-        Collection<String> measTypes = new ArrayList<>();
-        Collection<String> measuredEntityDns = new ArrayList<>();
+        final Collection<String> sourceNames = new HashSet<>();
+        final Collection<String> measObjInstIds = new ArrayList<>();
+        final Collection<String> measTypes = new HashSet<>();
+        final Collection<String> measuredEntityDns = new ArrayList<>();
+        final Collection<String> measObjClass = new HashSet<>();
     }
 
     private static class MeasTypesIndexed extends PmReport.MeasTypes {
+
         private Map<String, Integer> map = new HashMap<>();
 
         public int addP(String measTypeName) {
@@ -51,9 +68,9 @@ public class PmReportFilter implements Filter {
             if (p != null) {
                 return p;
             } else {
-                this.sMeasTypesList.add(measTypeName);
-                this.map.put(measTypeName, this.sMeasTypesList.size());
-                return this.sMeasTypesList.size();
+                sMeasTypesList.add(measTypeName);
+                this.map.put(measTypeName, sMeasTypesList.size());
+                return sMeasTypesList.size();
             }
         }
     }
@@ -63,13 +80,32 @@ public class PmReportFilter implements Filter {
     }
 
     @Override
-    public String filter(String data) {
-        PmReport report = gson.fromJson(data, PmReport.class);
-        if (!filter(report, this.filterData)) {
-            return "";
+    public FilteredData filter(DataFromTopic data) {
+        try {
+            PmReport report = createPmReport(data);
+            if (report.event.perf3gppFields == null) {
+                logger.warn("Received PM report with no perf3gppFields, ignored. {}", data);
+                return FilteredData.empty();
+            }
+
+            if (!filter(report, this.filterData)) {
+                return FilteredData.empty();
+            }
+            return new FilteredData(data.key, gson.toJson(report));
+        } catch (Exception e) {
+            logger.warn("Could not parse PM data. {}, reason: {}", data, e.getMessage());
+            return FilteredData.empty();
         }
-        return gson.toJson(report);
+    }
 
+    @SuppressWarnings("java:S2445") // "data" is a method parameter, and should not be used for synchronization.
+    private PmReport createPmReport(DataFromTopic data) {
+        synchronized (data) {
+            if (data.getCachedPmReport() == null) {
+                data.setCachedPmReport(gsonParse.fromJson(data.value, PmReport.class));
+            }
+            return data.getCachedPmReport();
+        }
     }
 
     /**
@@ -114,12 +150,38 @@ public class PmReportFilter implements Filter {
         return newMeasResults;
     }
 
+    private boolean isMeasInstIdMatch(String measObjInstId, FilterData filter) {
+        return filter.measObjInstIds.isEmpty() || isContainedInAny(measObjInstId, filter.measObjInstIds);
+    }
+
+    private String managedObjectClass(String distinguishedName) {
+        int lastRdn = distinguishedName.lastIndexOf(",");
+        if (lastRdn == -1) {
+            return "";
+        }
+        int lastEqualChar = distinguishedName.indexOf("=", lastRdn);
+        if (lastEqualChar == -1) {
+            return "";
+        }
+        return distinguishedName.substring(lastRdn + 1, lastEqualChar);
+    }
+
+    private boolean isMeasInstClassMatch(String measObjInstId, FilterData filter) {
+        if (filter.measObjClass.isEmpty()) {
+            return true;
+        }
+
+        String measObjClass = managedObjectClass(measObjInstId);
+        return filter.measObjClass.contains(measObjClass);
+    }
+
     private PmReport.MeasValuesList createMeasValuesList(PmReport.MeasValuesList oldMeasValues,
             PmReport.MeasTypes measTypes, FilterData filter) {
 
         PmReport.MeasValuesList newMeasValuesList = oldMeasValues.shallowClone();
 
-        if (isContainedInAny(oldMeasValues.measObjInstId, filter.measObjInstIds) || filter.measObjInstIds.isEmpty()) {
+        if (isMeasInstIdMatch(oldMeasValues.measObjInstId, filter)
+                && isMeasInstClassMatch(oldMeasValues.measObjInstId, filter)) {
             newMeasValuesList.measResults = createMeasResults(oldMeasValues.measResults, measTypes, filter);
         }
         return newMeasValuesList;
  * ========================LICENSE_END===================================
  */
 
-package org.oran.dmaapadapter.repository.filters;
+package org.oran.dmaapadapter.filter;
 
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class RegexpFilter implements Filter {
+class RegexpFilter implements Filter {
     private static final Logger logger = LoggerFactory.getLogger(RegexpFilter.class);
     private Pattern regexp;
 
@@ -39,16 +40,16 @@ public class RegexpFilter implements Filter {
     }
 
     @Override
-    public String filter(String data) {
+    public FilteredData filter(DataFromTopic data) {
         if (regexp == null) {
-            return data;
+            return new FilteredData(data.key, data.value);
         }
-        Matcher matcher = regexp.matcher(data);
+        Matcher matcher = regexp.matcher(data.value);
         boolean match = matcher.find();
         if (match) {
-            return data;
+            return new FilteredData(data.key, data.value);
         } else {
-            return "";
+            return FilteredData.empty();
         }
     }
 
index d7f89be..ce2e1a1 100644 (file)
 
 package org.oran.dmaapadapter.repository;
 
+import lombok.Builder;
 import lombok.Getter;
 import lombok.ToString;
 
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.springframework.util.StringUtils;
 
 @ToString
+@Builder
 public class InfoType {
 
     @Getter
-    private final String id;
+    private String id;
 
     @Getter
-    private final String dmaapTopicUrl;
+    private String dmaapTopicUrl;
 
     @Getter
-    private final boolean useHttpProxy;
+    @Builder.Default
+    private boolean useHttpProxy = false;
 
     @Getter
-    private final String kafkaInputTopic;
+    private String kafkaInputTopic;
 
-    private final String dataType;
+    private String dataType;
 
     @Getter
+    @Builder.Default
     private boolean isJson = false;
 
-    public InfoType(String id, String dmaapTopicUrl, boolean useHttpProxy, String kafkaInputTopic, String dataType,
-            boolean isJson) {
-        this.id = id;
-        this.dmaapTopicUrl = dmaapTopicUrl;
-        this.useHttpProxy = useHttpProxy;
-        this.kafkaInputTopic = kafkaInputTopic;
-        this.dataType = dataType;
-        this.isJson = isJson;
-    }
-
     public boolean isKafkaTopicDefined() {
         return StringUtils.hasLength(kafkaInputTopic);
     }
@@ -76,6 +71,14 @@ public class InfoType {
             return DataType.PM_DATA;
         }
         return DataType.OTHER;
+    }
+
+    public String getKafkaGroupId() {
+        return this.kafkaInputTopic == null ? null : "osc-dmaap-adapter-" + getId();
+    }
+
+    public String getKafkaClientId(ApplicationConfig appConfig) {
+        return this.kafkaInputTopic == null ? null : getId() + "_" + appConfig.getSelfUrl();
 
     }
 }
index 90827da..acb9136 100644 (file)
 
 package org.oran.dmaapadapter.repository;
 
-import com.google.gson.GsonBuilder;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import io.swagger.v3.oas.annotations.media.Schema;
 
 import java.lang.invoke.MethodHandles;
 import java.time.Duration;
 
+import lombok.Builder;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
 
 import org.oran.dmaapadapter.clients.AsyncRestClient;
-import org.oran.dmaapadapter.repository.filters.Filter;
-import org.oran.dmaapadapter.repository.filters.JsltFilter;
-import org.oran.dmaapadapter.repository.filters.JsonPathFilter;
-import org.oran.dmaapadapter.repository.filters.PmReportFilter;
-import org.oran.dmaapadapter.repository.filters.RegexpFilter;
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
+import org.oran.dmaapadapter.filter.Filter;
+import org.oran.dmaapadapter.filter.FilterFactory;
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @ToString
 public class Job {
-
-    private static com.google.gson.Gson gson = new GsonBuilder().create();
     private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+    @Builder
+    @Schema(name = "job_statistics", description = "Statistics information for one job")
+    public static class Statistics {
+
+        // @Schema(name = "jobId", description = "jobId", required = true)
+        // @SerializedName("jobId")
+        @JsonProperty(value = "jobId", required = true)
+        String jobId;
+
+        @JsonProperty(value = "typeId", required = true)
+        String typeId;
+
+        @JsonProperty(value = "inputTopic", required = false)
+        String inputTopic;
+
+        @JsonProperty(value = "outputTopic", required = false)
+        String outputTopic;
+
+        @JsonProperty(value = "groupId", required = false)
+        String groupId;
+
+        @JsonProperty(value = "clientId", required = false)
+        String clientId;
+
+        @JsonProperty(value = "noOfReceivedObjects", required = true)
+        @Builder.Default
+        int noOfReceivedObjects = 0;
+
+        @JsonProperty(value = "noOfReceivedBytes", required = true)
+        @Builder.Default
+        int noOfReceivedBytes = 0;
+
+        @JsonProperty(value = "noOfSentObjects", required = true)
+        @Builder.Default
+        int noOfSentObjects = 0;
+
+        @JsonProperty(value = "noOfSentBytes", required = true)
+        @Builder.Default
+        int noOfSentBytes = 0;
+
+        public void received(String str) {
+            noOfReceivedBytes += str.length();
+            noOfReceivedObjects += 1;
+        }
+
+        public void filtered(String str) {
+            noOfSentBytes += str.length();
+            noOfSentObjects += 1;
+        }
+
+    }
+
     public static class Parameters {
         public static final String REGEXP_TYPE = "regexp";
         public static final String PM_FILTER_TYPE = "pmdata";
@@ -52,6 +105,7 @@ public class Job {
 
         @Setter
         private String filterType = REGEXP_TYPE;
+        @Getter
         private Object filter;
         @Getter
         private BufferTimeout bufferTimeout;
@@ -76,33 +130,20 @@ public class Job {
             return maxConcurrency == null || maxConcurrency == 0 ? 1 : maxConcurrency;
         }
 
-        public String getFilterAsString() {
-            return this.filter.toString();
-        }
-
-        public PmReportFilter.FilterData getPmFilter() {
-            String str = gson.toJson(this.filter);
-            return gson.fromJson(str, PmReportFilter.FilterData.class);
-        }
-
-        public enum FilterType {
-            REGEXP, JSLT, JSON_PATH, PM_DATA, NONE
-        }
-
-        public FilterType getFilterType() {
+        public Filter.Type getFilterType() {
             if (filter == null || filterType == null) {
-                return FilterType.NONE;
+                return Filter.Type.NONE;
             } else if (filterType.equalsIgnoreCase(JSLT_FILTER_TYPE)) {
-                return FilterType.JSLT;
+                return Filter.Type.JSLT;
             } else if (filterType.equalsIgnoreCase(JSON_PATH_FILTER_TYPE)) {
-                return FilterType.JSON_PATH;
+                return Filter.Type.JSON_PATH;
             } else if (filterType.equalsIgnoreCase(REGEXP_TYPE)) {
-                return FilterType.REGEXP;
+                return Filter.Type.REGEXP;
             } else if (filterType.equalsIgnoreCase(PM_FILTER_TYPE)) {
-                return FilterType.PM_DATA;
+                return Filter.Type.PM_DATA;
             } else {
                 logger.warn("Unsupported filter type: {}", this.filterType);
-                return FilterType.NONE;
+                return Filter.Type.NONE;
             }
         }
     }
@@ -145,48 +186,39 @@ public class Job {
 
     private final Filter filter;
 
+    @Getter
+    private final Statistics statistics;
+
     @Getter
     private final AsyncRestClient consumerRestClient;
 
     public Job(String id, String callbackUrl, InfoType type, String owner, String lastUpdated, Parameters parameters,
-            AsyncRestClient consumerRestClient) {
+            AsyncRestClient consumerRestClient, ApplicationConfig appConfig) {
         this.id = id;
         this.callbackUrl = callbackUrl;
         this.type = type;
         this.owner = owner;
         this.lastUpdated = lastUpdated;
         this.parameters = parameters;
-        filter = createFilter(parameters);
+        filter = parameters.filter == null ? null
+                : FilterFactory.create(parameters.getFilter(), parameters.getFilterType());
         this.consumerRestClient = consumerRestClient;
-    }
-
-    private static Filter createFilter(Parameters parameters) {
 
-        if (parameters.filter == null) {
-            return null;
-        }
+        statistics = Statistics.builder() //
+                .groupId(type.getKafkaGroupId()) //
+                .inputTopic(type.getKafkaInputTopic()) //
+                .jobId(id) //
+                .outputTopic(parameters.getKafkaOutputTopic()) //
+                .typeId(type.getId()) //
+                .clientId(type.getKafkaClientId(appConfig)) //
+                .build();
 
-        switch (parameters.getFilterType()) {
-            case PM_DATA:
-                return new PmReportFilter(parameters.getPmFilter());
-            case REGEXP:
-                return new RegexpFilter(parameters.getFilterAsString());
-            case JSLT:
-                return new JsltFilter(parameters.getFilterAsString());
-            case JSON_PATH:
-                return new JsonPathFilter(parameters.getFilterAsString());
-            case NONE:
-                return null;
-            default:
-                logger.error("Not handeled filter type: {}", parameters.getFilterType());
-                return null;
-        }
     }
 
-    public String filter(String data) {
+    public Filter.FilteredData filter(DataFromTopic data) {
         if (filter == null) {
             logger.debug("No filter used");
-            return data;
+            return new Filter.FilteredData(data.key, data.value);
         }
         return filter.filter(data);
     }
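
A rough sketch of how the new per-job Statistics counters are driven as data passes through a job (payloads invented): received() counts everything read from the topic, filtered() only what survives the filter and is sent onwards.

    Job.Statistics stats = job.getStatistics();
    stats.received("{\"event\":1}"); // noOfReceivedObjects++, noOfReceivedBytes += length
    stats.filtered("{}");            // noOfSentObjects++, noOfSentBytes += length
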
index 825673a..2c6b329 100644 (file)
@@ -55,9 +55,12 @@ public class Jobs {
     private MultiMap<Job> jobsByType = new MultiMap<>();
     private final AsyncRestClientFactory restclientFactory;
     private final List<Observer> observers = new ArrayList<>();
+    private final ApplicationConfig appConfig;
 
-    public Jobs(@Autowired ApplicationConfig applicationConfig, @Autowired SecurityContext securityContext) {
-        restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig(), securityContext);
+    public Jobs(@Autowired ApplicationConfig appConfig, @Autowired SecurityContext securityContext) {
+        restclientFactory = new AsyncRestClientFactory(appConfig.getWebClientConfig(), securityContext);
+        this.appConfig = appConfig;
     }
 
     public synchronized Job getJob(String id) throws ServiceException {
@@ -81,7 +84,7 @@ public class Jobs {
         AsyncRestClient consumerRestClient = type.isUseHttpProxy() //
                 ? restclientFactory.createRestClientUseHttpProxy(callbackUrl) //
                 : restclientFactory.createRestClientNoHttpProxy(callbackUrl);
-        Job job = new Job(id, callbackUrl, type, owner, lastUpdated, parameters, consumerRestClient);
+        Job job = new Job(id, callbackUrl, type, owner, lastUpdated, parameters, consumerRestClient, this.appConfig);
         this.put(job);
         synchronized (observers) {
             this.observers.forEach(obs -> obs.onJobbAdded(job));
index 3aa97fe..4f20c35 100644 (file)
@@ -30,11 +30,8 @@ import org.oran.dmaapadapter.repository.InfoType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.core.publisher.Sinks;
-import reactor.core.publisher.Sinks.Many;
 
 /**
  * The class fetches incoming requests from DMAAP and sends them further to the
@@ -47,9 +44,8 @@ public class DmaapTopicListener implements TopicListener {
     private final AsyncRestClient dmaapRestClient;
     private final ApplicationConfig applicationConfig;
     private final InfoType type;
-    private final com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
-    private Many<Output> output;
-    private Disposable topicReceiverTask;
+    private final com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
+    private Flux<DataFromTopic> dataFromDmaap;
 
     public DmaapTopicListener(ApplicationConfig applicationConfig, InfoType type, SecurityContext securityContext) {
         AsyncRestClientFactory restclientFactory =
@@ -60,42 +56,22 @@ public class DmaapTopicListener implements TopicListener {
     }
 
     @Override
-    public Many<Output> getOutput() {
-        return this.output;
-    }
-
-    @Override
-    public void start() {
-        stop();
-
-        final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 1024 * 10;
-        this.output = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE);
-
-        topicReceiverTask = Flux.range(0, Integer.MAX_VALUE) //
-                .flatMap(notUsed -> getFromMessageRouter(getDmaapUrl()), 1) //
-                .doOnNext(this::onReceivedData) //
-                .subscribe(//
-                        null, //
-                        throwable -> logger.error("DmaapMessageConsumer error: {}", throwable.getMessage()), //
-                        this::onComplete); //
-    }
-
-    @Override
-    public void stop() {
-        if (topicReceiverTask != null) {
-            topicReceiverTask.dispose();
-            topicReceiverTask = null;
+    public Flux<DataFromTopic> getFlux() {
+        if (this.dataFromDmaap == null) {
+            this.dataFromDmaap = startFetchFromDmaap();
         }
+        return this.dataFromDmaap;
     }
 
-    private void onComplete() {
-        logger.warn("DmaapMessageConsumer completed {}", type.getId());
-        start();
-    }
-
-    private void onReceivedData(String input) {
-        logger.debug("Received from DMAAP topic: {} :{}", this.type.getDmaapTopicUrl(), input);
-        output.emitNext(new Output("", input), Sinks.EmitFailureHandler.FAIL_FAST);
+    private Flux<DataFromTopic> startFetchFromDmaap() {
+        return Flux.range(0, Integer.MAX_VALUE) //
+                .flatMap(notUsed -> getFromMessageRouter(getDmaapUrl()), 1) //
+                .doOnNext(input -> logger.debug("Received from DMaaP: {} :{}", this.type.getDmaapTopicUrl(), input)) //
+                .doOnError(t -> logger.error("DmaapTopicListener error: {}", t.getMessage())) //
+                .doFinally(sig -> logger.error("DmaapTopicListener stopped, reason: {}", sig)) //
+                .publish() //
+                .autoConnect() //
+                .map(input -> new DataFromTopic("", input)); //
     }
 
     private String getDmaapUrl() {
@@ -113,7 +89,7 @@ public class DmaapTopicListener implements TopicListener {
         return dmaapRestClient.get(topicUrl) //
                 .filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away.
                 .flatMapMany(this::splitJsonArray) //
-                .doOnNext(message -> logger.debug("Message from DMAAP topic: {} : {}", topicUrl, message)) //
+                .doOnNext(message -> logger.debug("Message from DMaaP topic: {} : {}", topicUrl, message)) //
                 .onErrorResume(this::handleDmaapErrorResponse); //
     }
 
@@ -20,6 +20,7 @@
 
 package org.oran.dmaapadapter.tasks;
 
+import org.oran.dmaapadapter.filter.Filter;
 import org.oran.dmaapadapter.repository.Job;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,15 +32,15 @@ import reactor.core.publisher.Mono;
  * owner via REST calls.
  */
 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
-public class HttpDataConsumer extends DataConsumer {
-    private static final Logger logger = LoggerFactory.getLogger(HttpDataConsumer.class);
+public class HttpJobDataDistributor extends JobDataDistributor {
+    private static final Logger logger = LoggerFactory.getLogger(HttpJobDataDistributor.class);
 
-    public HttpDataConsumer(Job job) {
+    public HttpJobDataDistributor(Job job) {
         super(job);
     }
 
     @Override
-    protected Mono<String> sendToClient(TopicListener.Output output) {
+    protected Mono<String> sendToClient(Filter.FilteredData output) {
         Job job = this.getJob();
         logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), output);
         MediaType contentType = job.isBuffered() || job.getType().isJson() ? MediaType.APPLICATION_JSON : null;
@@ -22,6 +22,7 @@ package org.oran.dmaapadapter.tasks;
 
 import lombok.Getter;
 
+import org.oran.dmaapadapter.filter.Filter;
 import org.oran.dmaapadapter.repository.Job;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,8 +37,8 @@ import reactor.core.publisher.Mono;
  * owner via REST calls.
  */
 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
-public abstract class DataConsumer {
-    private static final Logger logger = LoggerFactory.getLogger(DataConsumer.class);
+public abstract class JobDataDistributor {
+    private static final Logger logger = LoggerFactory.getLogger(JobDataDistributor.class);
     @Getter
     private final Job job;
     private Disposable subscription;
@@ -69,17 +70,17 @@ public abstract class DataConsumer {
         }
     }
 
-    protected DataConsumer(Job job) {
+    protected JobDataDistributor(Job job) {
         this.job = job;
     }
 
-    public synchronized void start(Flux<TopicListener.Output> input) {
+    public synchronized void start(Flux<TopicListener.DataFromTopic> input) {
         stop();
         this.errorStats.resetIrrecoverableErrors();
-        this.subscription = handleReceivedMessage(input, job) //
+        this.subscription = filterAndBuffer(input, this.job) //
                 .flatMap(this::sendToClient, job.getParameters().getMaxConcurrency()) //
                 .onErrorResume(this::handleError) //
-                .subscribe(this::handleConsumerSentOk, //
+                .subscribe(this::handleSentOk, //
                         this::handleExceptionInStream, //
                         () -> logger.warn("HttpDataConsumer stopped jobId: {}", job.getId()));
     }
@@ -89,7 +90,7 @@ public abstract class DataConsumer {
         stop();
     }
 
-    protected abstract Mono<String> sendToClient(TopicListener.Output output);
+    protected abstract Mono<String> sendToClient(Filter.FilteredData output);
 
     public synchronized void stop() {
         if (this.subscription != null) {
@@ -102,19 +103,21 @@ public abstract class DataConsumer {
         return this.subscription != null;
     }
 
-    private Flux<TopicListener.Output> handleReceivedMessage(Flux<TopicListener.Output> inputFlux, Job job) {
-        Flux<TopicListener.Output> result =
-                inputFlux.map(input -> new TopicListener.Output(input.key, job.filter(input.value))) //
-                        .filter(t -> !t.value.isEmpty()); //
+    private Flux<Filter.FilteredData> filterAndBuffer(Flux<TopicListener.DataFromTopic> inputFlux, Job job) {
+        Flux<Filter.FilteredData> filtered = //
+                inputFlux.doOnNext(data -> job.getStatistics().received(data.value)) //
+                        .map(job::filter) //
+                        .filter(f -> !f.isEmpty()) //
+                        .doOnNext(f -> job.getStatistics().filtered(f.value)); //
 
         if (job.isBuffered()) {
-            result = result.map(input -> quoteNonJson(input.value, job)) //
+            filtered = filtered.map(input -> quoteNonJson(input.value, job)) //
                     .bufferTimeout( //
                             job.getParameters().getBufferTimeout().getMaxSize(), //
                             job.getParameters().getBufferTimeout().getMaxTime()) //
-                    .map(buffered -> new TopicListener.Output("", buffered.toString()));
+                    .map(buffered -> new Filter.FilteredData("", buffered.toString()));
         }
-        return result;
+        return filtered;
     }
 
     private String quoteNonJson(String str, Job job) {
@@ -136,7 +139,7 @@ public abstract class DataConsumer {
         }
     }
 
-    private void handleConsumerSentOk(String data) {
+    private void handleSentOk(String data) {
         this.errorStats.handleOkFromConsumer();
     }
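
The buffering branch of filterAndBuffer above relies on Reactor's bufferTimeout, which emits a buffer when either the size limit or the time limit is reached, whichever comes first. A stand-alone sketch with invented limits:

    Flux.interval(Duration.ofMillis(100))
            .map(i -> "item" + i)
            .bufferTimeout(10, Duration.ofMillis(400)) // cut on 10 items OR 400 ms
            .map(Object::toString)                     // e.g. "[item0, item1, item2, item3]"
            .subscribe(System.out::println);
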
 
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
+import org.oran.dmaapadapter.filter.Filter;
 import org.oran.dmaapadapter.repository.Job;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,19 +43,19 @@ import reactor.kafka.sender.SenderRecord;
  * owner via REST calls.
  */
 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
-public class KafkaDataConsumer extends DataConsumer {
-    private static final Logger logger = LoggerFactory.getLogger(KafkaDataConsumer.class);
+public class KafkaJobDataDistributor extends JobDataDistributor {
+    private static final Logger logger = LoggerFactory.getLogger(KafkaJobDataDistributor.class);
 
     private KafkaSender<String, String> sender;
     private final ApplicationConfig appConfig;
 
-    public KafkaDataConsumer(Job job, ApplicationConfig appConfig) {
+    public KafkaJobDataDistributor(Job job, ApplicationConfig appConfig) {
         super(job);
         this.appConfig = appConfig;
     }
 
     @Override
-    protected Mono<String> sendToClient(TopicListener.Output data) {
+    protected Mono<String> sendToClient(Filter.FilteredData data) {
         Job job = this.getJob();
 
         logger.debug("Sending data '{}' to Kafka topic: {}", data, this.getJob().getParameters().getKafkaOutputTopic());
@@ -67,7 +68,7 @@ public class KafkaDataConsumer extends DataConsumer {
     }
 
     @Override
-    public synchronized void start(Flux<TopicListener.Output> input) {
+    public synchronized void start(Flux<TopicListener.DataFromTopic> input) {
         super.start(input);
         SenderOptions<String, String> senderOptions = senderOptions(appConfig);
         this.sender = KafkaSender.create(senderOptions);
@@ -87,14 +88,13 @@ public class KafkaDataConsumer extends DataConsumer {
 
         Map<String, Object> props = new HashMap<>();
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-        props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producerx");
         props.put(ProducerConfig.ACKS_CONFIG, "all");
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         return SenderOptions.create(props);
     }
 
-    private SenderRecord<String, String, Integer> senderRecord(TopicListener.Output output, Job infoJob) {
+    private SenderRecord<String, String, Integer> senderRecord(Filter.FilteredData output, Job infoJob) {
         int correlationMetadata = 2;
         String topic = infoJob.getParameters().getKafkaOutputTopic();
         return SenderRecord.create(new ProducerRecord<>(topic, output.key, output.value), correlationMetadata);
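
For orientation, a hedged sketch of the Kafka send path implied by the snippet above: wrap the filtered data in a SenderRecord, hand it to the KafkaSender, and complete with the payload so the pipeline can treat it as sent. This is an assumption about the omitted sendToClient body, not a quote from it:

    private Mono<String> send(Filter.FilteredData data, Job job) {
        return sender.send(Mono.just(senderRecord(data, job))) // Flux<SenderResult<Integer>>
                .doOnError(e -> logger.warn("Kafka send failed: {}", e.getMessage()))
                .then(Mono.just(data.value)); // complete with the payload that was sent
    }
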
index 4a7f269..61b50c6 100644 (file)
 
 package org.oran.dmaapadapter.tasks;
 
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import lombok.Builder;
+import lombok.Getter;
+import lombok.ToString;
+
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.repository.InfoType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import reactor.core.Disposable;
-import reactor.core.publisher.Sinks;
-import reactor.core.publisher.Sinks.Many;
+import reactor.core.publisher.Flux;
 import reactor.kafka.receiver.KafkaReceiver;
 import reactor.kafka.receiver.ReceiverOptions;
 
@@ -47,8 +54,18 @@ public class KafkaTopicListener implements TopicListener {
     private static final Logger logger = LoggerFactory.getLogger(KafkaTopicListener.class);
     private final ApplicationConfig applicationConfig;
     private final InfoType type;
-    private Many<Output> output;
-    private Disposable topicReceiverTask;
+    private Flux<DataFromTopic> dataFromTopic;
+
+    private static Gson gson = new GsonBuilder() //
+            .disableHtmlEscaping() //
+            .create(); //
+
+    @ToString
+    @Builder
+    public static class NewFileEvent {
+        @Getter
+        private String filename;
+    }
 
     public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type) {
         this.applicationConfig = applicationConfig;
@@ -56,50 +73,59 @@ public class KafkaTopicListener implements TopicListener {
     }
 
     @Override
-    public Many<Output> getOutput() {
-        return this.output;
+    public Flux<DataFromTopic> getFlux() {
+        if (this.dataFromTopic == null) {
+            this.dataFromTopic = startReceiveFromTopic(this.type.getKafkaClientId(this.applicationConfig));
+        }
+        return this.dataFromTopic;
     }
 
-    @Override
-    public void start() {
-        stop();
-        final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 1024 * 10;
-        this.output = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE);
+    private Flux<DataFromTopic> startReceiveFromTopic(String clientId) {
         logger.debug("Listening to kafka topic: {} type :{}", this.type.getKafkaInputTopic(), type.getId());
-        topicReceiverTask = KafkaReceiver.create(kafkaInputProperties()) //
-                .receive() //
-                .doOnNext(this::onReceivedData) //
-                .subscribe(null, //
-                        this::onReceivedError, //
-                        () -> logger.warn("KafkaTopicReceiver stopped"));
-    }
 
-    @Override
-    public void stop() {
-        if (topicReceiverTask != null) {
-            topicReceiverTask.dispose();
-            topicReceiverTask = null;
-        }
+        return KafkaReceiver.create(kafkaInputProperties(clientId)) //
+                .receiveAutoAck() //
+                .concatMap(consumerRecord -> consumerRecord) //
+                .doOnNext(input -> logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(),
+                        input.value())) //
+                .doOnError(t -> logger.error("KafkaTopicReceiver error: {}", t.getMessage())) //
+                .doFinally(sig -> logger.error("KafkaTopicReceiver stopped, reason: {}", sig)) //
+                .filter(t -> !t.value().isEmpty() || !t.key().isEmpty()) //
+                .map(input -> new DataFromTopic(input.key(), input.value())) //
+                .map(this::getDataFromFileIfNewPmFileEvent) //
+                .publish() //
+                .autoConnect(1);
     }
 
-    private void onReceivedData(ConsumerRecord<String, String> input) {
-        logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(), input.value());
-        output.emitNext(new Output(input.key(), input.value()), Sinks.EmitFailureHandler.FAIL_FAST);
-    }
+    private DataFromTopic getDataFromFileIfNewPmFileEvent(DataFromTopic data) {
 
-    private void onReceivedError(Throwable t) {
-        logger.error("KafkaTopicReceiver error: {}", t.getMessage());
+        if (!applicationConfig.getPmFilesPath().isEmpty() //
+                && this.type.getDataType() == InfoType.DataType.PM_DATA //
+                && data.value.length() < 1000) {
+            try {
+                NewFileEvent ev = gson.fromJson(data.value, NewFileEvent.class);
+                Path path = Path.of(this.applicationConfig.getPmFilesPath(), ev.getFilename());
+                String pmReportJson = Files.readString(path, Charset.defaultCharset());
+                return new DataFromTopic(data.key, pmReportJson);
+            } catch (Exception e) {
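+                // Not a "new file" event, or the file could not be read: pass the payload through unchanged.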
+                return data;
+            }
+        } else {
+            return data;
+        }
     }
 
-    private ReceiverOptions<String, String> kafkaInputProperties() {
+    private ReceiverOptions<String, String> kafkaInputProperties(String clientId) {
         Map<String, Object> consumerProps = new HashMap<>();
         if (this.applicationConfig.getKafkaBootStrapServers().isEmpty()) {
             logger.error("No kafka boostrap server is setup");
         }
         consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers());
-        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "osc-dmaap-adapter");
+        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, this.type.getKafkaGroupId());
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
+        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
 
         return ReceiverOptions.<String, String>create(consumerProps)
                 .subscription(Collections.singleton(this.type.getKafkaInputTopic()));
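
receiveAutoAck() yields one inner Flux per consumer poll and commits offsets automatically once each batch has been consumed. A minimal usage sketch of reactor-kafka's KafkaReceiver, with receiverOptions assumed to be built as in kafkaInputProperties above:

    KafkaReceiver.create(receiverOptions)
            .receiveAutoAck()          // Flux<Flux<ConsumerRecord<String, String>>>, one inner flux per poll
            .concatMap(batch -> batch) // flatten the batches, preserving order
            .map(rec -> rec.key() + ":" + rec.value())
            .subscribe(System.out::println);
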
index 304eb18..e0f897e 100644 (file)
@@ -64,9 +64,8 @@ public class ProducerRegstrationTask {
     private final AsyncRestClient restClient;
     private final ApplicationConfig applicationConfig;
     private final InfoTypes types;
-    private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
+    private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
 
-    private static final String PRODUCER_ID = "DmaapGenericInfoProducer";
     @Getter
     private boolean isRegisteredInIcs = false;
     private static final int REGISTRATION_SUPERVISION_INTERVAL_MS = 1000 * 10;
@@ -102,10 +101,14 @@ public class ProducerRegstrationTask {
         logger.warn("Registration of producer failed {}", t.getMessage());
     }
 
+    private String producerRegistrationUrl() {
+        final String producerId = this.applicationConfig.getSelfUrl().replace("/", "_");
+        return applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-producers/" + producerId;
+    }
+
     // Returns TRUE if registration is correct
     private Mono<Boolean> checkRegistration() {
-        final String url = applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
-        return restClient.get(url) //
+        return restClient.get(producerRegistrationUrl()) //
                 .flatMap(this::isRegisterredInfoCorrect) //
                 .onErrorResume(t -> Mono.just(Boolean.FALSE));
     }
@@ -126,8 +129,6 @@ public class ProducerRegstrationTask {
 
     private Mono<String> registerTypesAndProducer() {
         final int CONCURRENCY = 20;
-        final String producerUrl =
-                applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
 
         return Flux.fromIterable(this.types.getAll()) //
                 .doOnNext(type -> logger.info("Registering type {}", type.getId())) //
@@ -135,7 +136,7 @@ public class ProducerRegstrationTask {
                         CONCURRENCY) //
                 .collectList() //
                 .doOnNext(type -> logger.info("Registering producer")) //
-                .flatMap(resp -> restClient.put(producerUrl, gson.toJson(producerRegistrationInfo())));
+                .flatMap(resp -> restClient.put(producerRegistrationUrl(), gson.toJson(producerRegistrationInfo())));
     }
 
     private Object typeSpecifcInfoObject() {
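
The producer ID now embeds the instance's own callback URL, so every adapter instance registers under a distinct ID. An illustration with an invented selfUrl:

    // selfUrl = "https://my-adapter:8435" (invented value)
    String producerId = "https://my-adapter:8435".replace("/", "_");
    // producerId = "https:__my-adapter:8435"
    // registration URL = icsBaseUrl + "/data-producer/v1/info-producers/" + producerId
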
index e32cfa5..e26915b 100644 (file)
 
 package org.oran.dmaapadapter.tasks;
 
+import lombok.Getter;
+import lombok.Setter;
 import lombok.ToString;
-import reactor.core.publisher.Sinks.Many;
+
+import org.oran.dmaapadapter.filter.PmReport;
+import reactor.core.publisher.Flux;
 
 public interface TopicListener {
 
     @ToString
-    public static class Output {
+    public static class DataFromTopic {
         public final String key;
         public final String value;
 
-        public Output(String key, String value) {
+        @Getter
+        @Setter
+        private PmReport cachedPmReport;
+
+        public DataFromTopic(String key, String value) {
             this.key = key;
             this.value = value;
         }
     }
 
-    public void start();
-
-    public void stop();
-
-    public Many<Output> getOutput();
+    public Flux<DataFromTopic> getFlux();
 }
index df70b9f..fcc94ee 100644 (file)
@@ -37,7 +37,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.annotation.EnableScheduling;
-import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
@@ -50,12 +49,10 @@ public class TopicListeners {
     private final Map<String, TopicListener> dmaapTopicListeners = new HashMap<>(); // Key is typeId
 
     @Getter
-    private final MultiMap<DataConsumer> dataConsumers = new MultiMap<>(); // Key is typeId, jobId
+    private final MultiMap<JobDataDistributor> dataDistributors = new MultiMap<>(); // Key is typeId, jobId
 
     private final ApplicationConfig appConfig;
 
-    private static final int CONSUMER_SUPERVISION_INTERVAL_MS = 1000 * 60 * 3;
-
     public TopicListeners(@Autowired ApplicationConfig appConfig, @Autowired InfoTypes types, @Autowired Jobs jobs,
             @Autowired SecurityContext securityContext) {
         this.appConfig = appConfig;
@@ -88,60 +85,37 @@ public class TopicListeners {
         removeJob(job);
         logger.debug("Job added {}", job.getId());
         if (job.getType().isKafkaTopicDefined()) {
-            addConsumer(job, dataConsumers, kafkaTopicListeners);
+            addConsumer(job, dataDistributors, kafkaTopicListeners);
         }
 
         if (job.getType().isDmaapTopicDefined()) {
-            addConsumer(job, dataConsumers, dmaapTopicListeners);
+            addConsumer(job, dataDistributors, dmaapTopicListeners);
         }
     }
 
-    private DataConsumer createConsumer(Job job) {
-        return !Strings.isEmpty(job.getParameters().getKafkaOutputTopic()) ? new KafkaDataConsumer(job, appConfig)
-                : new HttpDataConsumer(job);
+    private JobDataDistributor createConsumer(Job job) {
+        return !Strings.isEmpty(job.getParameters().getKafkaOutputTopic()) ? new KafkaJobDataDistributor(job, appConfig)
+                : new HttpJobDataDistributor(job);
     }
 
-    private void addConsumer(Job job, MultiMap<DataConsumer> consumers, Map<String, TopicListener> topicListeners) {
+    private void addConsumer(Job job, MultiMap<JobDataDistributor> distributors,
+            Map<String, TopicListener> topicListeners) {
         TopicListener topicListener = topicListeners.get(job.getType().getId());
-        if (consumers.get(job.getType().getId()).isEmpty()) {
-            topicListener.start();
-        }
-        DataConsumer consumer = createConsumer(job);
-        consumer.start(topicListener.getOutput().asFlux());
-        consumers.put(job.getType().getId(), job.getId(), consumer);
+        JobDataDistributor distributor = createConsumer(job);
+        distributor.start(topicListener.getFlux());
+        distributors.put(job.getType().getId(), job.getId(), distributor);
     }
 
     public synchronized void removeJob(Job job) {
-        removeJob(job, dataConsumers);
+        removeJob(job, dataDistributors);
     }
 
-    private static void removeJob(Job job, MultiMap<DataConsumer> consumers) {
-        DataConsumer consumer = consumers.remove(job.getType().getId(), job.getId());
-        if (consumer != null) {
+    private static void removeJob(Job job, MultiMap<JobDataDistributor> distributors) {
+        JobDataDistributor distributor = distributors.remove(job.getType().getId(), job.getId());
+        if (distributor != null) {
             logger.debug("Job removed {}", job.getId());
-            consumer.stop();
-        }
-    }
-
-    @Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS)
-    public synchronized void restartNonRunningKafkaTopics() {
-        for (DataConsumer consumer : this.dataConsumers.values()) {
-            if (!consumer.isRunning()) {
-                restartTopicAndConsumers(this.kafkaTopicListeners, this.dataConsumers, consumer);
-            }
+            distributor.stop();
         }
-
     }
 
-    private static void restartTopicAndConsumers(Map<String, TopicListener> topicListeners,
-            MultiMap<DataConsumer> consumers, DataConsumer consumer) {
-        InfoType type = consumer.getJob().getType();
-        TopicListener topic = topicListeners.get(type.getId());
-        topic.start();
-        restartConsumersOfType(consumers, topic, type);
-    }
-
-    private static void restartConsumersOfType(MultiMap<DataConsumer> consumers, TopicListener topic, InfoType type) {
-        consumers.get(type.getId()).forEach(consumer -> consumer.start(topic.getOutput().asFlux()));
-    }
 }
index 10c7662..5e774ce 100644 (file)
                         }
                      ]
                   },
+                  "measObjClass": {
+                     "type": "array",
+                     "items": [
+                        {
+                           "type": "string"
+                        }
+                     ]
+                  },
                   "measTypes": {
                      "type": "array",
                      "items": [
index c4b5ece..2f1f6c9 100644 (file)
@@ -47,14 +47,15 @@ import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.configuration.WebClientConfig;
 import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig;
 import org.oran.dmaapadapter.controllers.ProducerCallbacksController;
+import org.oran.dmaapadapter.exceptions.ServiceException;
+import org.oran.dmaapadapter.filter.PmReport;
+import org.oran.dmaapadapter.filter.PmReportFilter;
 import org.oran.dmaapadapter.r1.ConsumerJobInfo;
 import org.oran.dmaapadapter.r1.ProducerJobInfo;
 import org.oran.dmaapadapter.repository.InfoTypes;
 import org.oran.dmaapadapter.repository.Job;
 import org.oran.dmaapadapter.repository.Jobs;
-import org.oran.dmaapadapter.repository.filters.PmReport;
-import org.oran.dmaapadapter.repository.filters.PmReportFilter;
-import org.oran.dmaapadapter.tasks.DataConsumer;
+import org.oran.dmaapadapter.tasks.JobDataDistributor;
 import org.oran.dmaapadapter.tasks.ProducerRegstrationTask;
 import org.oran.dmaapadapter.tasks.TopicListener;
 import org.oran.dmaapadapter.tasks.TopicListeners;
@@ -157,15 +158,23 @@ class ApplicationTest {
     }
 
     @BeforeEach
-    void setPort() {
+    void init() {
         this.applicationConfig.setLocalServerHttpPort(this.localServerHttpPort);
+        assertThat(this.jobs.size()).isZero();
+        assertThat(this.consumerController.testResults.receivedBodies).isEmpty();
+        assertThat(this.consumerController.testResults.receivedHeaders).isEmpty();
     }
 
     @AfterEach
     void reset() {
+        for (Job job : this.jobs.getAll()) {
+            this.icsSimulatorController.deleteJob(job.getId(), restClient());
+        }
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+
         this.consumerController.testResults.reset();
         this.icsSimulatorController.testResults.reset();
-        this.jobs.clear();
     }
 
     private AsyncRestClient restClient(boolean useTrustValidation) {
@@ -258,6 +267,14 @@ class ApplicationTest {
         }
     }
 
+    @Test
+    void testTrustValidation() throws IOException {
+
+        String url = "https://localhost:" + applicationConfig.getLocalServerHttpPort() + "/v3/api-docs";
+        ResponseEntity<String> resp = restClient(true).getForEntity(url).block();
+        assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK);
+    }
+
     @Test
     void testResponseCodes() throws Exception {
         String supervisionUrl = baseUrl() + ProducerCallbacksController.SUPERVISION_URL;
@@ -288,24 +305,16 @@ class ApplicationTest {
         this.icsSimulatorController.addJob(kafkaJobInfo, JOB_ID, restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
 
-        DataConsumer kafkaConsumer = this.topicListeners.getDataConsumers().get(TYPE_ID, JOB_ID);
+        JobDataDistributor kafkaConsumer = this.topicListeners.getDataDistributors().get(TYPE_ID, JOB_ID);
 
         // Handle received data from Kafka, check that it has been posted to the
         // consumer
-        kafkaConsumer.start(Flux.just(new TopicListener.Output("key", "data")));
+        kafkaConsumer.start(Flux.just(new TopicListener.DataFromTopic("key", "data")));
 
         ConsumerController.TestResults consumer = this.consumerController.testResults;
         await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
         assertThat(consumer.receivedBodies.get(0)).isEqualTo("[\"data\"]");
         assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "application/json");
-
-        // Test send an exception
-        kafkaConsumer.start(Flux.error(new NullPointerException()));
-
-        // Test regular restart of stopped
-        kafkaConsumer.stop();
-        this.topicListeners.restartNonRunningKafkaTopics();
-        await().untilAsserted(() -> assertThat(kafkaConsumer.isRunning()).isTrue());
     }
 
     @Test
@@ -323,19 +332,15 @@ class ApplicationTest {
 
         // Return two messages from DMAAP and verify that these are sent to the owner of
         // the job (consumer)
-        DmaapSimulatorController.addResponse("[\"DmaapResponse1\", \"DmaapResponse2\"]");
+        DmaapSimulatorController.addResponse("[\"DmaapResponse123\", \"DmaapResponse223\"]");
         ConsumerController.TestResults consumer = this.consumerController.testResults;
         await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
-        assertThat(consumer.receivedBodies.get(0)).isEqualTo("[\"DmaapResponse1\", \"DmaapResponse2\"]");
+        assertThat(consumer.receivedBodies.get(0)).isEqualTo("[\"DmaapResponse123\", \"DmaapResponse223\"]");
         assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "application/json");
 
         String jobUrl = baseUrl() + ProducerCallbacksController.JOB_URL;
         String jobs = restClient().get(jobUrl).block();
         assertThat(jobs).contains(JOB_ID);
-
-        // Delete the job
-        this.icsSimulatorController.deleteJob(JOB_ID, restClient());
-        await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
     }
 
     @Test
@@ -353,16 +358,22 @@ class ApplicationTest {
 
         // Return two messages from DMAAP and verify that these are sent to the owner of
         // the job (consumer)
-        DmaapSimulatorController.addResponse("[\"DmaapResponse1\", \"DmaapResponse2\"]");
+        DmaapSimulatorController.addResponse("[\"DmaapResponse11\", \"DmaapResponse22\"]");
         ConsumerController.TestResults consumer = this.consumerController.testResults;
         await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(2));
-        assertThat(consumer.receivedBodies.get(0)).isEqualTo("DmaapResponse1");
-        assertThat(consumer.receivedBodies.get(1)).isEqualTo("DmaapResponse2");
+        assertThat(consumer.receivedBodies.get(0)).isEqualTo("DmaapResponse11");
+        assertThat(consumer.receivedBodies.get(1)).isEqualTo("DmaapResponse22");
         assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "text/plain;charset=UTF-8");
 
         // Delete the job
         this.icsSimulatorController.deleteJob(JOB_ID, restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+
+        // Test that deleting the last job did not terminate the DmaapTopicListener
+        this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+        DmaapSimulatorController.addResponse("[\"DmaapResponse77\", \"DmaapResponse88\"]");
+        await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(4));
     }
 
     static class PmReportArray extends ArrayList<PmReport> {
@@ -395,6 +406,8 @@ class ApplicationTest {
         // filtered PM message
         String path = "./src/test/resources/pm_report.json";
         String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset());
+        DmaapSimulatorController.addPmResponse("{}"); // This should just be discarded
+
         DmaapSimulatorController.addPmResponse(pmReportJson);
 
         ConsumerController.TestResults consumer = this.consumerController.testResults;
@@ -405,7 +418,6 @@ class ApplicationTest {
 
         PmReportArray reportsParsed = gson.fromJson(receivedFiltered, PmReportArray.class);
         assertThat(reportsParsed).hasSize(1);
-
     }
 
     @Test
@@ -462,18 +474,21 @@ class ApplicationTest {
         DmaapSimulatorController.addResponse("[\"Hello\"]");
 
         ConsumerController.TestResults consumer = this.consumerController.testResults;
-        await().untilAsserted(() -> assertThat(consumer.receivedHeaders).hasSize(1));
+        await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
         String received = consumer.receivedBodies.get(0);
         assertThat(received).isEqualTo("Hello");
-        // This is the only time it is verified that mime type is plaintext when isJson
-        // is false and buffering is not used
-        assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "text/plain;charset=UTF-8");
 
         // Check that the auth token was received by the consumer
         assertThat(consumer.receivedHeaders).hasSize(1);
         Map<String, String> headers = consumer.receivedHeaders.get(0);
         assertThat(headers).containsEntry("authorization", "Bearer " + AUTH_TOKEN);
+
+        // This is the only time it is verified that mime type is plaintext when isJson
+        // is false and buffering is not used
+        assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "text/plain;charset=UTF-8");
+
         Files.delete(authFile);
+        this.securityContext.setAuthTokenFilePath(null);
     }
 
     @Test
@@ -540,6 +555,23 @@ class ApplicationTest {
                 .hasSize(this.types.size()));
     }
 
+    @Test
+    void testStatistics() throws ServiceException {
+        // Register producer, Register types
+        waitForRegistration();
+        final String JOB_ID = "testStatistics";
+        ConsumerJobInfo jobInfo = consumerJobInfo("DmaapInformationType", JOB_ID, jsonObjectRegexp());
+
+        this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+        String targetUri = baseUrl() + ProducerCallbacksController.STATISTICS_URL;
+        String stats = restClient().get(targetUri).block();
+
+        assertThat(stats).contains(JOB_ID, "DmaapInformationType");
+    }
+
     public static void testErrorCode(Mono<?> request, HttpStatus expStatus, String responseContains) {
         testErrorCode(request, expStatus, responseContains, true);
     }
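
For reference, a plausible single-job entry in the GET /statistics response, derived from the JsonProperty names of Job.Statistics above; the wrapper object around the entries is not shown in this diff and is therefore an assumption. For the DMaaP-based job in testStatistics the Kafka-related fields stay null:

    {
       "jobId": "testStatistics",
       "typeId": "DmaapInformationType",
       "inputTopic": null,
       "outputTopic": null,
       "groupId": null,
       "clientId": null,
       "noOfReceivedObjects": 0,
       "noOfReceivedBytes": 0,
       "noOfSentObjects": 0,
       "noOfSentBytes": 0
    }
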
index 6d29c29..ab9e15c 100644 (file)
@@ -56,7 +56,7 @@ import org.springframework.web.bind.annotation.RestController;
 public class IcsSimulatorController {
 
     private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-    private final static Gson gson = new GsonBuilder().create();
+    private final static Gson gson = new GsonBuilder().disableHtmlEscaping().create();
 
     public static class TestResults {
 
index 287c20b..c34d0f3 100644 (file)
@@ -33,7 +33,6 @@ import java.nio.file.Path;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
 import org.oran.dmaapadapter.clients.AsyncRestClient;
 import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
 import org.oran.dmaapadapter.clients.SecurityContext;
@@ -55,10 +54,8 @@ import org.springframework.boot.web.servlet.server.ServletWebServerFactory;
 import org.springframework.context.annotation.Bean;
 import org.springframework.http.HttpStatus;
 import org.springframework.test.context.TestPropertySource;
-import org.springframework.test.context.junit.jupiter.SpringExtension;
 
 @SuppressWarnings("java:S3577") // Rename class
-@ExtendWith(SpringExtension.class)
 @SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
 @TestPropertySource(properties = { //
         "server.ssl.key-store=./config/keystore.jks", //
@@ -87,7 +84,7 @@ class IntegrationWithIcs {
     @Autowired
     private SecurityContext securityContext;
 
-    private static Gson gson = new GsonBuilder().create();
+    private static Gson gson = new GsonBuilder().disableHtmlEscaping().create();
 
     static class TestApplicationConfig extends ApplicationConfig {
 
index 330eb6b..45bee5d 100644 (file)
@@ -26,15 +26,20 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import com.google.gson.JsonParser;
 
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.oran.dmaapadapter.clients.AsyncRestClient;
 import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
@@ -42,13 +47,14 @@ import org.oran.dmaapadapter.clients.SecurityContext;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.configuration.WebClientConfig;
 import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig;
+import org.oran.dmaapadapter.controllers.ProducerCallbacksController;
 import org.oran.dmaapadapter.exceptions.ServiceException;
+import org.oran.dmaapadapter.filter.PmReportFilter;
 import org.oran.dmaapadapter.r1.ConsumerJobInfo;
 import org.oran.dmaapadapter.repository.InfoType;
 import org.oran.dmaapadapter.repository.InfoTypes;
 import org.oran.dmaapadapter.repository.Job;
 import org.oran.dmaapadapter.repository.Jobs;
-import org.oran.dmaapadapter.tasks.DataConsumer;
 import org.oran.dmaapadapter.tasks.KafkaTopicListener;
 import org.oran.dmaapadapter.tasks.TopicListener;
 import org.oran.dmaapadapter.tasks.TopicListeners;
@@ -64,7 +70,6 @@ import org.springframework.boot.web.servlet.server.ServletWebServerFactory;
 import org.springframework.context.annotation.Bean;
 import org.springframework.test.context.TestPropertySource;
 
-import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.kafka.sender.KafkaSender;
 import reactor.kafka.sender.SenderOptions;
@@ -75,11 +80,12 @@ import reactor.kafka.sender.SenderRecord;
 @TestPropertySource(properties = { //
         "server.ssl.key-store=./config/keystore.jks", //
         "app.webclient.trust-store=./config/truststore.jks", //
-        "app.configuration-filepath=./src/test/resources/test_application_configuration.json"//
-})
+        "app.configuration-filepath=./src/test/resources/test_application_configuration.json"//
+        "app.pm-files-path=./src/test/resources/"}) //
 class IntegrationWithKafka {
 
     final String TYPE_ID = "KafkaInformationType";
+    final String PM_TYPE_ID = "PmInformationTypeKafka";
 
     @Autowired
     private ApplicationConfig applicationConfig;
@@ -102,7 +108,7 @@ class IntegrationWithKafka {
     @Autowired
     private SecurityContext securityContext;
 
-    private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
+    private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
 
     private final Logger logger = LoggerFactory.getLogger(IntegrationWithKafka.class);
 
@@ -151,11 +157,72 @@ class IntegrationWithKafka {
         }
     }
 
+    private static class KafkaReceiver {
+        public final String OUTPUT_TOPIC;
+        private TopicListener.DataFromTopic receivedKafkaOutput;
+        private final Logger logger = LoggerFactory.getLogger(IntegrationWithKafka.class);
+
+        int count = 0;
+
+        public KafkaReceiver(ApplicationConfig applicationConfig, String outputTopic) {
+            this.OUTPUT_TOPIC = outputTopic;
+
+            // Create a listener to the output topic. The KafkaTopicListener happens to be
+            // suitable for that,
+            InfoType type =
+                    InfoType.builder().id("TestReceiver").kafkaInputTopic(OUTPUT_TOPIC).dataType("dataType").build();
+
+            KafkaTopicListener topicListener = new KafkaTopicListener(applicationConfig, type);
+
+            topicListener.getFlux() //
+                    .doOnNext(this::set) //
+                    .doFinally(sig -> logger.info("Finally " + sig)) //
+                    .subscribe();
+        }
+
+        private void set(TopicListener.DataFromTopic receivedKafkaOutput) {
+            this.receivedKafkaOutput = receivedKafkaOutput;
+            this.count++;
+            logger.debug("*** received {}, {}", OUTPUT_TOPIC, receivedKafkaOutput);
+        }
+
+        synchronized String lastKey() {
+            return this.receivedKafkaOutput.key;
+        }
+
+        synchronized String lastValue() {
+            return this.receivedKafkaOutput.value;
+        }
+
+        void reset() {
+            count = 0;
+            this.receivedKafkaOutput = new TopicListener.DataFromTopic("", "");
+        }
+    }
+
+    private static KafkaReceiver kafkaReceiver;
+    private static KafkaReceiver kafkaReceiver2;
+
+    @BeforeEach
+    void init() {
+        if (kafkaReceiver == null) {
+            kafkaReceiver = new KafkaReceiver(this.applicationConfig, "outputTopic");
+            kafkaReceiver2 = new KafkaReceiver(this.applicationConfig, "outputTopic2");
+        }
+        kafkaReceiver.reset();
+        kafkaReceiver2.reset();
+    }
+
     @AfterEach
     void reset() {
+        for (Job job : this.jobs.getAll()) {
+            this.icsSimulatorController.deleteJob(job.getId(), restClient());
+        }
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+        await().untilAsserted(() -> assertThat(this.topicListeners.getDataDistributors().keySet()).isEmpty());
+
         this.consumerController.testResults.reset();
         this.icsSimulatorController.testResults.reset();
-        this.jobs.clear();
     }
 
     private AsyncRestClient restClient(boolean useTrustValidation) {
@@ -213,6 +280,18 @@ class IntegrationWithKafka {
         }
     }
 
+    ConsumerJobInfo consumerJobInfoKafka(String topic, PmReportFilter.FilterData filterData) {
+        try {
+            Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, null, 1, topic);
+            String str = gson.toJson(param);
+            Object parametersObj = jsonObject(str);
+
+            return new ConsumerJobInfo(PM_TYPE_ID, parametersObj, "owner", null, "");
+        } catch (Exception e) {
+            return null;
+        }
+    }
+
     ConsumerJobInfo consumerJobInfoKafka(String topic) {
         try {
             Job.Parameters param = new Job.Parameters(null, null, null, 1, topic);
@@ -230,19 +309,15 @@ class IntegrationWithKafka {
 
         Map<String, Object> props = new HashMap<>();
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-        props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producerx");
         props.put(ProducerConfig.ACKS_CONFIG, "all");
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         return SenderOptions.create(props);
     }
 
-    private SenderRecord<String, String, Integer> senderRecord(String data) {
-        return senderRecord(data, "");
-    }
-
-    private SenderRecord<String, String, Integer> senderRecord(String data, String key) {
-        final InfoType infoType = this.types.get(TYPE_ID);
+    private SenderRecord<String, String, Integer> senderRecord(String data, String key, String typeId) {
+        final InfoType infoType = this.types.get(typeId);
         int correlationMetadata = 2;
         return SenderRecord.create(new ProducerRecord<>(infoType.getKafkaInputTopic(), key, data), correlationMetadata);
     }
@@ -265,9 +340,19 @@ class IntegrationWithKafka {
         }
     }
 
+    private void verifiedReceivedByConsumerLast(String s) {
+        ConsumerController.TestResults consumer = this.consumerController.testResults;
+
+        await().untilAsserted(() -> assertThat(last(consumer.receivedBodies)).isEqualTo(s));
+    }
+
+    private String last(List<String> l) {
+        return l.isEmpty() ? "" : l.get(l.size() - 1);
+    }
+
     @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
-    private static void sleep(long millis) throws InterruptedException {
-        Thread.sleep(millis);
+    private static void waitForKafkaListener() throws InterruptedException {
+        Thread.sleep(4000);
     }
 
     @Test
@@ -280,20 +365,37 @@ class IntegrationWithKafka {
 
         this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID, restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+        waitForKafkaListener();
 
-        sleep(4000);
-        var dataToSend = Flux.just(senderRecord("Message", ""));
+        var dataToSend = Flux.just(senderRecord("Message", "", TYPE_ID));
         sendDataToStream(dataToSend);
 
         verifiedReceivedByConsumer("Message");
+    }
+
+    @Test
+    void kafkaIntegrationTest() throws Exception {
+        final String JOB_ID1 = "ID1";
+        final String JOB_ID2 = "ID2";
 
-        this.icsSimulatorController.deleteJob(JOB_ID, restClient());
+        // Register producer, Register types
+        await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+        assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
-        await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
-        await().untilAsserted(() -> assertThat(this.topicListeners.getDataConsumers().keySet()).isEmpty());
-    }
+        // Create two jobs. One buffering and one with a filter
+        this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 10, 20), JOB_ID1,
+                restClient());
+        this.icsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0, 1), JOB_ID2, restClient());
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
+        waitForKafkaListener();
 
-    TopicListener.Output receivedKafkaOutput = new TopicListener.Output("", "");
+        var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_" + i, "", TYPE_ID)); // Message_1,
+                                                                                               // Message_2
+                                                                                               // etc.
+        sendDataToStream(dataToSend);
+
+        verifiedReceivedByConsumer("Message_1", "[\"Message_1\", \"Message_2\", \"Message_3\"]");
+    }
 
     @Test
     void sendToKafkaConsumer() throws ServiceException, InterruptedException {
@@ -303,70 +405,111 @@ class IntegrationWithKafka {
         await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
         assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
-        final String OUTPUT_TOPIC = "outputTopic";
+        this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver.OUTPUT_TOPIC), JOB_ID, restClient());
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+        waitForKafkaListener();
+
+        String sendString = "testData " + Instant.now();
+        String sendKey = "key " + Instant.now();
+        var dataToSend = Flux.just(senderRecord(sendString, sendKey, TYPE_ID));
+        sendDataToStream(dataToSend);
+
+        await().untilAsserted(() -> assertThat(kafkaReceiver.lastValue()).isEqualTo(sendString));
+        assertThat(kafkaReceiver.lastKey()).isEqualTo(sendKey);
+
+        printStatistics();
+    }
+
+    private void printStatistics() {
+        String targetUri = baseUrl() + ProducerCallbacksController.STATISTICS_URL;
+        String stats = restClient().get(targetUri).block();
+        logger.info("Stats : {}", stats);
+    }
+
+    @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
+    @Test
+    void kafkaCharacteristics() throws Exception {
+        final String JOB_ID = "kafkaCharacteristics";
+
+        // Register producer, Register types
+        await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+        assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
-        this.icsSimulatorController.addJob(consumerJobInfoKafka(OUTPUT_TOPIC), JOB_ID, restClient());
+        this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver.OUTPUT_TOPIC), JOB_ID, restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+        waitForKafkaListener();
 
-        // Create a listener to the output topic. The KafkaTopicListener happens to be
-        // suitable for that,
-        InfoType type = new InfoType("id", null, false, OUTPUT_TOPIC, "dataType", false);
-        KafkaTopicListener receiver = new KafkaTopicListener(this.applicationConfig, type);
-        receiver.start();
+        final int NO_OF_OBJECTS = 100000;
 
-        Disposable disponsable = receiver.getOutput().asFlux() //
-                .doOnNext(output -> {
-                    receivedKafkaOutput = output;
-                    logger.info("*** recived {}, {}", OUTPUT_TOPIC, output);
-                }) //
-                .doFinally(sig -> logger.info("Finally " + sig)) //
-                .subscribe();
+        Instant startTime = Instant.now();
 
-        String sendString = "testData " + Instant.now();
-        String sendKey = "key " + Instant.now();
-        var dataToSend = Flux.just(senderRecord(sendString, sendKey));
-        sleep(4000);
+        var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> senderRecord("Message_" + i, "", TYPE_ID)); // Message_1,
+                                                                                                           // etc.
         sendDataToStream(dataToSend);
 
-        await().untilAsserted(() -> assertThat(this.receivedKafkaOutput.value).isEqualTo(sendString));
-        assertThat(this.receivedKafkaOutput.key).isEqualTo(sendKey);
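+        // Poll until the consumer has received the last of the NO_OF_OBJECTS messages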
+        while (!kafkaReceiver.lastValue().equals("Message_" + NO_OF_OBJECTS)) {
+            logger.info("sleeping {}", kafkaReceiver.lastValue());
+            Thread.sleep(1000);
+        }
 
-        disponsable.dispose();
-        receiver.stop();
+        final long durationSeconds = Math.max(1, Instant.now().getEpochSecond() - startTime.getEpochSecond()); // avoid division by zero
+        logger.info("*** Duration: {} s, objects/second: {}", durationSeconds, NO_OF_OBJECTS / durationSeconds);
     }
 
+    @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
     @Test
-    void kafkaIntegrationTest() throws Exception {
-        final String JOB_ID1 = "ID1";
-        final String JOB_ID2 = "ID2";
+    void kafkaCharacteristics_pmFilter() throws Exception {
+        // Filter PM reports and send the result to two jobs over Kafka
+
+        final String JOB_ID = "kafkaCharacteristics";
+        final String JOB_ID2 = "kafkaCharacteristics2";
 
         // Register producer, Register types
         await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
         assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
-        // Create two jobs. One buffering and one with a filter
-        this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 10, 20), JOB_ID1,
+        PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
+        filterData.getMeasTypes().add("succImmediateAssignProcs");
+        filterData.getMeasObjClass().add("UtranCell");
+
+        this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver.OUTPUT_TOPIC, filterData), JOB_ID,
+                restClient());
+        this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver2.OUTPUT_TOPIC, filterData), JOB_ID2,
                 restClient());
-        this.icsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0, 1), JOB_ID2, restClient());
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
+        waitForKafkaListener();
+
+        final int NO_OF_OBJECTS = 100000;
 
-        sleep(2000);
-        var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc.
+        Instant startTime = Instant.now();
+
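+        // Each Kafka record carries a NewFileEvent that references the PM report test resource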
+        KafkaTopicListener.NewFileEvent event =
+                KafkaTopicListener.NewFileEvent.builder().filename("pm_report.json").build();
+        String eventAsString = gson.toJson(event);
+
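+        // Read the PM report itself; presumably loaded here so the test fails early if the resource is missing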
+        String path = "./src/test/resources/pm_report.json";
+        String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset());
+
+        var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> senderRecord(eventAsString, "key", PM_TYPE_ID));
         sendDataToStream(dataToSend);
 
-        verifiedReceivedByConsumer("Message_1", "[\"Message_1\", \"Message_2\", \"Message_3\"]");
+        while (kafkaReceiver.count != NO_OF_OBJECTS) {
+            logger.info("sleeping {}", kafkaReceiver.count);
+            Thread.sleep(1000);
+        }
 
-        // Delete the jobs
-        this.icsSimulatorController.deleteJob(JOB_ID1, restClient());
-        this.icsSimulatorController.deleteJob(JOB_ID2, restClient());
 
-        await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
-        await().untilAsserted(() -> assertThat(this.topicListeners.getDataConsumers().keySet()).isEmpty());
+        final long durationSeconds = Math.max(1, Instant.now().getEpochSecond() - startTime.getEpochSecond()); // avoid division by zero
+        logger.info("*** Duration: {} s, objects/second: {}", durationSeconds, NO_OF_OBJECTS / durationSeconds);
+        logger.info("*** kafkaReceiver2 count: {}", kafkaReceiver2.count);
+
+        printStatistics();
     }
 
     @Test
-    void kafkaIOverflow() throws Exception {
+    void kafkaDeleteJobShouldNotStopListener() throws Exception {
         final String JOB_ID1 = "ID1";
         final String JOB_ID2 = "ID2";
 
@@ -381,28 +524,22 @@ class IntegrationWithKafka {
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
 
-        var dataToSend = Flux.range(1, 1000000).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc.
-        sendDataToStream(dataToSend); // this should overflow
-
-        DataConsumer consumer = topicListeners.getDataConsumers().get(TYPE_ID).iterator().next();
-        await().untilAsserted(() -> assertThat(consumer.isRunning()).isFalse());
-        this.consumerController.testResults.reset();
-
-        this.icsSimulatorController.deleteJob(JOB_ID2, restClient()); // Delete one job
-        topicListeners.restartNonRunningKafkaTopics();
-        sleep(1000); // Restarting the input seems to take some asynch time
-
-        dataToSend = Flux.just(senderRecord("Howdy\""));
-        sendDataToStream(dataToSend);
-
-        verifiedReceivedByConsumer("[\"Howdy\\\"\"]");
+        var dataToSend = Flux.range(1, 100).map(i -> senderRecord("Message_" + i, "", TYPE_ID)); // Message_1,
+                                                                                                 // Message_2
+                                                                                                 // etc.
+        sendDataToStream(dataToSend); // this should not overflow
 
-        // Delete the jobs
+        // Delete jobs, recreate one
         this.icsSimulatorController.deleteJob(JOB_ID1, restClient());
         this.icsSimulatorController.deleteJob(JOB_ID2, restClient());
-
         await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
-        await().untilAsserted(() -> assertThat(this.topicListeners.getDataConsumers().keySet()).isEmpty());
+        this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID2, restClient());
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
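+        // The topic listener must still be running, so the recreated job receives data sent after the deletions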
+        dataToSend = Flux.just(senderRecord("Howdy", "", TYPE_ID));
+        sendDataToStream(dataToSend);
+
+        verifiedReceivedByConsumerLast("Howdy");
     }
 
 }
@@ -18,7 +18,7 @@
  * ========================LICENSE_END===================================
  */
 
-package org.oran.dmaapadapter.repository.filters;
+package org.oran.dmaapadapter.filter;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -27,16 +27,21 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 
 import org.junit.jupiter.api.Test;
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
 
 class JsltFilterTest {
 
+    private String filterReport(JsltFilter filter) throws Exception {
+        return filter.filter(new DataFromTopic("", loadReport())).value;
+    }
+
     @Test
     void testPickOneValue() throws Exception {
         String expresssion = "if(.event.commonEventHeader.sourceName == 'O-DU-1122')" //
                 + ".event.perf3gppFields.measDataCollection.measInfoList[0].measValuesList[0].measResults[0].sValue";
 
         JsltFilter filter = new JsltFilter(reQuote(expresssion));
-        String res = filter.filter(loadReport());
+        String res = filterReport(filter);
         assertThat(res).isEqualTo(reQuote("'813'"));
     }
 
@@ -46,7 +51,7 @@ class JsltFilterTest {
                 + ".";
 
         JsltFilter filter = new JsltFilter(reQuote(expresssion));
-        String res = filter.filter(loadReport());
+        String res = filterReport(filter);
         assertThat(res).contains("event");
     }
 
@@ -55,7 +60,7 @@ class JsltFilterTest {
         String expresssion = "if(.event.commonEventHeader.sourceName == 'JUNK')" //
                 + ".";
         JsltFilter filter = new JsltFilter(reQuote(expresssion));
-        String res = filter.filter(loadReport());
+        String res = filterReport(filter);
         assertThat(res).isEmpty();
     }
 
@@ -71,7 +76,7 @@ class JsltFilterTest {
                         "}"; //
 
         JsltFilter filter = new JsltFilter(reQuote(expresssion));
-        String res = filter.filter(loadReport());
+        String res = filterReport(filter);
         String expected =
                 "{'array':['RncFunction=RF-1,UtranCell=Gbg-997','RncFunction=RF-1,UtranCell=Gbg-998','RncFunction=RF-1,UtranCell=Gbg-999'],'size':3}";
 
@@ -18,7 +18,7 @@
  * ========================LICENSE_END===================================
  */
 
-package org.oran.dmaapadapter.repository.filters;
+package org.oran.dmaapadapter.filter;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -27,6 +27,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 
 import org.junit.jupiter.api.Test;
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
 
 class JsonPathFilterTest {
 
@@ -34,7 +35,7 @@ class JsonPathFilterTest {
     void testJsonPath() throws Exception {
         String exp = ("$.event.perf3gppFields.measDataCollection.measInfoList[0].measTypes.sMeasTypesList[0]");
         JsonPathFilter filter = new JsonPathFilter(exp);
-        String res = filter.filter(loadReport());
+        String res = filter.filter(new DataFromTopic("", loadReport())).value;
         assertThat(res).isEqualTo("\"attTCHSeizures\"");
     }
 
@@ -18,7 +18,7 @@
  * ========================LICENSE_END===================================
  */
 
-package org.oran.dmaapadapter.repository.filters;
+package org.oran.dmaapadapter.filter;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -27,19 +27,22 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 
 import org.junit.jupiter.api.Test;
+import org.oran.dmaapadapter.tasks.TopicListener;
 
 class PmReportFilterTest {
 
+    private String filterReport(PmReportFilter filter) throws Exception {
+        return filter.filter(new TopicListener.DataFromTopic("", loadReport())).value;
+    }
+
     @Test
     void testPmFilterMeasTypes() throws Exception {
 
-        String reportJson = loadReport();
-
         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
         filterData.measTypes.add("succImmediateAssignProcs");
 
         PmReportFilter filter = new PmReportFilter(filterData);
-        String filtered = filter.filter(reportJson);
+        String filtered = filterReport(filter);
 
         assertThat(filtered).contains("succImmediateAssignProcs").doesNotContain("\"p\":2").contains("\"p\":1")
                 .contains("Gbg-997");
@@ -48,7 +51,7 @@ class PmReportFilterTest {
         filterData = new PmReportFilter.FilterData();
         filterData.measTypes.add("junk");
         filter = new PmReportFilter(filterData);
-        filtered = filter.filter(reportJson);
+        filtered = filterReport(filter);
         assertThat(filtered).isEmpty();
     }
 
@@ -57,28 +60,43 @@ class PmReportFilterTest {
         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
         filterData.measObjInstIds.add("junk");
         PmReportFilter filter = new PmReportFilter(filterData);
-        String filtered = filter.filter(loadReport());
+        String filtered = filterReport(filter);
         assertThat(filtered).isEmpty();
 
         filterData = new PmReportFilter.FilterData();
         filterData.measObjInstIds.add("UtranCell=Gbg-997");
         filter = new PmReportFilter(filterData);
-        filtered = filter.filter(loadReport());
+        filtered = filterReport(filter);
         assertThat(filtered).contains("Gbg-997").doesNotContain("Gbg-998");
     }
 
+    @Test
+    void testMeasObjClass() throws Exception {
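+        // A non-matching MO class yields an empty result; a matching class keeps only measurements for that class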
+        PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
+        filterData.measObjClass.add("junk");
+        PmReportFilter filter = new PmReportFilter(filterData);
+        String filtered = filterReport(filter);
+        assertThat(filtered).isEmpty();
+
+        filterData = new PmReportFilter.FilterData();
+        filterData.measObjClass.add("ENodeBFunction");
+        filter = new PmReportFilter(filterData);
+        filtered = filterReport(filter);
+        assertThat(filtered).contains("ENodeBFunction").doesNotContain("UtranCell");
+    }
+
     @Test
     void testSourceNames() throws Exception {
         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
         filterData.sourceNames.add("junk");
         PmReportFilter filter = new PmReportFilter(filterData);
-        String filtered = filter.filter(loadReport());
+        String filtered = filterReport(filter);
         assertThat(filtered).isEmpty();
 
         filterData = new PmReportFilter.FilterData();
         filterData.sourceNames.add("O-DU-1122");
         filter = new PmReportFilter(filterData);
-        filtered = filter.filter(loadReport());
+        filtered = filterReport(filter);
         assertThat(filtered).contains("O-DU-1122");
     }
 
@@ -87,19 +105,36 @@ class PmReportFilterTest {
         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
         filterData.measuredEntityDns.add("junk");
         PmReportFilter filter = new PmReportFilter(filterData);
-        String filtered = filter.filter(loadReport());
+        String filtered = filterReport(filter);
         assertThat(filtered).isEmpty();
 
         filterData = new PmReportFilter.FilterData();
         filterData.measuredEntityDns.add("ManagedElement=RNC-Gbg-1");
         filter = new PmReportFilter(filterData);
-        filtered = filter.filter(loadReport());
-        assertThat(filtered).contains("RNC-Gbg-1"); // '=' is escaped to unicode by gson. OK
+        filtered = filterReport(filter);
+        assertThat(filtered).contains("ManagedElement=RNC-Gbg-1");
+    }
+
+    @Test
+    void testCrapInput() {
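+        // Malformed input must result in empty output rather than an exception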
+        PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
+        PmReportFilter filter = new PmReportFilter(filterData);
+
+        String filtered = filter.filter(new TopicListener.DataFromTopic("", "junk")).value;
+        assertThat(filtered).isEmpty();
+
+        filtered = filter.filter(new TopicListener.DataFromTopic("", reQuote("{'msg': 'test'}"))).value;
+        assertThat(filtered).isEmpty();
+    }
+
+    private String reQuote(String str) {
+        return str.replaceAll("'", "\\\"");
     }
 
     @Test
     void testParse() throws Exception {
-        com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
+        com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
         PmReport report = gson.fromJson(loadReport(), PmReport.class);
 
         String dn = report.event.perf3gppFields.measDataCollection.measuredEntityDn;
index df1ccd5..2863590 100644
@@ -8,7 +8,7 @@
       },
       {
          "id": "KafkaInformationType",
-         "kafkaInputTopic": "TutorialTopic",
+         "kafkaInputTopic": "KafkaInput",
          "useHttpProxy": false
       },
       {
@@ -20,7 +20,7 @@
       },
       {
          "id": "PmInformationTypeKafka",
-         "kafkaInputTopic": "TutorialTopic",
+         "kafkaInputTopic": "PmFileData",
          "useHttpProxy": false,
          "dataType": "PmData",
          "isJson": true