Updates for G Maintenance release 35/10435/1 1.2.1
author ychacon <yennifer.chacon@est.tech>
Fri, 10 Feb 2023 12:46:33 +0000 (13:46 +0100)
committer ychacon <yennifer.chacon@est.tech>
Fri, 10 Feb 2023 12:50:12 +0000 (13:50 +0100)
    Minor changes.
    Step parent pom version
    DMaaP adapter, updated job definition syntax
    DMaaP adapter, support for shared Kafka topics

Issue-ID: NONRTRIC-838
Signed-off-by: ychacon <yennifer.chacon@est.tech>
Change-Id: Ia6a909a07304c79a0c2d53cdea0f857ebae7f96a

39 files changed:
README.md
api/api.json
api/api.yaml
config/application.yaml
docs/Pictures.pptx
docs/dedicatedTopics.png [new file with mode: 0644]
docs/overview.rst
docs/sharedTopics.png [new file with mode: 0644]
pom.xml
src/main/java/org/oran/dmaapadapter/Application.java
src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java
src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java
src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java
src/main/java/org/oran/dmaapadapter/filter/Filter.java
src/main/java/org/oran/dmaapadapter/filter/FilterFactory.java
src/main/java/org/oran/dmaapadapter/filter/JsltFilter.java
src/main/java/org/oran/dmaapadapter/filter/JsonPathFilter.java
src/main/java/org/oran/dmaapadapter/filter/PmReport.java
src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java
src/main/java/org/oran/dmaapadapter/filter/RegexpFilter.java
src/main/java/org/oran/dmaapadapter/repository/Job.java
src/main/java/org/oran/dmaapadapter/repository/Jobs.java
src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java
src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java
src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java
src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java
src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java
src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java
src/main/resources/typeSchema.json
src/main/resources/typeSchemaPmData.json
src/test/java/org/oran/dmaapadapter/ApplicationTest.java
src/test/java/org/oran/dmaapadapter/ConsumerController.java
src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java
src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
src/test/java/org/oran/dmaapadapter/filter/JsltFilterTest.java
src/test/java/org/oran/dmaapadapter/filter/JsonPathFilterTest.java
src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java
src/test/resources/pm_report.json

index 686135a..48f2f20 100644 (file)
--- a/README.md
+++ b/README.md
@@ -55,14 +55,14 @@ When an Information Job is created in the Information Coordinator Service Consum
         "maxSize": {
           "type": "integer"
         },
-        "maxTimeMiliseconds": {
+        "maxTimeMilliseconds": {
           "type": "integer"
         }
       },
       "additionalProperties": false,
       "required": [
         "maxSize",
-        "maxTimeMiliseconds"
+        "maxTimeMilliseconds"
       ]
     }
   },
@@ -79,7 +79,7 @@ When an Information Job is created in the Information Coordinator Service Consum
    Object1 and Object2 may be posted in one call -->  ["Object1", "Object2"]
  The bufferTimeout is a Json object and the parameters in the object are:
    - maxSize the maximum number of buffered objects before posting
-   - maxTimeMiliseconds the maximum delay time to buffer before posting
+   - maxTimeMilliseconds the maximum delay time to buffer before posting
  If no bufferTimeout is specified, each object will be posted as received in separate calls (not quoted and put in a Json array).
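+
+As an illustration, a hypothetical job parameter instance (the values are made up) could look like:
+
+    {
+      "bufferTimeout": {
+        "maxSize": 100,
+        "maxTimeMilliseconds": 5000
+      }
+    }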
 
 
index 3ec5a4d..6ba7582 100644 (file)
             }},
             "tags": ["Actuator"]
         }},
+        "/actuator/shutdown": {"post": {
+            "summary": "Actuator web endpoint 'shutdown'",
+            "operationId": "shutdown_2",
+            "responses": {"200": {
+                "description": "OK",
+                "content": {"*/*": {"schema": {"type": "object"}}}
+            }},
+            "tags": ["Actuator"]
+        }},
         "/data-producer/v1/info-producers/{infoProducerId}": {
             "get": {
                 "operationId": "getInfoProducer",
index b2eaf15..024b43f 100644 (file)
@@ -174,6 +174,19 @@ paths:
             '*/*':
               schema:
                 type: object
+  /actuator/shutdown:
+    post:
+      tags:
+      - Actuator
+      summary: Actuator web endpoint 'shutdown'
+      operationId: shutdown_2
+      responses:
+        200:
+          description: OK
+          content:
+            '*/*':
+              schema:
+                type: object
   /data-producer/v1/info-producers/{infoProducerId}:
     get:
       tags:
index 8975849..57c6e31 100644 (file)
@@ -27,7 +27,12 @@ management:
     web:
       exposure:
         # Enabling of springboot actuator features. See springboot documentation.
-        include: "loggers,logfile,health,info,metrics,threaddump,heapdump"
+        include: "loggers,logfile,health,info,metrics,threaddump,heapdump,shutdown"
+  endpoint:
+    shutdown:
+      enabled: true
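+# Allow ongoing requests up to the configured time to finish during graceful shutdown.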
+lifecycle:
+  timeout-per-shutdown-phase: "20s"
 springdoc:
   show-actuator: true
 logging:
@@ -48,14 +53,15 @@ logging:
 server:
   # Configuration of the HTTP/REST server. The parameters are defined and handled by the springboot framework.
    # See springboot documentation.
-   port : 8435
-   http-port: 8084
-   ssl:
-      key-store-type: JKS
-      key-store-password: policy_agent
-      key-store: /opt/app/dmaap-adapter-service/etc/cert/keystore.jks
-      key-password: policy_agent
-      key-alias: policy_agent
+  port : 8435
+  http-port: 8084
+  ssl:
+    key-store-type: JKS
+    key-store-password: policy_agent
+    key-store: /opt/app/dmaap-adapter-service/etc/cert/keystore.jks
+    key-password: policy_agent
+    key-alias: policy_agent
+  shutdown: "graceful"
 app:
   webclient:
     # Configuration of the trust store used for the HTTP client (outgoing requests)
index e2cf47e..2ec5a51 100644 (file)
Binary files a/docs/Pictures.pptx and b/docs/Pictures.pptx differ
diff --git a/docs/dedicatedTopics.png b/docs/dedicatedTopics.png
new file mode 100644 (file)
index 0000000..0b91b75
Binary files /dev/null and b/docs/dedicatedTopics.png differ
index 1174e44..88a2606 100644 (file)
@@ -116,7 +116,10 @@ typeSchema.json
 ===============
 This schema will by default be registered for the type. The following properties are defined:
 
-* kafkaOutputTopic, optional parameter which enables the information job to output data to a Kafka topic instead of a direct call to a data consumer. The output of a job can be directed to HTTP or to Kafka regardless if the input is retrieved from DMaaP or from Kafka.
+* outputTopic, optional parameter which enables the information job to output data to a Kafka topic instead of a direct call to a data consumer. The output of a job can be directed to HTTP or to Kafka regardless of whether the input is retrieved from DMaaP or from Kafka (a sketch is shown below, after the bufferTimeout example). This consists of the following properties:
+
+  * topic, the name of the Kafka topic
+  * bootStrapServers, a reference to the Kafka bus to be used. This is optional; if omitted, the default Kafka bus configured in the application.yaml file is used.
 
 * filterType, selects the type of filtering that will be done. This can be one of: "regexp", "json-path", "jslt".
 
@@ -128,7 +131,7 @@ This schema will by default be registerred for the type. The following propertie
 * bufferTimeout can be used to buffer several JSON objects received from Kafka (when kafkaInputTopic is defined) into one JSON array. If bufferTimeout is used, the delivered data will be a JSON array of the objects received. If not, each received object will be delivered in a separate call. This contains:
 
   * maxSize, the maximum number of objects to collect before delivery to the consumer
-  * maxTimeMiliseconds, the maximum time to delay delivery (to buffer).
+  * maxTimeMilliseconds, the maximum time to delay delivery (to buffer).
 
 * maxConcurrency, defines the maximum number of parallel REST calls the consumer wishes to receive. 1, which is the default, means sequential. A higher value may increase throughput.
 
@@ -165,7 +168,7 @@ Below follows an example of using bufferTimeout and maxConcurrency.
     {
        "bufferTimeout":{
           "maxSize":123,
-          "maxTimeMiliseconds":456
+          "maxTimeMilliseconds":456
        },
        "maxConcurrency":1
     }
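+
+Below follows a sketch of job parameters using outputTopic (the topic name and
+server address are illustrative):
+
+.. code-block:: javascript
+
+    {
+      "outputTopic": {
+        "topic": "mytopic",
+        "bootStrapServers": "kafka-1:9092"
+      }
+    }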
@@ -179,18 +182,24 @@ This schema will be registered when the configured dataType is "pmData".
 This will extend the filtering capabilities so that a special filter for PM data can be used. Here it is possible to
 define which meas-types (counters) to get from which resources.
 
+"It is possible to both filter on new data that is collected from the traffical nodes and to query from data that is already collected.
+
 The filterType parameter is extended to allow value "pmdata" which can be used for PM data filtering.
 
 * sourceNames an array of source names for wanted PM reports.
 * measObjInstIds an array of meas object instances for wanted PM reports. If a given filter value is contained in the filter definition, it will match (partial matching).
   For instance a value like "NRCellCU" will match "ManagedElement=seliitdus00487,GNBCUCPFunction=1,NRCellCU=32".
-* measTypes selects the meas types to get
+* measTypeSpecs selects the meas types to get. This consists of:
+
+  * measObjClass matching of the class of the measObjInstId. The measObjInstId must follow the 3GPP naming conventions for Managed Objects (3GPP TS 32.106-8).
+  * measTypes, the names of the measurement types (connected to the measObjClass).
 * measuredEntityDns partial match of meas entity DNs.
-* measObjClass matching of the class of the measObjInstId. The measObjInstId must follow the 3GPP naming conventions for Managed Objects (3GPP TS 32.106-8).
   Example, for a distinguished name "ManagedElement=RNC-Gbg-1,ENodeBFunction=1", the MO class will be "ENodeBFunction".
+* pmRopStartTime gives the possibility to query already collected PM files. The start time is the time from which the information shall be returned.
+  In this case, the query is only done for files from the given "sourceNames".
+  If this parameter is excluded, only "new" reports will be delivered as they are collected from the traffic nodes.
 
-All PM filter properties are optional and a non given will result in "match all".
-The result of the filtering is still following the structure of a 3GPP PM report.
+All PM filter properties are optional; a property that is not given results in "match all" (except pmRopStartTime).
 
 Below follows an example of a PM filter.
 
@@ -205,11 +214,144 @@ Below follows an example of a PM filter.
         "measObjInstIds":[
            "UtranCell=dGbg-997"
         ],
-        "measTypes":[
-           "succImmediateAssignProcs"
-        ],
+        "measTypeSpecs":[
+             {
+                "measuredObjClass":"UtranCell",
+                "measTypes":[
+                   "succImmediateAssignProcs"
+                ]
+             }
+          ],
         "measuredEntityDns":[
            "ManagedElement=RNC-Gbg-1"
-        ]
+        ],
+        "pmRopStartTime" : "2022-12-13T10:50:44.000-08:00"
+      }
+    }
+
+
+Here is an example of a filter that will
+match two counters from all cells in two traffic nodes.
+
+.. code-block:: javascript
+
+    {
+      "filterType":"pmdata",
+      "filter": {
+        "sourceNames":[
+           "O-DU-1122", "O-DU-1123"
+        ],
+        "measTypeSpecs":[
+             {
+                "measuredObjClass":"NRCellCU",
+                "measTypes":[
+                   "pmCounterNumber0", "pmCounterNumber1"
+                ]
+             }
+          ]
       }
     }
+
+
+********************
+Bulk PM subscription
+********************
+
+The sequence is that a "new file event" is received (from a Kafka topic).
+The file is then read from local storage (file storage or S3 object store). For each job, the specified PM filter is applied to the data
+and the result is sent to the Kafka topic specified by the job (by the data consumer).
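+
+As an illustration, such a "new file event" could look as follows (the field name is
+an assumption based on the NewFileEvent class used in this commit):
+
+.. code-block:: javascript
+
+    {
+      "filename": "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.json"
+    }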
+
+.. image:: ./dedicatedTopics.png
+   :width: 500pt
+
+The result of the PM filtering still follows the structure of a 3GPP PM report.
+Here follows an example of a resulting delivered PM report.
+
+.. code-block:: javascript
+
+   {
+      "event":{
+         "commonEventHeader":{
+            "domain":"perf3gpp",
+            "eventId":"9efa1210-f285-455f-9c6a-3a659b1f1882",
+            "eventName":"perf3gpp_gnb-Ericsson_pmMeasResult",
+            "sourceName":"O-DU-1122",
+            "reportingEntityName":"",
+            "startEpochMicrosec":951912000000,
+            "lastEpochMicrosec":951912900000,
+            "timeZoneOffset":"+00:00"
+         },
+         "perf3gppFields":{
+            "perf3gppFieldsVersion":"1.0",
+            "measDataCollection":{
+               "granularityPeriod":900,
+               "measuredEntityUserName":"RNC Telecomville",
+               "measuredEntityDn":"SubNetwork=CountryNN,MeContext=MEC-Gbg-1,ManagedElement=RNC-Gbg-1",
+               "measuredEntitySoftwareVersion":"",
+               "measInfoList":[
+                  {
+                     "measInfoId":{
+                        "sMeasInfoId":""
+                     },
+                     "measTypes":{
+                        "map":{
+                           "succImmediateAssignProcs":1
+                        },
+                        "sMeasTypesList":[
+                           "succImmediateAssignProcs"
+                        ]
+                     },
+                     "measValuesList":[
+                        {
+                           "measObjInstId":"RncFunction=RF-1,UtranCell=Gbg-997",
+                           "suspectFlag":"false",
+                           "measResults":[
+                              {
+                                 "p":1,
+                                 "sValue":"1113"
+                              }
+                           ]
+                        },
+                        {
+                           "measObjInstId":"RncFunction=RF-1,UtranCell=Gbg-998",
+                           "suspectFlag":"false",
+                           "measResults":[
+                              {
+                                 "p":1,
+                                 "sValue":"234"
+                              }
+                           ]
+                        },
+                        {
+                           "measObjInstId":"RncFunction=RF-1,UtranCell=Gbg-999",
+                           "suspectFlag":"true",
+                           "measResults":[
+                              {
+                                 "p":1,
+                                 "sValue":"789"
+                              }
+                           ]
+                        }
+                     ]
+                  }
+               ]
+            }
+         }
+      }
+   }
+
+If several jobs publish to the same Kafka topic (a shared topic), the filtered output delivered on the topic is the aggregate of all the jobs' matching filters.
+Each consumer will then receive more data than its own filter requested.
+
+.. image:: ./sharedTopics.png
+   :width: 500pt
+
+==================
+Sent Kafka headers
+==================
+
+For each filtered result sent to a Kafka topic, the following properties will be set in the Kafka headers:
+
+* type-id, this property indicates the ID of the information type. The value is a string.
+* gzip, if this property exists, the object is gzipped (otherwise not). The property has no value.
\ No newline at end of file
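+
+A minimal consumer-side sketch in Java, assuming the header names above (the class
+and record types are illustrative, not part of the service):
+
+.. code-block:: java
+
+    import org.apache.kafka.clients.consumer.ConsumerRecord;
+    import org.apache.kafka.common.header.Header;
+
+    class HeaderInspector {
+        // Presence of the "gzip" header alone indicates a gzipped payload.
+        static boolean isGzipped(ConsumerRecord<byte[], byte[]> record) {
+            return record.headers().lastHeader("gzip") != null;
+        }
+
+        // Returns the information type ID, or null if the header is absent.
+        static String typeId(ConsumerRecord<byte[], byte[]> record) {
+            Header h = record.headers().lastHeader("type-id");
+            return h == null ? null : new String(h.value());
+        }
+    }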
diff --git a/docs/sharedTopics.png b/docs/sharedTopics.png
new file mode 100644 (file)
index 0000000..55668c4
Binary files /dev/null and b/docs/sharedTopics.png differ
diff --git a/pom.xml b/pom.xml
index 91319ff..981578b 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -26,7 +26,7 @@
     <parent>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-parent</artifactId>
-        <version>2.5.14</version>
+        <version>2.7.8</version>
         <relativePath />
     </parent>
     <groupId>org.o-ran-sc.nonrtric.plt</groupId>
@@ -57,7 +57,7 @@
         <swagger-codegen-maven-plugin.version>3.0.11</swagger-codegen-maven-plugin.version>
         <docker-maven-plugin>0.30.0</docker-maven-plugin>
         <sonar-maven-plugin.version>3.7.0.1746</sonar-maven-plugin.version>
-        <jacoco-maven-plugin.version>0.8.5</jacoco-maven-plugin.version>
+        <jacoco-maven-plugin.version>0.8.8</jacoco-maven-plugin.version>
         <exec.skip>true</exec.skip>
         <protobuf.version>4.0.0-rc-2</protobuf.version>
         <protobuf-java-format.version>1.4</protobuf-java-format.version>
             <groupId>software.amazon.awssdk</groupId>
             <artifactId>s3</artifactId>
             <version>2.17.292</version>
-        </dependency>  
+        </dependency>
     </dependencies>
     <build>
         <plugins>
         <system>JIRA</system>
         <url>https://jira.o-ran-sc.org/</url>
     </issueManagement>
-</project>
+</project>
\ No newline at end of file
index 2058202..7201d4f 100644 (file)
@@ -48,6 +48,14 @@ public class Application {
 
     public static void main(String[] args) {
         applicationContext = SpringApplication.run(Application.class);
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                logger.warn("Shutting down, received signal SIGTERM");
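+                // Trigger Spring's graceful shutdown; with 'server.shutdown: graceful' configured, active requests get time to finish.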
+                SpringApplication.exit(applicationContext);
+                applicationContext = null;
+            }
+        });
     }
 
     @Scheduled(fixedRate = 10 * 1000)
index a6e2444..3ba96ab 100644 (file)
@@ -118,6 +118,22 @@ public class AsyncRestClient {
                 .map(this::toBody);
     }
 
+    public Mono<ResponseEntity<String>> postForEntity(String uri, @Nullable String body) {
+        Mono<String> bodyProducer = body != null ? Mono.just(body) : Mono.empty();
+
+        RequestHeadersSpec<?> request = getWebClient() //
+                .post() //
+                .uri(uri) //
+                .contentType(MediaType.APPLICATION_JSON) //
+                .body(bodyProducer, String.class);
+        return retrieve(request);
+    }
+
+    public Mono<String> post(String uri, @Nullable String body) {
+        return postForEntity(uri, body) //
+                .map(this::toBody);
+    }
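+
+    // Hypothetical usage of the new helper (URL and handling are illustrative):
+    //   client.post("https://localhost:8435/actuator/shutdown", null)
+    //           .subscribe(body -> logger.debug("Shutdown requested"));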
+
     private Mono<ResponseEntity<String>> retrieve(RequestHeadersSpec<?> request) {
         if (securityContext.isConfigured()) {
             request.headers(h -> h.setBearerAuth(securityContext.getBearerAuthToken()));
index b53383a..d7561da 100644 (file)
@@ -162,7 +162,7 @@ public class ApplicationConfig {
     }
 
     public Collection<InfoType> getTypes() {
-        com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
+        com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
         try {
             String configJson = Files.readString(Path.of(getLocalConfigurationFilePath()), Charset.defaultCharset());
             ConfigFile configData = gson.fromJson(configJson, ConfigFile.class);
index 1816dc3..c118da3 100644 (file)
@@ -97,7 +97,7 @@ public class ProducerCallbacksController {
             logger.warn("jobCreatedCallback failed: {}", e.getMessage());
             return ErrorResponse.create(e, e.getHttpStatus());
         } catch (Exception e) {
-            logger.warn("jobCreatedCallback failed: {}", e.getMessage());
+            logger.warn("jobCreatedCallback failed: {}", e.getMessage(), e);
             return ErrorResponse.create(e, HttpStatus.BAD_REQUEST);
         }
     }
index 6c5fa25..e327fb6 100644 (file)
@@ -39,24 +39,26 @@ public interface Filter {
     public static class FilteredData {
         public final byte[] key;
         public final byte[] value;
+        public final String infoTypeId;
 
         @Getter
         private final boolean isZipped;
 
-        private static final FilteredData emptyData = new FilteredData(null, null);
+        private static final FilteredData emptyData = new FilteredData(null, null, null);
 
         public boolean isEmpty() {
             return (key == null || key.length == 0) && (value == null || value.length == 0);
         }
 
-        public FilteredData(byte[] key, byte[] value) {
-            this(key, value, false);
+        public FilteredData(String type, byte[] key, byte[] value) {
+            this(type, key, value, false);
         }
 
-        public FilteredData(byte[] key, byte[] value, boolean isZipped) {
+        public FilteredData(String type, byte[] key, byte[] value, boolean isZipped) {
             this.key = key;
             this.value = value;
             this.isZipped = isZipped;
+            this.infoTypeId = type;
         }
 
         public String getValueAString() {
@@ -70,9 +72,9 @@ public interface Filter {
         public Iterable<Header> headers() {
             ArrayList<Header> result = new ArrayList<>();
             if (isZipped()) {
-                Header h = new RecordHeader(DataFromTopic.ZIP_PROPERTY, null);
-                result.add(h);
+                result.add(new RecordHeader(DataFromTopic.ZIPPED_PROPERTY, null));
             }
+            result.add(new RecordHeader(DataFromTopic.TYPE_ID_PROPERTY, infoTypeId.getBytes()));
             return result;
         }
     }
index 8d51727..358c093 100644 (file)
@@ -23,6 +23,7 @@ package org.oran.dmaapadapter.filter;
 import com.google.gson.GsonBuilder;
 
 import java.lang.invoke.MethodHandles;
+import java.util.Collection;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,6 +52,14 @@ public class FilterFactory {
         }
     }
 
+    public static PmReportFilter createAggregateFilter(Collection<PmReportFilter> filters) {
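+        // Build one aggregate filter from all jobs' filters; for the plain string sets an empty set means "match all" and dominates the merge.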
+        PmReportFilter.FilterData resultFilterData = filters.iterator().next().getFilterData();
+        for (PmReportFilter filter : filters) {
+            resultFilterData.addAll(filter.getFilterData());
+        }
+        return new PmReportFilter(resultFilterData);
+    }
+
     private static PmReportFilter.FilterData createPmFilterData(Object filter) {
         String str = gson.toJson(filter);
         return gson.fromJson(str, PmReportFilter.FilterData.class);
index 577f83d..1720517 100644 (file)
@@ -49,7 +49,7 @@ class JsltFilter implements Filter {
     @Override
     public FilteredData filter(DataFromTopic data) {
         if (expression == null) {
-            return new FilteredData(data.key, data.value);
+            return new FilteredData(data.infoTypeId, data.key, data.value);
         }
         try {
             JsonFactory factory = mapper.getFactory();
@@ -60,7 +60,7 @@ class JsltFilter implements Filter {
             if (filteredNode == NullNode.instance) {
                 return FilteredData.empty();
             }
-            return new FilteredData(data.key, mapper.writeValueAsBytes(filteredNode));
+            return new FilteredData(data.infoTypeId, data.key, mapper.writeValueAsBytes(filteredNode));
         } catch (Exception e) {
             return FilteredData.empty();
         }
index 98f1b46..8c397b0 100644 (file)
@@ -46,7 +46,7 @@ class JsonPathFilter implements Filter {
             String str = new String(data.value);
             Object o = JsonPath.parse(str).read(this.expression, Object.class);
             String json = gson.toJson(o);
-            return o == null ? FilteredData.empty() : new FilteredData(data.key, json.getBytes());
+            return o == null ? FilteredData.empty() : new FilteredData(data.infoTypeId, data.key, json.getBytes());
         } catch (Exception e) {
             return FilteredData.empty();
         }
index 46c90a1..3182d7b 100644 (file)
@@ -42,9 +42,6 @@ public class PmReport {
         @Expose
         private String eventId;
 
-        @Expose
-        private int sequence;
-
         @Expose
         private String eventName;
 
@@ -55,23 +52,20 @@ public class PmReport {
         @Expose
         private String reportingEntityName;
 
-        @Expose
-        private String priority;
-
         @Expose
         private long startEpochMicrosec;
 
         @Expose
         private long lastEpochMicrosec;
 
-        @Expose
-        private String version;
-
-        @Expose
-        private String vesEventListenerVersion;
-
         @Expose
         private String timeZoneOffset;
+
+        /* Not reported elements */
+        int sequence;
+        String priority;
+        String version;
+        String vesEventListenerVersion;
     }
 
     public static class MeasInfoId {
index a545e90..db1a018 100644 (file)
@@ -26,6 +26,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 import lombok.Getter;
 import lombok.Setter;
@@ -61,17 +62,98 @@ public class PmReportFilter implements Filter {
 
     @Getter
     public static class FilterData {
-        final Collection<String> sourceNames = new HashSet<>();
-        final Collection<String> measObjInstIds = new ArrayList<>();
-        final Collection<String> measTypes = new HashSet<>();
-        final Collection<String> measuredEntityDns = new ArrayList<>();
-        final Collection<String> measObjClass = new HashSet<>();
+
+        public static class MeasTypeSpec {
+            static MeasTypeSpec empty = new MeasTypeSpec();
+
+            static MeasTypeSpec empty() {
+                return empty;
+            }
+
+            @Getter
+            String measuredObjClass;
+
+            @Getter
+            final Set<String> measTypes = new HashSet<>();
+
+            @Override
+            public boolean equals(Object obj) {
+                return obj instanceof MeasTypeSpec && measuredObjClass.equals(((MeasTypeSpec) obj).measuredObjClass);
+            }
+
+            @Override
+            public int hashCode() {
+                return measuredObjClass.hashCode();
+            }
+        }
+
+        final Set<String> sourceNames = new HashSet<>();
+        final Set<String> measObjInstIds = new HashSet<>();
+        final Collection<MeasTypeSpec> measTypeSpecs = new ArrayList<>();
+        final Set<String> measuredEntityDns = new HashSet<>();
+
+        public void addMeasTypes(String measObjClass, String... measTypes) {
+            MeasTypeSpec spec = this.findMeasTypeSpec(measObjClass);
+            if (spec == null) {
+                spec = new MeasTypeSpec();
+                spec.measuredObjClass = measObjClass;
+                this.measTypeSpecs.add(spec);
+            }
+            for (String measType : measTypes) {
+                spec.measTypes.add(measType);
+            }
+        }
+
+        public void addMeasTypes(String measObjClass, Collection<String> measTypes) {
+            for (String measType : measTypes) {
+                addMeasTypes(measObjClass, measType);
+            }
+        }
 
         @Setter
         String pmRopStartTime;
 
         @Setter
         String pmRopEndTime;
+
+        public void addAll(FilterData other) {
+            addAll(other.sourceNames, sourceNames);
+            addAll(other.measObjInstIds, measObjInstIds);
+            addAll(other.measTypeSpecs);
+            addAll(other.measuredEntityDns, measuredEntityDns);
+        }
+
+        public MeasTypeSpec getMeasTypeSpec(String measuredObjClass) {
+            if (measTypeSpecs.isEmpty()) {
+                return MeasTypeSpec.empty();
+            }
+            return findMeasTypeSpec(measuredObjClass);
+        }
+
+        private MeasTypeSpec findMeasTypeSpec(String measuredObjClass) {
+            for (MeasTypeSpec t : this.measTypeSpecs) {
+                if (t.measuredObjClass.equals(measuredObjClass)) {
+                    return t;
+                }
+            }
+            return null;
+        }
+
+        private void addAll(Collection<MeasTypeSpec> measTypes) {
+            for (MeasTypeSpec s : measTypes) {
+                addMeasTypes(s.getMeasuredObjClass(), s.getMeasTypes());
+            }
+        }
+
+        private void addAll(Set<String> source, Set<String> dst) {
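+            // An empty set means "match all"; if either side matches all, the merged result must also match all.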
+            if (source.isEmpty()) {
+                dst.clear();
+            } else if (dst.isEmpty()) {
+                // Nothing, this means 'match all'
+            } else {
+                dst.addAll(source);
+            }
+        }
     }
 
     private static class MeasTypesIndexed extends PmReport.MeasTypes {
@@ -108,7 +190,7 @@ public class PmReportFilter implements Filter {
             if (reportFiltered == null) {
                 return FilteredData.empty();
             }
-            return new FilteredData(data.key, gson.toJson(reportFiltered).getBytes());
+            return new FilteredData(data.infoTypeId, data.key, gson.toJson(reportFiltered).getBytes());
         } catch (Exception e) {
             logger.warn("Could not parse PM data. {}, reason: {}", data, e.getMessage());
             return FilteredData.empty();
@@ -166,17 +248,18 @@ public class PmReportFilter implements Filter {
         return false;
     }
 
-    private boolean isMeasResultMatch(MeasResult measResult, MeasTypes measTypes, FilterData filter) {
+    private boolean isMeasResultMatch(MeasResult measResult, MeasTypes measTypes,
+            FilterData.MeasTypeSpec measTypesSpec) {
         String measType = measTypes.getMeasType(measResult.getP());
-        return filter.measTypes.isEmpty() || filter.measTypes.contains(measType);
+        return measTypesSpec.measTypes.isEmpty() || measTypesSpec.measTypes.contains(measType);
     }
 
     private Collection<MeasResult> createMeasResults(Collection<MeasResult> oldMeasResults, MeasTypes measTypes,
-            FilterData filter) {
+            FilterData.MeasTypeSpec measTypesSpec) {
         Collection<MeasResult> newMeasResults = new ArrayList<>();
 
         for (MeasResult measResult : oldMeasResults) {
-            if (isMeasResultMatch(measResult, measTypes, filter)) {
+            if (isMeasResultMatch(measResult, measTypes, measTypesSpec)) {
                 newMeasResults.add(measResult.toBuilder().build());
             }
         }
@@ -199,27 +282,25 @@ public class PmReportFilter implements Filter {
         return distinguishedName.substring(lastRdn + 1, lastEqualChar);
     }
 
-    private boolean isMeasInstClassMatch(String measObjInstId, FilterData filter) {
-        if (filter.measObjClass.isEmpty()) {
-            return true;
-        }
-
+    private FilterData.MeasTypeSpec getMeasTypeSpec(String measObjInstId, FilterData filter) {
         String measObjClass = managedObjectClass(measObjInstId);
-        return filter.measObjClass.contains(measObjClass);
+        return filter.getMeasTypeSpec(measObjClass);
     }
 
     private MeasValuesList createMeasValuesList(MeasValuesList oldMeasValues, MeasTypes measTypes, FilterData filter) {
+        FilterData.MeasTypeSpec measTypesSpec = getMeasTypeSpec(oldMeasValues.getMeasObjInstId(), filter);
+        if (measTypesSpec == null) {
+            return MeasValuesList.empty();
+        }
 
-        if (isMeasInstIdMatch(oldMeasValues.getMeasObjInstId(), filter)
-                && isMeasInstClassMatch(oldMeasValues.getMeasObjInstId(), filter)) {
-
-            Collection<MeasResult> newResults = createMeasResults(oldMeasValues.getMeasResults(), measTypes, filter);
-            return oldMeasValues.toBuilder() //
-                    .measResults(newResults) //
-                    .build();
-        } else {
+        if (!isMeasInstIdMatch(oldMeasValues.getMeasObjInstId(), filter)) {
             return MeasValuesList.empty();
         }
+
+        Collection<MeasResult> newResults = createMeasResults(oldMeasValues.getMeasResults(), measTypes, measTypesSpec);
+        return oldMeasValues.toBuilder() //
+                .measResults(newResults) //
+                .build();
     }
 
     private MeasTypes createMeasTypes(Collection<MeasValuesList> newMeasValues, MeasTypes oldMMeasTypes) {
index 4806604..682f15e 100644 (file)
@@ -42,12 +42,12 @@ class RegexpFilter implements Filter {
     @Override
     public FilteredData filter(DataFromTopic data) {
         if (regexp == null) {
-            return new FilteredData(data.key, data.value);
+            return new FilteredData(data.infoTypeId, data.key, data.value);
         }
         Matcher matcher = regexp.matcher(data.valueAsString());
         boolean match = matcher.find();
         if (match) {
-            return new FilteredData(data.key, data.value);
+            return new FilteredData(data.infoTypeId, data.key, data.value);
         } else {
             return FilteredData.empty();
         }
index 8603507..b235bdc 100644 (file)
@@ -28,6 +28,7 @@ import java.lang.invoke.MethodHandles;
 import java.time.Duration;
 
 import lombok.Builder;
+import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
@@ -36,6 +37,7 @@ import org.oran.dmaapadapter.clients.AsyncRestClient;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.filter.Filter;
 import org.oran.dmaapadapter.filter.FilterFactory;
+import org.oran.dmaapadapter.repository.Job.Parameters.KafkaDeliveryInfo;
 import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,8 +51,6 @@ public class Job {
     @Schema(name = "job_statistics", description = "Statistics information for one job")
     public static class Statistics {
 
-        // @Schema(name = "jobId", description = "jobId", required = true)
-        // @SerializedName("jobId")
         @JsonProperty(value = "jobId", required = true)
         String jobId;
 
@@ -115,8 +115,18 @@ public class Job {
         @Getter
         private BufferTimeout bufferTimeout;
 
+        @Builder
+        @EqualsAndHashCode
+        public static class KafkaDeliveryInfo {
+            @Getter
+            private String topic;
+
+            @Getter
+            private String bootStrapServers;
+        }
+
         @Getter
-        private String kafkaOutputTopic;
+        private KafkaDeliveryInfo deliveryInfo;
 
         public Filter.Type getFilterType() {
             if (filter == null || filterType == null) {
@@ -137,9 +147,9 @@ public class Job {
     }
 
     public static class BufferTimeout {
-        public BufferTimeout(int maxSize, long maxTimeMiliseconds) {
+        public BufferTimeout(int maxSize, long maxTimeMilliseconds) {
             this.maxSize = maxSize;
-            this.maxTimeMiliseconds = maxTimeMiliseconds;
+            this.maxTimeMilliseconds = maxTimeMilliseconds;
         }
 
         public BufferTimeout() {}
@@ -147,10 +157,10 @@ public class Job {
         @Getter
         private int maxSize;
 
-        private long maxTimeMiliseconds;
+        private long maxTimeMilliseconds;
 
         public Duration getMaxTime() {
-            return Duration.ofMillis(maxTimeMiliseconds);
+            return Duration.ofMillis(maxTimeMilliseconds);
         }
     }
 
@@ -197,7 +207,7 @@ public class Job {
                 .groupId(type.getKafkaGroupId()) //
                 .inputTopic(type.getKafkaInputTopic()) //
                 .jobId(id) //
-                .outputTopic(parameters.getKafkaOutputTopic()) //
+                .outputTopic(parameters.getDeliveryInfo() == null ? "" : parameters.getDeliveryInfo().topic) //
                 .typeId(type.getId()) //
                 .clientId(type.getKafkaClientId(appConfig)) //
                 .build();
@@ -207,14 +217,14 @@ public class Job {
     public Filter.FilteredData filter(DataFromTopic data) {
         if (filter == null) {
             logger.debug("No filter used");
-            return new Filter.FilteredData(data.key, data.value);
+            return new Filter.FilteredData(data.infoTypeId, data.key, data.value);
         }
         return filter.filter(data);
     }
 
     public boolean isBuffered() {
         return parameters != null && parameters.bufferTimeout != null && parameters.bufferTimeout.maxSize > 0
-                && parameters.bufferTimeout.maxTimeMiliseconds > 0;
+                && parameters.bufferTimeout.maxTimeMilliseconds > 0;
     }
 
 }
index 2c6b329..9a0559c 100644 (file)
@@ -29,12 +29,19 @@ import java.util.List;
 import java.util.Map;
 import java.util.Vector;
 
+import lombok.Getter;
+
 import org.oran.dmaapadapter.clients.AsyncRestClient;
 import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
 import org.oran.dmaapadapter.clients.SecurityContext;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.exceptions.ServiceException;
+import org.oran.dmaapadapter.filter.Filter;
+import org.oran.dmaapadapter.filter.FilterFactory;
+import org.oran.dmaapadapter.filter.PmReportFilter;
 import org.oran.dmaapadapter.repository.Job.Parameters;
+import org.oran.dmaapadapter.repository.Job.Parameters.KafkaDeliveryInfo;
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -44,15 +51,141 @@ import org.springframework.stereotype.Component;
 @Component
 public class Jobs {
     public interface Observer {
-        void onJobbAdded(Job job);
+        void onJobbGroupAdded(JobGroup jobGroup);
+
+        void onJobGroupRemoved(JobGroup jobGroup);
+    }
+
+    public interface JobGroup {
+        public String getId();
+
+        public InfoType getType();
+
+        public void remove(Job job);
+
+        public boolean isEmpty();
+
+        public Filter.FilteredData filter(DataFromTopic data);
+
+        public Iterable<Job> getJobs();
+
+        public KafkaDeliveryInfo getDeliveryInfo();
+    }
+
+    public static class JobGroupSingle implements JobGroup {
+        @Getter
+        private final Job job;
+        private boolean isJobRemoved = false;
+
+        public JobGroupSingle(Job job) {
+            this.job = job;
+        }
+
+        @Override
+        public Filter.FilteredData filter(DataFromTopic data) {
+            return job.filter(data);
+        }
+
+        @Override
+        public void remove(Job job) {
+            this.isJobRemoved = true;
+        }
+
+        @Override
+        public boolean isEmpty() {
+            return isJobRemoved;
+        }
+
+        @Override
+        public String getId() {
+            return job.getId();
+        }
+
+        @Override
+        public InfoType getType() {
+            return job.getType();
+        }
+
+        @Override
+        public Iterable<Job> getJobs() {
+            Collection<Job> c = new ArrayList<>();
+            c.add(job);
+            return c;
+        }
+
+        @Override
+        public KafkaDeliveryInfo getDeliveryInfo() {
+            return this.job.getParameters().getDeliveryInfo();
+        }
+    }
+
+    public static class JobGroupPm implements JobGroup {
+        @Getter
+        private final KafkaDeliveryInfo deliveryInfo;
 
-        void onJobRemoved(Job job);
+        private Map<String, Job> jobs = new HashMap<>();
+
+        @Getter
+        private PmReportFilter filter;
+
+        @Getter
+        private final InfoType type;
+
+        public JobGroupPm(InfoType type, KafkaDeliveryInfo topic) {
+            this.deliveryInfo = topic;
+            this.type = type;
+        }
+
+        public synchronized void add(Job job) {
+            this.jobs.put(job.getId(), job);
+            this.filter = createFilter();
+        }
+
+        public synchronized void remove(Job job) {
+            this.jobs.remove(job.getId());
+            if (!this.jobs.isEmpty()) {
+                this.filter = createFilter();
+            }
+        }
+
+        public boolean isEmpty() {
+            return jobs.isEmpty();
+        }
+
+        @Override
+        public Filter.FilteredData filter(DataFromTopic data) {
+            return filter.filter(data);
+        }
+
+        public Job getAJob() {
+            if (this.jobs.isEmpty()) {
+                return null;
+            }
+            return this.jobs.values().iterator().next();
+        }
+
+        private PmReportFilter createFilter() {
+            Collection<PmReportFilter> filterData = new ArrayList<>();
+            this.jobs.forEach((key, value) -> filterData.add((PmReportFilter) value.getFilter()));
+            return FilterFactory.createAggregateFilter(filterData);
+        }
+
+        @Override
+        public String getId() {
+            return deliveryInfo.getTopic();
+        }
+
+        @Override
+        public Iterable<Job> getJobs() {
+            return this.jobs.values();
+        }
     }
 
     private static final Logger logger = LoggerFactory.getLogger(Jobs.class);
 
     private Map<String, Job> allJobs = new HashMap<>();
     private MultiMap<Job> jobsByType = new MultiMap<>();
+    private Map<String, JobGroup> jobGroups = new HashMap<>(); // Key is Topic or JobId
     private final AsyncRestClientFactory restclientFactory;
     private final List<Observer> observers = new ArrayList<>();
     private final ApplicationConfig appConfig;
@@ -78,7 +211,7 @@ public class Jobs {
     public void addJob(String id, String callbackUrl, InfoType type, String owner, String lastUpdated,
             Parameters parameters) throws ServiceException {
 
-        if (!Strings.isNullOrEmpty(parameters.getKafkaOutputTopic()) && !Strings.isNullOrEmpty(callbackUrl)) {
+        if ((parameters.getDeliveryInfo() != null) && !Strings.isNullOrEmpty(callbackUrl)) {
             throw new ServiceException("Cannot deliver to both Kafka and HTTP in the same job", HttpStatus.BAD_REQUEST);
         }
         AsyncRestClient consumerRestClient = type.isUseHttpProxy() //
@@ -86,9 +219,6 @@ public class Jobs {
                 : restclientFactory.createRestClientNoHttpProxy(callbackUrl);
         Job job = new Job(id, callbackUrl, type, owner, lastUpdated, parameters, consumerRestClient, this.appConfig);
         this.put(job);
-        synchronized (observers) {
-            this.observers.forEach(obs -> obs.onJobbAdded(job));
-        }
     }
 
     public void addObserver(Observer obs) {
@@ -97,10 +227,40 @@ public class Jobs {
         }
     }
 
+    private String jobGroupId(Job job) {
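+        // PM jobs delivering to Kafka are grouped per output topic; all other jobs form a group of their own.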
+        if (job.getParameters().getDeliveryInfo() == null) {
+            return job.getId();
+        } else if (job.getParameters().getFilterType() == Filter.Type.PM_DATA) {
+            return job.getParameters().getDeliveryInfo().getTopic();
+        } else {
+            return job.getId();
+        }
+    }
+
     private synchronized void put(Job job) {
         logger.debug("Put job: {}", job.getId());
+        remove(job.getId());
+
         allJobs.put(job.getId(), job);
         jobsByType.put(job.getType().getId(), job.getId(), job);
+
+        if (job.getParameters().getFilterType() == Filter.Type.PM_DATA
+                && job.getParameters().getDeliveryInfo() != null) {
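+            // PM jobs that deliver to the same Kafka topic share one JobGroupPm (their filters are aggregated).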
+            String jobGroupId = jobGroupId(job);
+            if (!this.jobGroups.containsKey(jobGroupId)) {
+                final JobGroupPm group = new JobGroupPm(job.getType(), job.getParameters().getDeliveryInfo());
+                this.jobGroups.put(jobGroupId, group);
+                group.add(job);
+                this.observers.forEach(obs -> obs.onJobbGroupAdded(group));
+            } else {
+                JobGroupPm group = (JobGroupPm) this.jobGroups.get(jobGroupId);
+                group.add(job);
+            }
+        } else {
+            JobGroupSingle group = new JobGroupSingle(job);
+            this.jobGroups.put(jobGroupId(job), group);
+            this.observers.forEach(obs -> obs.onJobbGroupAdded(group));
+        }
     }
 
     public synchronized Iterable<Job> getAll() {
@@ -116,15 +276,20 @@ public class Jobs {
     }
 
     public void remove(Job job) {
+        String groupId = jobGroupId(job);
+        JobGroup group = this.jobGroups.get(groupId);
         synchronized (this) {
             this.allJobs.remove(job.getId());
             jobsByType.remove(job.getType().getId(), job.getId());
+            group.remove(job);
+            if (group.isEmpty()) {
+                this.jobGroups.remove(groupId);
+            }
         }
-        notifyJobRemoved(job);
-    }
 
-    private synchronized void notifyJobRemoved(Job job) {
-        this.observers.forEach(obs -> obs.onJobRemoved(job));
+        if (group.isEmpty()) {
+            this.observers.forEach(obs -> obs.onJobGroupRemoved(group));
+        }
     }
 
     public synchronized int size() {
@@ -137,7 +302,7 @@ public class Jobs {
 
     public void clear() {
 
-        this.allJobs.forEach((id, job) -> notifyJobRemoved(job));
+        this.jobGroups.forEach((id, group) -> this.observers.forEach(obs -> obs.onJobGroupRemoved(group)));
 
         synchronized (this) {
             allJobs.clear();
index 719597a..a9123e6 100644 (file)
@@ -72,7 +72,7 @@ public class DmaapTopicListener implements TopicListener {
                 .doOnNext(input -> logger.debug("Received from DMaap: {} :{}", this.type.getDmaapTopicUrl(), input)) //
                 .doOnError(t -> logger.error("DmaapTopicListener error: {}", t.getMessage())) //
                 .doFinally(sig -> logger.error("DmaapTopicListener stopped, reason: {}", sig)) //
-                .map(input -> new DataFromTopic(null, input.getBytes(), false))
+                .map(input -> new DataFromTopic(this.type.getId(), null, null, input.getBytes()))
                 .flatMap(data -> KafkaTopicListener.getDataFromFileIfNewPmFileEvent(data, type, dataStore), 100)
                 .publish() //
                 .autoConnect();
index 71918db..6193af6 100644 (file)
@@ -23,6 +23,8 @@ package org.oran.dmaapadapter.tasks;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.filter.Filter;
 import org.oran.dmaapadapter.repository.Job;
+import org.oran.dmaapadapter.repository.Jobs.JobGroup;
+import org.oran.dmaapadapter.repository.Jobs.JobGroupSingle;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.http.MediaType;
@@ -36,14 +38,15 @@ import reactor.core.publisher.Mono;
 public class HttpJobDataDistributor extends JobDataDistributor {
     private static final Logger logger = LoggerFactory.getLogger(HttpJobDataDistributor.class);
 
-    public HttpJobDataDistributor(Job job, ApplicationConfig config) {
+    public HttpJobDataDistributor(JobGroup job, ApplicationConfig config) {
         super(job, config);
     }
 
     @Override
     protected Mono<String> sendToClient(Filter.FilteredData output) {
-        Job job = this.getJob();
-        logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), output);
+        JobGroupSingle group = (JobGroupSingle) this.getJobGroup();
+        Job job = group.getJob();
+        logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), output.getValueAString());
         MediaType contentType = job.isBuffered() || job.getType().isJson() ? MediaType.APPLICATION_JSON : null;
         return job.getConsumerRestClient().post("", output.getValueAString(), contentType);
     }
index f9353f1..ef493a5 100644 (file)
@@ -34,6 +34,9 @@ import org.oran.dmaapadapter.datastore.DataStore;
 import org.oran.dmaapadapter.filter.Filter;
 import org.oran.dmaapadapter.filter.PmReportFilter;
 import org.oran.dmaapadapter.repository.Job;
+import org.oran.dmaapadapter.repository.Jobs.JobGroup;
+import org.oran.dmaapadapter.repository.Jobs.JobGroupPm;
+import org.oran.dmaapadapter.repository.Jobs.JobGroupSingle;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,7 +53,7 @@ public abstract class JobDataDistributor {
     private static final Logger logger = LoggerFactory.getLogger(JobDataDistributor.class);
 
     @Getter
-    private final Job job;
+    private final JobGroup jobGroup;
     private Disposable subscription;
     private final ErrorStats errorStats = new ErrorStats();
 
@@ -71,50 +74,48 @@ public abstract class JobDataDistributor {
         }
     }
 
-    protected JobDataDistributor(Job job, ApplicationConfig applConfig) {
+    protected JobDataDistributor(JobGroup jobGroup, ApplicationConfig applConfig) {
         this.applConfig = applConfig;
-        this.job = job;
+        this.jobGroup = jobGroup;
         this.dataStore = DataStore.create(applConfig);
         this.dataStore.create(DataStore.Bucket.FILES).subscribe();
         this.dataStore.create(DataStore.Bucket.LOCKS).subscribe();
     }
 
     public void start(Flux<TopicListener.DataFromTopic> input) {
-        logger.debug("Starting distribution, job: {}, to topic: {}", this.job.getId(),
-                job.getParameters().getKafkaOutputTopic());
-        PmReportFilter filter = job.getFilter() instanceof PmReportFilter ? (PmReportFilter) job.getFilter() : null;
-
+        logger.debug("Starting distribution, to topic: {}", jobGroup.getId());
+        PmReportFilter filter = getPmReportFilter(this.jobGroup);
         if (filter == null || filter.getFilterData().getPmRopEndTime() == null) {
-            this.subscription = filterAndBuffer(input, this.job) //
+            this.subscription = filterAndBuffer(input, this.jobGroup) //
                     .flatMap(this::sendToClient) //
                     .onErrorResume(this::handleError) //
                     .subscribe(this::handleSentOk, //
                             this::handleExceptionInStream, //
-                            () -> logger.warn("JobDataDistributor stopped jobId: {}", job.getId()));
+                            () -> logger.warn("JobDataDistributor stopped jobId: {}", jobGroup.getId()));
         }
 
         if (filter != null && filter.getFilterData().getPmRopStartTime() != null) {
             this.dataStore.createLock(collectHistoricalDataLockName()) //
                     .doOnNext(isLockGranted -> {
                         if (isLockGranted.booleanValue()) {
-                            logger.debug("Checking historical PM ROP files, jobId: {}", this.job.getId());
+                            logger.debug("Checking historical PM ROP files, jobId: {}", this.jobGroup.getId());
                         } else {
                             logger.debug("Skipping check of historical PM ROP files, already done. jobId: {}",
-                                    this.job.getId());
+                                    this.jobGroup.getId());
                         }
                     }) //
                     .filter(isLockGranted -> isLockGranted) //
                     .flatMapMany(b -> Flux.fromIterable(filter.getFilterData().getSourceNames())) //
                     .doOnNext(sourceName -> logger.debug("Checking source name: {}, jobId: {}", sourceName,
-                            this.job.getId())) //
+                            this.jobGroup.getId())) //
                     .flatMap(sourceName -> dataStore.listObjects(DataStore.Bucket.FILES, sourceName), 1) //
                     .filter(this::isRopFile) //
                     .filter(fileName -> filterStartTime(filter.getFilterData(), fileName)) //
                     .filter(fileName -> filterEndTime(filter.getFilterData(), fileName)) //
                     .map(this::createFakeEvent) //
-                    .flatMap(data -> KafkaTopicListener.getDataFromFileIfNewPmFileEvent(data, this.job.getType(),
+                    .flatMap(data -> KafkaTopicListener.getDataFromFileIfNewPmFileEvent(data, this.jobGroup.getType(),
                             dataStore), 100)
-                    .map(job::filter) //
+                    .map(jobGroup::filter) //
                     .map(this::gzip) //
                     .flatMap(this::sendToClient, 1) //
                     .onErrorResume(this::handleCollectHistoricalDataError) //
@@ -122,6 +123,17 @@ public abstract class JobDataDistributor {
         }
     }
 
+    private static PmReportFilter getPmReportFilter(JobGroup jobGroup) {
+
+        if (jobGroup instanceof JobGroupPm) {
+            return ((JobGroupPm) jobGroup).getFilter();
+        } else if (jobGroup instanceof JobGroupSingle) {
+            Filter f = ((JobGroupSingle) jobGroup).getJob().getFilter();
+            return (f instanceof PmReportFilter) ? (PmReportFilter) f : null;
+        }
+        return null;
+    }
+
     private Filter.FilteredData gzip(Filter.FilteredData data) {
         if (this.applConfig.isZipOutput()) {
             try {
@@ -131,7 +143,7 @@ public abstract class JobDataDistributor {
                 gzip.flush();
                 gzip.close();
                 byte[] zipped = out.toByteArray();
-                return new Filter.FilteredData(data.key, zipped, true);
+                return new Filter.FilteredData(data.infoTypeId, data.key, zipped, true);
             } catch (IOException e) {
                 logger.error("Unexpected exception when zipping: {}", e.getMessage());
                 return data;
@@ -142,30 +154,28 @@ public abstract class JobDataDistributor {
     }
 
     private Mono<String> handleCollectHistoricalDataError(Throwable t) {
-        logger.error("Exception: {} job: {}", t.getMessage(), job.getId());
+        logger.error("Exception: {} job: {}", t.getMessage(), jobGroup.getId());
         return tryDeleteLockFile() //
                 .map(bool -> "OK");
     }
 
     private String collectHistoricalDataLockName() {
-        return "collectHistoricalDataLock" + this.job.getId();
+        return "collectHistoricalDataLock" + this.jobGroup.getId();
     }
 
     private TopicListener.DataFromTopic createFakeEvent(String fileName) {
         NewFileEvent ev = new NewFileEvent(fileName);
-        return new TopicListener.DataFromTopic(null, gson.toJson(ev).getBytes(), false);
+        return new TopicListener.DataFromTopic(this.jobGroup.getType().getId(), null, null, gson.toJson(ev).getBytes());
     }
 
     private static String fileTimePartFromRopFileName(String fileName) {
+        // "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.json"
         return fileName.substring(fileName.lastIndexOf("/") + 2);
     }
 
     private static boolean filterStartTime(PmReportFilter.FilterData filter, String fileName) {
-        // A20000626.2315+0200-2330+0200_HTTPS-6-73.json
         try {
-            String fileTimePart = fileTimePartFromRopFileName(fileName);
-            fileTimePart = fileTimePart.substring(0, 18);
-            OffsetDateTime fileStartTime = parseFileDate(fileTimePart);
+            OffsetDateTime fileStartTime = getStartTimeFromFileName(fileName);
             OffsetDateTime startTime = OffsetDateTime.parse(filter.getPmRopStartTime());
             boolean isMatch = fileStartTime.isAfter(startTime);
             logger.debug("Checking file: {}, fileStartTime: {}, filterStartTime: {}, isAfter: {}", fileName,
@@ -182,14 +192,11 @@ public abstract class JobDataDistributor {
     }
 
     private static boolean filterEndTime(PmReportFilter.FilterData filter, String fileName) {
-        // A20000626.2315+0200-2330+0200_HTTPS-6-73.json
         if (filter.getPmRopEndTime() == null) {
             return true;
         }
         try {
-            String fileTimePart = fileTimePartFromRopFileName(fileName);
-            fileTimePart = fileTimePart.substring(0, 9) + fileTimePart.substring(19, 28);
-            OffsetDateTime fileEndTime = parseFileDate(fileTimePart);
+            OffsetDateTime fileEndTime = getEndTimeFromFileName(fileName);
             OffsetDateTime endTime = OffsetDateTime.parse(filter.getPmRopEndTime());
             boolean isMatch = fileEndTime.isBefore(endTime);
             logger.debug("Checking file: {}, fileEndTime: {}, endTime: {}, isBefore: {}", fileName, fileEndTime,
@@ -202,6 +209,20 @@ public abstract class JobDataDistributor {
         }
     }
 
+    private static OffsetDateTime getStartTimeFromFileName(String fileName) {
+        String fileTimePart = fileTimePartFromRopFileName(fileName);
+        // A20000626.2315+0200-2330+0200_HTTPS-6-73.json
+        fileTimePart = fileTimePart.substring(0, 18);
+        return parseFileDate(fileTimePart);
+    }
+
+    private static OffsetDateTime getEndTimeFromFileName(String fileName) {
+        String fileTimePart = fileTimePartFromRopFileName(fileName);
+        // A20000626.2315+0200-2330+0200_HTTPS-6-73.json
+        fileTimePart = fileTimePart.substring(0, 9) + fileTimePart.substring(19, 28);
+        return parseFileDate(fileTimePart);
+    }
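+    // For example, "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.json" parses to
+    // start 2000-06-26T23:15+02:00 and end 2000-06-26T23:30+02:00.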
+
     private static OffsetDateTime parseFileDate(String timeStr) {
         DateTimeFormatter startTimeFormatter =
                 new DateTimeFormatterBuilder().appendPattern("yyyyMMdd.HHmmZ").toFormatter();
@@ -209,14 +230,14 @@ public abstract class JobDataDistributor {
     }
 
     private void handleExceptionInStream(Throwable t) {
-        logger.warn("JobDataDistributor exception: {}, jobId: {}", t.getMessage(), job.getId());
+        logger.warn("JobDataDistributor exception: {}, jobId: {}", t.getMessage(), jobGroup.getId());
     }
 
     protected abstract Mono<String> sendToClient(Filter.FilteredData output);
 
     public synchronized void stop() {
         if (this.subscription != null) {
-            logger.debug("Stopped, job: {}", job.getId());
+            logger.debug("Stopped, job: {}", jobGroup.getId());
             this.subscription.dispose();
             this.subscription = null;
         }
@@ -233,23 +254,26 @@ public abstract class JobDataDistributor {
         return this.subscription != null;
     }
 
-    private Flux<Filter.FilteredData> filterAndBuffer(Flux<TopicListener.DataFromTopic> inputFlux, Job job) {
+    private Flux<Filter.FilteredData> filterAndBuffer(Flux<TopicListener.DataFromTopic> inputFlux, JobGroup jobGroup) {
         Flux<Filter.FilteredData> filtered = //
                 inputFlux //
-                        .doOnNext(data -> logger.trace("Received data, job {}", job.getId())) //
-                        .doOnNext(data -> job.getStatistics().received(data.value)) //
-                        .map(job::filter) //
+                        .doOnNext(data -> logger.trace("Received data, job {}", jobGroup.getId())) //
+                        .doOnNext(data -> jobGroup.getJobs().forEach(job -> job.getStatistics().received(data.value))) //
+                        .map(jobGroup::filter) //
                         .map(this::gzip) //
                         .filter(f -> !f.isEmpty()) //
-                        .doOnNext(f -> job.getStatistics().filtered(f.value)) //
-                        .doOnNext(data -> logger.trace("Filtered data, job {}", job.getId())); //
+                        .doOnNext(f -> jobGroup.getJobs().forEach(job -> job.getStatistics().filtered(f.value))) //
+                        .doOnNext(data -> logger.trace("Filtered data, job {}", jobGroup.getId())) //
+        ; //
 
+        Job job = jobGroup.getJobs().iterator().next();
         if (job.isBuffered()) {
             filtered = filtered.map(input -> quoteNonJson(input.getValueAString(), job)) //
                     .bufferTimeout( //
                             job.getParameters().getBufferTimeout().getMaxSize(), //
                             job.getParameters().getBufferTimeout().getMaxTime()) //
-                    .map(buffered -> new Filter.FilteredData(null, buffered.toString().getBytes()));
+                    .map(buffered -> new Filter.FilteredData(this.getJobGroup().getType().getId(), null,
+                            buffered.toString().getBytes()));
         }
         return filtered;
     }
@@ -264,7 +288,7 @@ public abstract class JobDataDistributor {
     }
 
     private Mono<String> handleError(Throwable t) {
-        logger.warn("exception: {} job: {}", t.getMessage(), job.getId());
+        logger.warn("exception: {} job: {}", t.getMessage(), jobGroup.getId());
         this.errorStats.handleException(t);
         return Mono.empty(); // Ignore
     }
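
For illustration, the two new helpers can be traced by hand on the file name from the comments above (a sketch only; it assumes access to the private helpers and that parseFileDate applies the "yyyyMMdd.HHmmZ" formatter shown earlier in this hunk):

    // Hypothetical trace, not part of the change:
    String fileName = "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.json";
    // fileTimePartFromRopFileName(fileName) -> "20000626.2315+0200-2330+0200_HTTPS-6-73.json"
    OffsetDateTime start = getStartTimeFromFileName(fileName); // "20000626.2315+0200" -> 2000-06-26T23:15+02:00
    OffsetDateTime end = getEndTimeFromFileName(fileName);     // "20000626.2330+0200" -> 2000-06-26T23:30+02:00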
index 6b33c3b..24730a1 100644 (file)
@@ -29,7 +29,8 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.filter.Filter;
-import org.oran.dmaapadapter.repository.Job;
+import org.oran.dmaapadapter.repository.Job.Parameters.KafkaDeliveryInfo;
+import org.oran.dmaapadapter.repository.Jobs.JobGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,25 +49,25 @@ public class KafkaJobDataDistributor extends JobDataDistributor {
 
     private KafkaSender<byte[], byte[]> sender;
 
-    public KafkaJobDataDistributor(Job job, ApplicationConfig appConfig) {
-        super(job, appConfig);
+    public KafkaJobDataDistributor(JobGroup jobGroup, ApplicationConfig appConfig) {
+        super(jobGroup, appConfig);
 
-        SenderOptions<byte[], byte[]> senderOptions = senderOptions(appConfig);
+        SenderOptions<byte[], byte[]> senderOptions = senderOptions(appConfig, jobGroup.getDeliveryInfo());
         this.sender = KafkaSender.create(senderOptions);
     }
 
     @Override
     protected Mono<String> sendToClient(Filter.FilteredData data) {
-        Job job = this.getJob();
-        SenderRecord<byte[], byte[], Integer> senderRecord = senderRecord(data, job);
+
+        SenderRecord<byte[], byte[], Integer> senderRecord = senderRecord(data, this.getJobGroup().getDeliveryInfo());
 
         logger.trace("Sending data '{}' to Kafka topic: {}", StringUtils.truncate(data.getValueAString(), 10),
-                job.getParameters().getKafkaOutputTopic());
+                this.getJobGroup().getDeliveryInfo());
 
         return this.sender.send(Mono.just(senderRecord)) //
-                .doOnNext(n -> logger.debug("Sent data to Kafka topic: {}", job.getParameters().getKafkaOutputTopic())) //
-                .doOnError(
-                        t -> logger.warn("Failed to send to Kafka, job: {}, reason: {}", job.getId(), t.getMessage())) //
+                .doOnNext(n -> logger.debug("Sent data to Kafka topic: {}", this.getJobGroup().getDeliveryInfo())) //
+                .doOnError(t -> logger.warn("Failed to send to Kafka, job: {}, reason: {}", this.getJobGroup().getId(),
+                        t.getMessage())) //
                 .onErrorResume(t -> Mono.empty()) //
                 .collectList() //
                 .map(x -> "ok");
@@ -82,8 +83,13 @@ public class KafkaJobDataDistributor extends JobDataDistributor {
         }
     }
 
-    private static SenderOptions<byte[], byte[]> senderOptions(ApplicationConfig config) {
-        String bootstrapServers = config.getKafkaBootStrapServers();
+    private static SenderOptions<byte[], byte[]> senderOptions(ApplicationConfig config,
+            KafkaDeliveryInfo deliveryInfo) {
+
+        String bootstrapServers = deliveryInfo.getBootStrapServers();
+        if (bootstrapServers == null || bootstrapServers.isEmpty()) {
+            bootstrapServers = config.getKafkaBootStrapServers();
+        }
 
         Map<String, Object> props = new HashMap<>();
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
@@ -93,10 +99,11 @@ public class KafkaJobDataDistributor extends JobDataDistributor {
         return SenderOptions.create(props);
     }
 
-    private SenderRecord<byte[], byte[], Integer> senderRecord(Filter.FilteredData output, Job infoJob) {
+    private SenderRecord<byte[], byte[], Integer> senderRecord(Filter.FilteredData output,
+            KafkaDeliveryInfo deliveryInfo) {
         int correlationMetadata = 2;
-        String topic = infoJob.getParameters().getKafkaOutputTopic();
-        var producerRecord = new ProducerRecord<>(topic, null, null, output.key, output.value, output.headers());
+        var producerRecord =
+                new ProducerRecord<>(deliveryInfo.getTopic(), null, null, output.key, output.value, output.headers());
         return SenderRecord.create(producerRecord, correlationMetadata);
     }
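
As a usage sketch of the new per-job delivery target (builder style as exercised by the tests later in this change; the topic name and server address are hypothetical):

    // bootStrapServers is optional; when null or empty, senderOptions()
    // falls back to appConfig.getKafkaBootStrapServers():
    KafkaDeliveryInfo deliveryInfo = KafkaDeliveryInfo.builder()
            .topic("pm-output")
            .bootStrapServers("kafka-1:9092")
            .build();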
 
index 1a20c68..021678d 100644 (file)
@@ -85,7 +85,7 @@ public class KafkaTopicListener implements TopicListener {
                 .doFinally(
                         sig -> logger.error("KafkaTopicListener stopped, type: {}, reason: {}", this.type.getId(), sig)) //
                 .filter(t -> t.value().length > 0 || t.key().length > 0) //
-                .map(input -> new DataFromTopic(input.key(), input.value(), DataFromTopic.findZipped(input.headers()))) //
+                .map(input -> new DataFromTopic(this.type.getId(), input.headers(), input.key(), input.value())) //
                 .flatMap(data -> getDataFromFileIfNewPmFileEvent(data, type, dataStore)) //
                 .publish() //
                 .autoConnect(1);
@@ -126,7 +126,7 @@ public class KafkaTopicListener implements TopicListener {
             logger.trace("Reading PM measurements, type: {}, inputTopic: {}", type.getId(), type.getKafkaInputTopic());
             return fileStore.readObject(DataStore.Bucket.FILES, ev.getFilename()) //
                     .map(bytes -> unzip(bytes, ev.getFilename())) //
-                    .map(bytes -> new DataFromTopic(data.key, bytes, false));
+                    .map(bytes -> new DataFromTopic(data.infoTypeId, data.headers, data.key, bytes));
 
         } catch (Exception e) {
             return Mono.just(data);
index 8b1afc8..debc783 100644 (file)
@@ -34,7 +34,10 @@ public interface TopicListener {
     public static class DataFromTopic {
         public final byte[] key;
         public final byte[] value;
-        public final boolean isZipped;
+
+        public final String infoTypeId;
+
+        public final Iterable<Header> headers;
 
         private static byte[] noBytes = new byte[0];
 
@@ -43,29 +46,43 @@ public interface TopicListener {
         @ToString.Exclude
         private PmReport cachedPmReport;
 
-        public DataFromTopic(byte[] key, byte[] value, boolean isZipped) {
+        public DataFromTopic(String typeId, Iterable<Header> headers, byte[] key, byte[] value) {
             this.key = key == null ? noBytes : key;
             this.value = value == null ? noBytes : value;
-            this.isZipped = isZipped;
+            this.infoTypeId = typeId;
+            this.headers = headers;
         }
 
         public String valueAsString() {
             return new String(this.value);
         }
 
-        public static final String ZIP_PROPERTY = "gzip";
+        public static final String ZIPPED_PROPERTY = "gzip";
+        public static final String TYPE_ID_PROPERTY = "type-id";
 
-        public static boolean findZipped(Iterable<Header> headers) {
+        public boolean isZipped() {
             if (headers == null) {
                 return false;
             }
             for (Header h : headers) {
-                if (h.key().equals(ZIP_PROPERTY)) {
+                if (h.key().equals(ZIPPED_PROPERTY)) {
                     return true;
                 }
             }
             return false;
         }
+
+        public String getTypeIdFromHeaders() {
+            if (headers == null) {
+                return "";
+            }
+            for (Header h : headers) {
+                if (h.key().equals(TYPE_ID_PROPERTY)) {
+                    return new String(h.value());
+                }
+            }
+            return "";
+        }
     }
 
     public Flux<DataFromTopic> getFlux();
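
A sketch of how the new header properties travel with a Kafka record (uses org.apache.kafka.common.header.Headers and RecordHeaders; the type id value is hypothetical):

    Headers headers = new RecordHeaders()
            .add(DataFromTopic.TYPE_ID_PROPERTY, "ExampleType".getBytes()) // "type-id"
            .add(DataFromTopic.ZIPPED_PROPERTY, new byte[0]);              // presence alone signals gzip
    DataFromTopic data = new DataFromTopic("ExampleType", headers, null, "{}".getBytes());
    // data.isZipped()             -> true
    // data.getTypeIdFromHeaders() -> "ExampleType"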
index a3e3703..38035c6 100644 (file)
@@ -25,13 +25,12 @@ import java.util.Map;
 
 import lombok.Getter;
 
-import org.apache.logging.log4j.util.Strings;
 import org.oran.dmaapadapter.clients.SecurityContext;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.repository.InfoType;
 import org.oran.dmaapadapter.repository.InfoTypes;
-import org.oran.dmaapadapter.repository.Job;
 import org.oran.dmaapadapter.repository.Jobs;
+import org.oran.dmaapadapter.repository.Jobs.JobGroup;
 import org.oran.dmaapadapter.repository.MultiMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,52 +69,52 @@ public class TopicListeners {
 
         jobs.addObserver(new Jobs.Observer() {
             @Override
-            public void onJobbAdded(Job job) {
-                addJob(job);
+            public void onJobbGroupAdded(JobGroup jobGroup) {
+                addJob(jobGroup);
             }
 
             @Override
-            public void onJobRemoved(Job job) {
-                removeJob(job);
+            public void onJobGroupRemoved(JobGroup jobGroup) {
+                removeDistributor(jobGroup);
             }
         });
     }
 
-    public synchronized void addJob(Job job) {
-        removeJob(job);
-        logger.debug("Job added {}", job.getId());
-        if (job.getType().isKafkaTopicDefined()) {
-            addConsumer(job, dataDistributors, kafkaTopicListeners);
+    public synchronized void addJob(JobGroup jobGroup) {
+        removeDistributor(jobGroup);
+        logger.debug("Job added {}", jobGroup.getId());
+        if (jobGroup.getType().isKafkaTopicDefined()) {
+            addDistributor(jobGroup, dataDistributors, kafkaTopicListeners);
         }
 
-        if (job.getType().isDmaapTopicDefined()) {
-            addConsumer(job, dataDistributors, dmaapTopicListeners);
+        if (jobGroup.getType().isDmaapTopicDefined()) {
+            addDistributor(jobGroup, dataDistributors, dmaapTopicListeners);
         }
     }
 
-    private JobDataDistributor createConsumer(Job job) {
-        return !Strings.isEmpty(job.getParameters().getKafkaOutputTopic()) ? new KafkaJobDataDistributor(job, appConfig)
-                : new HttpJobDataDistributor(job, appConfig);
+    private JobDataDistributor createDistributor(JobGroup jobGroup) {
+        return jobGroup.getDeliveryInfo() != null ? new KafkaJobDataDistributor(jobGroup, appConfig)
+                : new HttpJobDataDistributor(jobGroup, appConfig);
     }
 
-    private void addConsumer(Job job, MultiMap<JobDataDistributor> distributors,
+    private void addDistributor(JobGroup jobGroup, MultiMap<JobDataDistributor> distributors,
             Map<String, TopicListener> topicListeners) {
-        TopicListener topicListener = topicListeners.get(job.getType().getId());
-        JobDataDistributor distributor = createConsumer(job);
+        TopicListener topicListener = topicListeners.get(jobGroup.getType().getId());
+        JobDataDistributor distributor = createDistributor(jobGroup);
 
         distributor.start(topicListener.getFlux());
 
-        distributors.put(job.getType().getId(), job.getId(), distributor);
+        distributors.put(jobGroup.getType().getId(), jobGroup.getId(), distributor);
     }
 
-    public synchronized void removeJob(Job job) {
-        removeJob(job, dataDistributors);
+    public synchronized void removeDistributor(JobGroup jobGroup) {
+        removeDistributor(jobGroup, dataDistributors);
     }
 
-    private static void removeJob(Job job, MultiMap<JobDataDistributor> distributors) {
-        JobDataDistributor distributor = distributors.remove(job.getType().getId(), job.getId());
+    private static void removeDistributor(JobGroup jobGroup, MultiMap<JobDataDistributor> distributors) {
+        JobDataDistributor distributor = distributors.remove(jobGroup.getType().getId(), jobGroup.getId());
         if (distributor != null) {
-            logger.debug("Job removed {}", job.getId());
+            logger.debug("Job removed {}", jobGroup.getId());
             distributor.stop();
         }
     }
index be2829f..d5b20fc 100644 (file)
       "type": "integer",
       "minimum": 1
     },
-    "kafkaOutputTopic" : {
-      "type": "string"
+    "deliveryInfo": {
+      "type": "object",
+      "properties": {
+        "topic": {
+          "type": "string"
+        },
+        "bootStrapServers": {
+          "type": "string"
+        }
+      },
+      "required": [
+        "topic"
+      ]
     },
     "bufferTimeout": {
       "type": "object",
@@ -27,7 +38,7 @@
           "type": "integer",
           "minimum": 1
         },
-        "maxTimeMiliseconds": {
+        "maxTimeMilliseconds": {
           "type": "integer",
           "minimum": 0,
           "maximum": 160000
@@ -36,9 +47,9 @@
       "additionalProperties": false,
       "required": [
         "maxSize",
-        "maxTimeMiliseconds"
+        "maxTimeMilliseconds"
       ]
     }
   },
   "additionalProperties": false
-}
+}
\ No newline at end of file
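
A parameters object that validates against this updated schema can be built with the Job.Parameters builder (a sketch; the values are hypothetical and the serialized form is approximate):

    Job.Parameters param = Job.Parameters.builder()
            .deliveryInfo(Job.Parameters.KafkaDeliveryInfo.builder().topic("out-topic").build())
            .bufferTimeout(new Job.BufferTimeout(100, 500)) // maxSize, maxTimeMilliseconds
            .build();
    String json = new com.google.gson.Gson().toJson(param);
    // roughly: {"deliveryInfo":{"topic":"out-topic"},"bufferTimeout":{"maxSize":100,"maxTimeMilliseconds":500}}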
index df3f723..2e14ede 100644 (file)
                   }
                ]
             },
-            "measObjClass": {
+            "measTypeSpecs": {
                "type": "array",
                "items": [
                   {
-                     "type": "string"
-                  }
-               ]
-            },
-            "measTypes": {
-               "type": "array",
-               "items": [
-                  {
-                     "type": "string"
+                     "type": "object",
+                     "properties": {
+                        "measuredObjClass": {
+                           "type": "string"
+                        },
+                        "measTypes": {
+                           "type": "array",
+                           "items": [
+                              {
+                                 "type": "string"
+                              }
+                           ]
+                        }
+                     },
+                     "required": [
+                        "measuredObjClass"
+                     ]
                   }
                ]
             },
             "pmdata"
          ]
       },
-      "kafkaOutputTopic": {
-         "type": "string"
+      "deliveryInfo": {
+         "type": "object",
+         "additionalProperties": false,
+         "properties": {
+            "topic": {
+               "type": "string"
+            },
+            "bootStrapServers": {
+               "type": "string"
+            }
+         },
+         "required": [
+            "topic"
+         ]
       }
-   }
+   },
+   "required": [
+      "filter",
+      "filterType"
+   ]
 }
\ No newline at end of file
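
On the Java side, the new measTypeSpecs structure (a measuredObjClass plus its measTypes) is driven through FilterData.addMeasTypes in the tests below; roughly:

    PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
    filterData.addMeasTypes("NRCellCU", "pmCounterNumber0"); // one class with one counter
    filterData.addMeasTypes("UtranCell");                    // class only: matches all measTypes of the class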
index 10779d0..da64e0c 100644 (file)
@@ -40,7 +40,9 @@ import java.util.Map;
 import org.json.JSONObject;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.MethodOrderer;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
 import org.oran.dmaapadapter.clients.AsyncRestClient;
 import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
 import org.oran.dmaapadapter.clients.SecurityContext;
@@ -75,11 +77,13 @@ import org.springframework.http.HttpStatus;
 import org.springframework.http.MediaType;
 import org.springframework.http.ResponseEntity;
 import org.springframework.test.context.TestPropertySource;
+import org.springframework.web.reactive.function.client.WebClientRequestException;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
+@TestMethodOrder(MethodOrderer.MethodName.class)
 @SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
 @TestPropertySource(properties = { //
         "server.ssl.key-store=./config/keystore.jks", //
@@ -115,7 +119,7 @@ class ApplicationTest {
     @Autowired
     private SecurityContext securityContext;
 
-    private com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
+    private com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
 
     @LocalServerPort
     int localServerHttpPort;
@@ -383,7 +387,7 @@ class ApplicationTest {
         // Create a job with a PM filter
         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
 
-        filterData.getMeasTypes().add("succImmediateAssignProcs");
+        filterData.addMeasTypes("UtranCell", "succImmediateAssignProcs");
         filterData.getMeasObjInstIds().add("UtranCell=Gbg-997");
         filterData.getSourceNames().add("O-DU-1122");
         filterData.getMeasuredEntityDns().add("ManagedElement=RNC-Gbg-1");
@@ -559,7 +563,7 @@ class ApplicationTest {
 
         // Create a job with a PM filter
         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
-        filterData.getMeasTypes().add("succImmediateAssignProcs");
+        filterData.addMeasTypes("ManagedElement", "succImmediateAssignProcs");
         Job.Parameters param =
                 Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE).build();
 
@@ -601,7 +605,27 @@ class ApplicationTest {
         String stats = restClient().get(targetUri).block();
 
         assertThat(stats).contains(JOB_ID, "DmaapInformationType");
+    }
+
+    @Test
+    @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
+    void testZZActuator() throws Exception {
+        // This test must run last, hence the "ZZ" in the name: it shuts the
+        // application down, so any test executed after it would fail.
+        AsyncRestClient client = restClient();
+        client.post("/actuator/loggers/org.oran.dmaapadapter", "{\"configuredLevel\":\"trace\"}").block();
+        String resp = client.get("/actuator/loggers/org.oran.dmaapadapter").block();
+        assertThat(resp).contains("TRACE");
+        client.post("/actuator/loggers/org.springframework.boot.actuate", "{\"configuredLevel\":\"trace\"}").block();
+        // This stops the web server; subsequent requests will fail.
+        client.post("/actuator/shutdown", "").block();
+        Thread.sleep(1000);
 
+        String url = "https://localhost:" + applicationConfig.getLocalServerHttpPort() + "/v3/api-docs";
+        StepVerifier.create(restClient().get(url)) // Any call
+                .expectSubscription() //
+                .expectErrorMatches(t -> t instanceof WebClientRequestException) //
+                .verify();
     }
 
     public static void testErrorCode(Mono<?> request, HttpStatus expStatus, String responseContains) {
index a62ae7d..f1b0f21 100644 (file)
@@ -54,9 +54,9 @@ public class ConsumerController {
 
     public static class TestResults {
 
-        public List<String> receivedBodies = Collections.synchronizedList(new ArrayList<String>());
+        public final List<String> receivedBodies = Collections.synchronizedList(new ArrayList<String>());
 
-        public List<Map<String, String>> receivedHeaders =
+        public final List<Map<String, String>> receivedHeaders =
                 Collections.synchronizedList(new ArrayList<Map<String, String>>());
 
         public TestResults() {}
index 5f7a886..4146212 100644 (file)
@@ -27,10 +27,6 @@ import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.JsonParser;
 
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.nio.file.Path;
-
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.oran.dmaapadapter.clients.AsyncRestClient;
@@ -39,6 +35,7 @@ import org.oran.dmaapadapter.clients.SecurityContext;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.configuration.WebClientConfig;
 import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig;
+import org.oran.dmaapadapter.filter.PmReportFilter;
 import org.oran.dmaapadapter.r1.ConsumerJobInfo;
 import org.oran.dmaapadapter.repository.Job;
 import org.oran.dmaapadapter.repository.Jobs;
@@ -275,38 +272,37 @@ class IntegrationWithIcs {
         ConsumerController.TestResults results = this.consumerController.testResults;
         await().untilAsserted(() -> assertThat(results.receivedBodies).hasSize(2));
         assertThat(results.receivedBodies.get(0)).isEqualTo("DmaapResponse1");
-        assertThat(results.receivedHeaders.get(0)).containsEntry("content-type", "text/plain;charset=UTF-8");
 
         deleteInformationJobInIcs(DMAAP_JOB_ID);
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
     }
 
+    private String pmJobParameters() {
+        PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
+
+        filterData.addMeasTypes("UtranCell", "succImmediateAssignProcs");
+        filterData.getMeasObjInstIds().add("UtranCell=Gbg-997");
+        filterData.getSourceNames().add("O-DU-1122");
+        filterData.getMeasuredEntityDns().add("ManagedElement=RNC-Gbg-1");
+        Job.Parameters param =
+                Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE).build();
+
+        return gson.toJson(param);
+    }
+
     @Test
     void testPmFilter() throws Exception {
         await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue());
         final String TYPE_ID = "PmDataOverRest";
 
-        String jsonStr =
-                reQuote("{ 'filterType' : 'pmdata', 'filter': { 'measTypes': [ 'succImmediateAssignProcs' ] } }");
+        String jsonStr = pmJobParameters();
 
         ConsumerJobInfo jobInfo = new ConsumerJobInfo(TYPE_ID, jsonObject(jsonStr), "owner", consumerUri(), "");
 
         createInformationJobInIcs(DMAAP_JOB_ID, jobInfo);
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
 
-        String path = "./src/test/resources/pm_report.json";
-        String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset());
-        DmaapSimulatorController.addPmResponse(pmReportJson);
-
-        ConsumerController.TestResults results = this.consumerController.testResults;
-        await().untilAsserted(() -> assertThat(results.receivedBodies).hasSize(1));
-
-        String filtered = results.receivedBodies.get(0);
-        assertThat(filtered).contains("succImmediateAssignProcs").doesNotContain("attTCHSeizures");
-
-        logger.info(filtered);
-
         deleteInformationJobInIcs(DMAAP_JOB_ID);
         await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
     }
index 7e806e2..12bec80 100644 (file)
@@ -203,16 +203,17 @@ class IntegrationWithKafka {
         }
 
         private TopicListener.DataFromTopic unzip(TopicListener.DataFromTopic receivedKafkaOutput) {
-            if (this.applicationConfig.isZipOutput() != receivedKafkaOutput.isZipped) {
-                logger.error("********* ERROR received zipped: {}, exp zipped: {}", receivedKafkaOutput.isZipped,
+            if (this.applicationConfig.isZipOutput() != receivedKafkaOutput.isZipped()) {
+                logger.error("********* ERROR received zipped: {}, exp zipped: {}", receivedKafkaOutput.isZipped(),
                         this.applicationConfig.isZipOutput());
             }
-            if (!receivedKafkaOutput.isZipped) {
+
+            if (!receivedKafkaOutput.isZipped()) {
                 return receivedKafkaOutput;
             }
             try {
                 byte[] unzipped = KafkaTopicListener.unzip(receivedKafkaOutput.value);
-                return new TopicListener.DataFromTopic(unzipped, receivedKafkaOutput.key, false);
+                return new TopicListener.DataFromTopic("typeId", null, receivedKafkaOutput.key, unzipped);
             } catch (IOException e) {
                 logger.error("********* ERROR ", e.getMessage());
                 return null;
@@ -222,7 +223,10 @@ class IntegrationWithKafka {
         private void set(TopicListener.DataFromTopic receivedKafkaOutput) {
             this.receivedKafkaOutput = receivedKafkaOutput;
             this.count++;
-            logger.debug("*** received data on topic: {}", OUTPUT_TOPIC);
+            if (logger.isDebugEnabled()) {
+                logger.debug("*** received data on topic: {}", OUTPUT_TOPIC);
+                logger.debug("*** received typeId: {}", receivedKafkaOutput.getTypeIdFromHeaders());
+            }
         }
 
         synchronized String lastKey() {
@@ -234,7 +238,7 @@ class IntegrationWithKafka {
         }
 
         void reset() {
-            this.receivedKafkaOutput = new TopicListener.DataFromTopic(null, null, false);
+            this.receivedKafkaOutput = new TopicListener.DataFromTopic("", null, null, null);
             this.count = 0;
         }
     }
@@ -303,8 +307,8 @@ class IntegrationWithKafka {
         return "https://localhost:" + this.applicationConfig.getLocalServerHttpPort();
     }
 
-    private static Object jobParametersAsJsonObject(String filter, long maxTimeMiliseconds, int maxSize) {
-        Job.BufferTimeout buffer = maxSize > 0 ? new Job.BufferTimeout(maxSize, maxTimeMiliseconds) : null;
+    private static Object jobParametersAsJsonObject(String filter, long maxTimeMilliseconds, int maxSize) {
+        Job.BufferTimeout buffer = maxSize > 0 ? new Job.BufferTimeout(maxSize, maxTimeMilliseconds) : null;
         Job.Parameters param = Job.Parameters.builder().filter(filter).filterType(Job.Parameters.REGEXP_TYPE)
                 .bufferTimeout(buffer).build();
 
@@ -332,8 +336,14 @@ class IntegrationWithKafka {
 
     ConsumerJobInfo consumerJobInfoKafka(String topic, PmReportFilter.FilterData filterData) {
         try {
-            Job.Parameters param = Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE)
-                    .kafkaOutputTopic(topic).build();
+            Job.Parameters.KafkaDeliveryInfo deliveryInfo = Job.Parameters.KafkaDeliveryInfo.builder() //
+                    .topic(topic) //
+                    .bootStrapServers(this.applicationConfig.getKafkaBootStrapServers()) //
+                    .build();
+            Job.Parameters param = Job.Parameters.builder() //
+                    .filter(filterData) //
+                    .filterType(Job.Parameters.PM_FILTER_TYPE).deliveryInfo(deliveryInfo) //
+                    .build();
 
             String str = gson.toJson(param);
             Object parametersObj = jsonObject(str);
@@ -344,9 +354,13 @@ class IntegrationWithKafka {
         }
     }
 
-    ConsumerJobInfo consumerJobInfoKafka(String topic) {
+    private ConsumerJobInfo consumerJobInfoKafka(String topic) {
         try {
-            Job.Parameters param = Job.Parameters.builder().kafkaOutputTopic(topic).build();
+            Job.Parameters.KafkaDeliveryInfo deliveryInfo = Job.Parameters.KafkaDeliveryInfo.builder() //
+                    .topic(topic) //
+                    .bootStrapServers(this.applicationConfig.getKafkaBootStrapServers()) //
+                    .build();
+            Job.Parameters param = Job.Parameters.builder().deliveryInfo(deliveryInfo).build();
             String str = gson.toJson(param);
             Object parametersObj = jsonObject(str);
 
@@ -494,9 +508,11 @@ class IntegrationWithKafka {
         assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
         // Create two jobs. One buffering and one with a filter
-        this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 10), JOB_ID1, restClient());
         this.icsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0), JOB_ID2, restClient());
+        this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 10), JOB_ID1, restClient());
+
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
+
         waitForKafkaListener();
 
         var dataToSend = Flux.range(1, 3).map(i -> kafkaSenderRecord("Message_" + i, "", KAFKA_TYPE_ID)); // Message_1,
@@ -573,8 +589,8 @@ class IntegrationWithKafka {
         assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
-        filterData.getMeasTypes().add("pmCounterNumber0");
-        filterData.getMeasObjClass().add("NRCellCU");
+
+        filterData.addMeasTypes("NRCellCU", "pmCounterNumber0");
 
         this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver.OUTPUT_TOPIC, filterData), JOB_ID,
                 restClient());
@@ -622,11 +638,10 @@ class IntegrationWithKafka {
         assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
-        final int NO_OF_COUNTERS = 5;
+        final int NO_OF_COUNTERS = 2;
         for (int i = 0; i < NO_OF_COUNTERS; ++i) {
-            filterData.getMeasTypes().add("pmCounterNumber" + i);
+            filterData.addMeasTypes("NRCellCU", "pmCounterNumber" + i);
         }
-        filterData.getMeasObjClass().add("NRCellCU");
 
         final int NO_OF_JOBS = 150;
 
@@ -678,34 +693,31 @@ class IntegrationWithKafka {
 
     @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
     @Test
-    void kafkaCharacteristics_onePmJobs_sharedTopic() throws Exception {
+    void kafkaCharacteristics_manyPmJobs_sharedTopic() throws Exception {
         // Filter PM reports and send them to many jobs over Kafka
 
-        this.applicationConfig.setZipOutput(true);
+        this.applicationConfig.setZipOutput(false);
 
         // Register producer, Register types
         await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
         assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
-        PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
-        final int NO_OF_COUNTERS = 0;
-        for (int i = 0; i < NO_OF_COUNTERS; ++i) {
-            filterData.getMeasTypes().add("pmCounterNumber" + i);
-        }
-        filterData.getMeasObjClass().add("NRCellCU");
+        final int NO_OF_JOBS = 150;
+        ArrayList<KafkaReceiver> receivers = new ArrayList<>();
+        for (int i = 0; i < NO_OF_JOBS; ++i) {
+            final String outputTopic = "kafkaCharacteristics_onePmJobs_manyReceivers";
+            final String jobId = "manyJobs_" + i;
+            PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
+            filterData.addMeasTypes("NRCellCU", "pmCounterNumber" + i); // each job filters its own counter
 
-        final String outputTopic = "kafkaCharacteristics_onePmJobs_manyReceivers";
-        this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData), outputTopic, restClient());
+            this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData), jobId, restClient());
 
-        final int NO_OF_RECEIVERS = 150;
-        ArrayList<KafkaReceiver> receivers = new ArrayList<>();
-        for (int i = 0; i < NO_OF_RECEIVERS; ++i) {
             KafkaReceiver receiver =
                     new KafkaReceiver(this.applicationConfig, outputTopic, this.securityContext, "group_" + i);
             receivers.add(receiver);
         }
 
-        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(NO_OF_JOBS));
         waitForKafkaListener();
 
         final int NO_OF_OBJECTS = 1000;
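
The shared-topic test depends on Kafka consumer groups: every receiver subscribes to the same output topic under its own group, so each receiver sees the complete stream (a sketch using the test-local KafkaReceiver; the group names are hypothetical):

    KafkaReceiver r1 = new KafkaReceiver(applicationConfig, outputTopic, securityContext, "group_a");
    KafkaReceiver r2 = new KafkaReceiver(applicationConfig, outputTopic, securityContext, "group_b");
    // both r1 and r2 receive every object the NO_OF_JOBS jobs publish to outputTopic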
index 6fa7ce8..7ad5a6c 100644 (file)
@@ -33,7 +33,7 @@ import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
 class JsltFilterTest {
 
     private String filterReport(JsltFilter filter) throws Exception {
-        DataFromTopic data = new DataFromTopic(null, loadReport().getBytes(), false);
+        DataFromTopic data = new DataFromTopic("type", null, null, loadReport().getBytes());
         FilteredData filtered = filter.filter(data);
         return filtered.getValueAString();
     }
index 6e75757..2f5b4b1 100644 (file)
@@ -36,7 +36,7 @@ class JsonPathFilterTest {
     void testJsonPath() throws Exception {
         String exp = ("$.event.perf3gppFields.measDataCollection.measInfoList[0].measTypes.sMeasTypesList[0]");
         JsonPathFilter filter = new JsonPathFilter(exp);
-        DataFromTopic data = new DataFromTopic(null, loadReport().getBytes(), false);
+        DataFromTopic data = new DataFromTopic("typeId", null, null, loadReport().getBytes());
         FilteredData filtered = filter.filter(data);
         String res = filtered.getValueAString();
         assertThat(res).isEqualTo("\"attTCHSeizures\"");
index 2f1f3a6..5f3f69c 100644 (file)
@@ -99,7 +99,8 @@ class PmReportFilterTest {
 
     private String filterReport(PmReportFilter filter) throws Exception {
 
-        TopicListener.DataFromTopic data = new TopicListener.DataFromTopic(null, loadReport().getBytes(), false);
+        TopicListener.DataFromTopic data =
+                new TopicListener.DataFromTopic("typeId", null, null, loadReport().getBytes());
         FilteredData filtered = filter.filter(data);
 
         String reportAfterFilter = gson.toJson(data.getCachedPmReport());
@@ -114,7 +115,7 @@ class PmReportFilterTest {
     void testPmFilterMeasTypes() throws Exception {
 
         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
-        filterData.measTypes.add("succImmediateAssignProcs");
+        filterData.addMeasTypes("UtranCell", "succImmediateAssignProcs");
 
         PmReportFilter filter = new PmReportFilter(filterData);
         String filtered = filterReport(filter);
@@ -124,7 +125,8 @@ class PmReportFilterTest {
 
        // Test that no report is returned if no meas types match
         filterData = new PmReportFilter.FilterData();
-        filterData.measTypes.add("junk");
+        filterData.addMeasTypes("junk", "succImmediateAssignProcs");
+
         filter = new PmReportFilter(filterData);
         filtered = filterReport(filter);
         assertThat(filtered).isEmpty();
@@ -149,22 +151,23 @@ class PmReportFilterTest {
     void testMeasObjClass() throws Exception {
         {
             PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
-            filterData.measObjClass.add("junk");
+            filterData.addMeasTypes("junk");
             PmReportFilter filter = new PmReportFilter(filterData);
             String filtered = filterReport(filter);
             assertThat(filtered).isEmpty();
         }
 
         {
-            TopicListener.DataFromTopic data = new TopicListener.DataFromTopic(null, loadReport().getBytes(), false);
+            TopicListener.DataFromTopic data =
+                    new TopicListener.DataFromTopic("typeId", null, null, loadReport().getBytes());
 
             PmReportFilter.FilterData utranCellFilter = new PmReportFilter.FilterData();
-            utranCellFilter.measObjClass.add("UtranCell");
+            utranCellFilter.addMeasTypes("UtranCell");
             FilteredData filtered = new PmReportFilter(utranCellFilter).filter(data);
             assertThat(filtered.getValueAString()).contains("UtranCell").doesNotContain("ENodeBFunction");
 
             PmReportFilter.FilterData eNodeBFilter = new PmReportFilter.FilterData();
-            eNodeBFilter.measObjClass.add("ENodeBFunction");
+            eNodeBFilter.addMeasTypes("ENodeBFunction");
             filtered = new PmReportFilter(eNodeBFilter).filter(data);
             assertThat(filtered.getValueAString()).contains("ENodeBFunction").doesNotContain("UtranCell");
         }
@@ -210,10 +213,9 @@ class PmReportFilterTest {
         {
 
             PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
-            filterData.getMeasTypes().add("pmCounterNumber0");
-            filterData.getMeasObjClass().add("NRCellCU");
+            filterData.addMeasTypes("NRCellCU", "pmCounterNumber0");
             PmReportFilter filter = new PmReportFilter(filterData);
-            DataFromTopic topicData = new DataFromTopic(null, pmReportJson.getBytes(), false);
+            DataFromTopic topicData = new DataFromTopic("typeId", null, null, pmReportJson.getBytes());
 
             Instant startTime = Instant.now();
             for (int i = 0; i < TIMES; ++i) {
@@ -266,10 +268,11 @@ class PmReportFilterTest {
         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
         PmReportFilter filter = new PmReportFilter(filterData);
 
-        FilteredData filtered = filter.filter(new TopicListener.DataFromTopic(null, "junk".getBytes(), false));
+        FilteredData filtered = filter.filter(new TopicListener.DataFromTopic("typeId", null, null, "junk".getBytes()));
         assertThat(filtered.isEmpty()).isTrue();
 
-        filtered = filter.filter(new TopicListener.DataFromTopic(null, reQuote("{'msg': 'test'}").getBytes(), false));
+        filtered = filter
+                .filter(new TopicListener.DataFromTopic("typeId", null, null, reQuote("{'msg': 'test'}").getBytes()));
         assertThat(filtered.isEmpty()).isTrue();
 
     }
index 1aa97c1..402ffea 100644 (file)
@@ -2,16 +2,11 @@
    "event": {
       "commonEventHeader": {
          "domain": "perf3gpp",
-         "eventId": "9efa1210-f285-455f-9c6a-3a659b1f1882",
-         "sequence": 0,
          "eventName": "perf3gpp_gnb-Ericsson_pmMeasResult",
          "sourceName": "O-DU-1122",
          "reportingEntityName": "",
-         "priority": "Normal",
          "startEpochMicrosec": 951912000000,
          "lastEpochMicrosec": 951912900000,
-         "version": "4.0",
-         "vesEventListenerVersion": "7.1",
          "timeZoneOffset": "+00:00"
       },
       "perf3gppFields": {