DMaaP adapter, support for shared Kafka topics 57/10057/3
author    PatrikBuhr <patrik.buhr@est.tech>
Thu, 8 Dec 2022 13:28:23 +0000 (14:28 +0100)
committer PatrikBuhr <patrik.buhr@est.tech>
Mon, 12 Dec 2022 13:21:55 +0000 (14:21 +0100)
If several PM subscribers specify the same Kafka output topic, the
output objects will contain the sum of all filtered output data.

Added info type id to the Kafka header

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-820
Change-Id: I33e6950f91fa0088658f463506a7c70670cc6e42

26 files changed:
docs/Pictures.pptx
docs/dedicatedTopics.png [new file with mode: 0644]
docs/overview.rst
docs/sharedTopics.png [new file with mode: 0644]
src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java
src/main/java/org/oran/dmaapadapter/filter/Filter.java
src/main/java/org/oran/dmaapadapter/filter/FilterFactory.java
src/main/java/org/oran/dmaapadapter/filter/JsltFilter.java
src/main/java/org/oran/dmaapadapter/filter/JsonPathFilter.java
src/main/java/org/oran/dmaapadapter/filter/PmReport.java
src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java
src/main/java/org/oran/dmaapadapter/filter/RegexpFilter.java
src/main/java/org/oran/dmaapadapter/repository/Job.java
src/main/java/org/oran/dmaapadapter/repository/Jobs.java
src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java
src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java
src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java
src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java
src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java
src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java
src/test/java/org/oran/dmaapadapter/ConsumerController.java
src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
src/test/java/org/oran/dmaapadapter/filter/JsltFilterTest.java
src/test/java/org/oran/dmaapadapter/filter/JsonPathFilterTest.java
src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java

index e2cf47e..2ec5a51 100644 (file)
Binary files a/docs/Pictures.pptx and b/docs/Pictures.pptx differ
diff --git a/docs/dedicatedTopics.png b/docs/dedicatedTopics.png
new file mode 100644 (file)
index 0000000..0b91b75
Binary files /dev/null and b/docs/dedicatedTopics.png differ
index 1174e44..cd80ce0 100644 (file)
@@ -179,6 +179,8 @@ This schema will be registered when the configured dataType is "pmData".
 This will extend the filtering capabilities so that a special filter for PM data can be used. Here it is possible to
 define which meas-types (counters) to get from which resources.
 
+It is possible both to filter new data as it is collected from the traffic nodes and to query data that has already been collected.
+
 The filterType parameter is extended to allow value "pmdata" which can be used for PM data filtering.
 
 * sourceNames an array of source names for wanted PM reports.
@@ -188,9 +190,11 @@ The filterType parameter is extended to allow value "pmdata" which can be used f
 * measuredEntityDns partial match of meas entity DNs.
 * measObjClass matching of the class of the measObjInstId. The measObjInstId must follow the 3GPP naming conventions for Managed Objects (3GPP TS 32.106-8).
   Example, for a distinguished name "ManagedElement=RNC-Gbg-1,ENodeBFunction=1", the MO class will be "ENodeBFunction".
+* pmRopStartTime gives the possibility to query already collected PM files. This is the start time from which the information shall be returned.
+  In this case, the query is only done for files from the given "sourceNames".
+  If this parameter is excluded, only "new" reports are delivered as they are collected from the traffic nodes.
 
-All PM filter properties are optional and a non given will result in "match all".
-The result of the filtering is still following the structure of a 3GPP PM report.
+All PM filter properties are optional; a property that is not given will result in "match all" (except pmRopStartTime).
 
 Below follows an example of a PM filter.
 
@@ -210,6 +214,134 @@ Below follows an example of a PM filter.
         ],
         "measuredEntityDns":[
            "ManagedElement=RNC-Gbg-1"
+        ],
+        "measObjClass":[
+           "UtranCell"
+        ],
+        "pmRopStartTime" : "2022-12-13T10:50:44.000-08:00"
+      }
+    }
+
+
+Here is an example of a filter that matches two counters from all cells in two traffic nodes.
+
+.. code-block:: javascript
+
+    {
+      "filterType":"pmdata",
+      "filter": {
+        "sourceNames":[
+           "O-DU-1122", "O-DU-1123"
+        ],
+        "measTypes":[
+           "succImmediateAssignProcs", "attTCHSeizures"
+        ],
+        "measObjClass":[
+           "UtranCell"
         ]
       }
     }
+
+
+********************
+Bulk PM subscription
+********************
+
+The sequence is as follows: a "new file event" is received from a Kafka topic.
+The file is read from storage (file system or S3 object store). For each job, the specified PM filter is applied to the data
+and the result is sent to the Kafka topic specified by the job (by the data consumer).
+
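+The "new file event" is a small JSON object that points out the new PM file. A sketch of what it
+may look like (the exact field names are implementation details):
+
+.. code-block:: javascript
+
+    {
+      "filename": "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.json"
+    }
+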
+.. image:: ./dedicatedTopics.png
+   :width: 500pt
+
+The result of the PM filtering still follows the structure of a 3GPP PM report.
+Below follows an example of a resulting delivered PM report.
+
+.. code-block:: javascript
+
+   {
+      "event":{
+         "commonEventHeader":{
+            "domain":"perf3gpp",
+            "eventId":"9efa1210-f285-455f-9c6a-3a659b1f1882",
+            "eventName":"perf3gpp_gnb-Ericsson_pmMeasResult",
+            "sourceName":"O-DU-1122",
+            "reportingEntityName":"",
+            "startEpochMicrosec":951912000000,
+            "lastEpochMicrosec":951912900000,
+            "timeZoneOffset":"+00:00"
+         },
+         "perf3gppFields":{
+            "perf3gppFieldsVersion":"1.0",
+            "measDataCollection":{
+               "granularityPeriod":900,
+               "measuredEntityUserName":"RNC Telecomville",
+               "measuredEntityDn":"SubNetwork=CountryNN,MeContext=MEC-Gbg-1,ManagedElement=RNC-Gbg-1",
+               "measuredEntitySoftwareVersion":"",
+               "measInfoList":[
+                  {
+                     "measInfoId":{
+                        "sMeasInfoId":""
+                     },
+                     "measTypes":{
+                        "map":{
+                           "succImmediateAssignProcs":1
+                        },
+                        "sMeasTypesList":[
+                           "succImmediateAssignProcs"
+                        ]
+                     },
+                     "measValuesList":[
+                        {
+                           "measObjInstId":"RncFunction=RF-1,UtranCell=Gbg-997",
+                           "suspectFlag":"false",
+                           "measResults":[
+                              {
+                                 "p":1,
+                                 "sValue":"1113"
+                              }
+                           ]
+                        },
+                        {
+                           "measObjInstId":"RncFunction=RF-1,UtranCell=Gbg-998",
+                           "suspectFlag":"false",
+                           "measResults":[
+                              {
+                                 "p":1,
+                                 "sValue":"234"
+                              }
+                           ]
+                        },
+                        {
+                           "measObjInstId":"RncFunction=RF-1,UtranCell=Gbg-999",
+                           "suspectFlag":"true",
+                           "measResults":[
+                              {
+                                 "p":1,
+                                 "sValue":"789"
+                              }
+                           ]
+                        }
+                     ]
+                  }
+               ]
+            }
+         }
+      }
+   }
+
+If several jobs publish to the same Kafka topic (a shared topic), the filtered output delivered on the topic will be the aggregate of all the jobs' filters.
+Each consumer will then get more data than requested by its own filter.
+
+.. image:: ./sharedTopics.png
+   :width: 500pt
+
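+As a sketch of the aggregation semantics: if one job on the shared topic filters on counter
+"succImmediateAssignProcs" and another on "attTCHSeizures", the effective filter for the topic
+is the union of the two:
+
+.. code-block:: javascript
+
+    {
+      "filterType":"pmdata",
+      "filter": {
+        "measTypes":[
+           "succImmediateAssignProcs", "attTCHSeizures"
+        ]
+      }
+    }
+
+An empty filter property means "match all"; if any job on the topic leaves a property empty,
+the aggregate also matches all for that property.
+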
+==================
+Sent Kafka headers
+==================
+
+For each filtered result sent to a Kafka topic, the following properties are set in the Kafka headers (see the sketch below):
+
+* type-id, this property is used to indicate the ID of the information type. The value is a string.
+* gzip, if this property is present, the object is gzipped (otherwise it is not). The property has no value.
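+A minimal consumer-side sketch (using the standard Kafka Java client; not part of the adapter)
+of how these headers can be read:
+
+.. code-block:: java
+
+    import org.apache.kafka.clients.consumer.ConsumerRecord;
+    import org.apache.kafka.common.header.Header;
+
+    static String typeId(ConsumerRecord<byte[], byte[]> record) {
+        // The "type-id" header carries the information type ID as a string.
+        Header h = record.headers().lastHeader("type-id");
+        return h == null ? "" : new String(h.value());
+    }
+
+    static boolean isGzipped(ConsumerRecord<byte[], byte[]> record) {
+        // The "gzip" header has no value; its presence means the payload is gzipped.
+        return record.headers().lastHeader("gzip") != null;
+    }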
\ No newline at end of file
diff --git a/docs/sharedTopics.png b/docs/sharedTopics.png
new file mode 100644 (file)
index 0000000..55668c4
Binary files /dev/null and b/docs/sharedTopics.png differ
index 1816dc3..c118da3 100644 (file)
@@ -97,7 +97,7 @@ public class ProducerCallbacksController {
             logger.warn("jobCreatedCallback failed: {}", e.getMessage());
             return ErrorResponse.create(e, e.getHttpStatus());
         } catch (Exception e) {
-            logger.warn("jobCreatedCallback failed: {}", e.getMessage());
+            logger.warn("jobCreatedCallback failed: {}", e.getMessage(), e);
             return ErrorResponse.create(e, HttpStatus.BAD_REQUEST);
         }
     }
index 6c5fa25..e327fb6 100644 (file)
@@ -39,24 +39,26 @@ public interface Filter {
     public static class FilteredData {
         public final byte[] key;
         public final byte[] value;
+        public final String infoTypeId;
 
         @Getter
         private final boolean isZipped;
 
-        private static final FilteredData emptyData = new FilteredData(null, null);
+        private static final FilteredData emptyData = new FilteredData(null, null, null);
 
         public boolean isEmpty() {
             return (key == null || key.length == 0) && (value == null || value.length == 0);
         }
 
-        public FilteredData(byte[] key, byte[] value) {
-            this(key, value, false);
+        public FilteredData(String type, byte[] key, byte[] value) {
+            this(type, key, value, false);
         }
 
-        public FilteredData(byte[] key, byte[] value, boolean isZipped) {
+        public FilteredData(String type, byte[] key, byte[] value, boolean isZipped) {
             this.key = key;
             this.value = value;
             this.isZipped = isZipped;
+            this.infoTypeId = type;
         }
 
         public String getValueAString() {
@@ -70,9 +72,9 @@ public interface Filter {
         public Iterable<Header> headers() {
             ArrayList<Header> result = new ArrayList<>();
             if (isZipped()) {
-                Header h = new RecordHeader(DataFromTopic.ZIP_PROPERTY, null);
-                result.add(h);
+                result.add(new RecordHeader(DataFromTopic.ZIPPED_PROPERTY, null));
             }
+            result.add(new RecordHeader(DataFromTopic.TYPE_ID_PROPERTY, infoTypeId.getBytes()));
             return result;
         }
     }
index 8d51727..358c093 100644 (file)
@@ -23,6 +23,7 @@ package org.oran.dmaapadapter.filter;
 import com.google.gson.GsonBuilder;
 
 import java.lang.invoke.MethodHandles;
+import java.util.Collection;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,6 +52,14 @@ public class FilterFactory {
         }
     }
 
+    public static PmReportFilter createAggregateFilter(Collection<PmReportFilter> filters) {
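+        // Merges the filter data of all the given filters into one aggregate filter.
+        // Note that the FilterData of the first filter in the collection is reused (and mutated) as the result.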
+        PmReportFilter.FilterData resultFilterData = filters.iterator().next().getFilterData();
+        for (PmReportFilter filter : filters) {
+            resultFilterData.addAll(filter.getFilterData());
+        }
+        return new PmReportFilter(resultFilterData);
+    }
+
     private static PmReportFilter.FilterData createPmFilterData(Object filter) {
         String str = gson.toJson(filter);
         return gson.fromJson(str, PmReportFilter.FilterData.class);
index 577f83d..1720517 100644 (file)
@@ -49,7 +49,7 @@ class JsltFilter implements Filter {
     @Override
     public FilteredData filter(DataFromTopic data) {
         if (expression == null) {
-            return new FilteredData(data.key, data.value);
+            return new FilteredData(data.infoTypeId, data.key, data.value);
         }
         try {
             JsonFactory factory = mapper.getFactory();
@@ -60,7 +60,7 @@ class JsltFilter implements Filter {
             if (filteredNode == NullNode.instance) {
                 return FilteredData.empty();
             }
-            return new FilteredData(data.key, mapper.writeValueAsBytes(filteredNode));
+            return new FilteredData(data.infoTypeId, data.key, mapper.writeValueAsBytes(filteredNode));
         } catch (Exception e) {
             return FilteredData.empty();
         }
index 98f1b46..8c397b0 100644 (file)
@@ -46,7 +46,7 @@ class JsonPathFilter implements Filter {
             String str = new String(data.value);
             Object o = JsonPath.parse(str).read(this.expression, Object.class);
             String json = gson.toJson(o);
-            return o == null ? FilteredData.empty() : new FilteredData(data.key, json.getBytes());
+            return o == null ? FilteredData.empty() : new FilteredData(data.infoTypeId, data.key, json.getBytes());
         } catch (Exception e) {
             return FilteredData.empty();
         }
index 46c90a1..3182d7b 100644 (file)
@@ -42,9 +42,6 @@ public class PmReport {
         @Expose
         private String eventId;
 
-        @Expose
-        private int sequence;
-
         @Expose
         private String eventName;
 
@@ -55,23 +52,20 @@ public class PmReport {
         @Expose
         private String reportingEntityName;
 
-        @Expose
-        private String priority;
-
         @Expose
         private long startEpochMicrosec;
 
         @Expose
         private long lastEpochMicrosec;
 
-        @Expose
-        private String version;
-
-        @Expose
-        private String vesEventListenerVersion;
-
         @Expose
         private String timeZoneOffset;
+
+        /* Not reported elements */
+        int sequence;
+        String priority;
+        String version;
+        String vesEventListenerVersion;
     }
 
     public static class MeasInfoId {
index a545e90..04834c4 100644 (file)
@@ -26,6 +26,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 import lombok.Getter;
 import lombok.Setter;
@@ -61,17 +62,35 @@ public class PmReportFilter implements Filter {
 
     @Getter
     public static class FilterData {
-        final Collection<String> sourceNames = new HashSet<>();
-        final Collection<String> measObjInstIds = new ArrayList<>();
-        final Collection<String> measTypes = new HashSet<>();
-        final Collection<String> measuredEntityDns = new ArrayList<>();
-        final Collection<String> measObjClass = new HashSet<>();
+        final Set<String> sourceNames = new HashSet<>();
+        final Set<String> measObjInstIds = new HashSet<>();
+        final Set<String> measTypes = new HashSet<>();
+        final Set<String> measuredEntityDns = new HashSet<>();
+        final Set<String> measObjClass = new HashSet<>();
 
         @Setter
         String pmRopStartTime;
 
         @Setter
         String pmRopEndTime;
+
+        public void addAll(FilterData other) {
+            addAll(other.sourceNames, sourceNames);
+            addAll(other.measObjInstIds, measObjInstIds);
+            addAll(other.measTypes, measTypes);
+            addAll(other.measuredEntityDns, measuredEntityDns);
+            addAll(other.measObjClass, measObjClass);
+        }
+
+        private void addAll(Set<String> source, Set<String> dst) {
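+            // An empty set means 'match all'. If either side matches all, the aggregate must also match all.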
+            if (source.isEmpty()) {
+                dst.clear();
+            } else if (dst.isEmpty()) {
+                // Nothing, this means 'match all'
+            } else {
+                dst.addAll(source);
+            }
+        }
     }
 
     private static class MeasTypesIndexed extends PmReport.MeasTypes {
@@ -108,7 +127,7 @@ public class PmReportFilter implements Filter {
             if (reportFiltered == null) {
                 return FilteredData.empty();
             }
-            return new FilteredData(data.key, gson.toJson(reportFiltered).getBytes());
+            return new FilteredData(data.infoTypeId, data.key, gson.toJson(reportFiltered).getBytes());
         } catch (Exception e) {
             logger.warn("Could not parse PM data. {}, reason: {}", data, e.getMessage());
             return FilteredData.empty();
index 4806604..682f15e 100644 (file)
@@ -42,12 +42,12 @@ class RegexpFilter implements Filter {
     @Override
     public FilteredData filter(DataFromTopic data) {
         if (regexp == null) {
-            return new FilteredData(data.key, data.value);
+            return new FilteredData(data.infoTypeId, data.key, data.value);
         }
         Matcher matcher = regexp.matcher(data.valueAsString());
         boolean match = matcher.find();
         if (match) {
-            return new FilteredData(data.key, data.value);
+            return new FilteredData(data.infoTypeId, data.key, data.value);
         } else {
             return FilteredData.empty();
         }
index 8603507..8695524 100644 (file)
@@ -207,7 +207,7 @@ public class Job {
     public Filter.FilteredData filter(DataFromTopic data) {
         if (filter == null) {
             logger.debug("No filter used");
-            return new Filter.FilteredData(data.key, data.value);
+            return new Filter.FilteredData(data.infoTypeId, data.key, data.value);
         }
         return filter.filter(data);
     }
index 2c6b329..f236bbe 100644 (file)
@@ -29,12 +29,18 @@ import java.util.List;
 import java.util.Map;
 import java.util.Vector;
 
+import lombok.Getter;
+
 import org.oran.dmaapadapter.clients.AsyncRestClient;
 import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
 import org.oran.dmaapadapter.clients.SecurityContext;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.exceptions.ServiceException;
+import org.oran.dmaapadapter.filter.Filter;
+import org.oran.dmaapadapter.filter.FilterFactory;
+import org.oran.dmaapadapter.filter.PmReportFilter;
 import org.oran.dmaapadapter.repository.Job.Parameters;
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -44,15 +50,141 @@ import org.springframework.stereotype.Component;
 @Component
 public class Jobs {
     public interface Observer {
-        void onJobbAdded(Job job);
+        void onJobbGroupAdded(JobGroup jobGroup);
+
+        void onJobGroupRemoved(JobGroup jobGroup);
+    }
+
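+    /**
+     * A JobGroup is served by one data distributor. PM jobs that share the same Kafka
+     * output topic are grouped together; all other jobs form a group of one.
+     */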
+    public interface JobGroup {
+        public String getId();
+
+        public InfoType getType();
+
+        public void remove(Job job);
+
+        public boolean isEmpty();
+
+        public Filter.FilteredData filter(DataFromTopic data);
+
+        public Iterable<Job> getJobs();
+
+        public String getTopic();
+    }
+
+    public static class JobGroupSingle implements JobGroup {
+        @Getter
+        private final Job job;
+        private boolean isJobRemoved = false;
+
+        public JobGroupSingle(Job job) {
+            this.job = job;
+        }
+
+        @Override
+        public Filter.FilteredData filter(DataFromTopic data) {
+            return job.filter(data);
+        }
+
+        @Override
+        public void remove(Job job) {
+            this.isJobRemoved = true;
+        }
+
+        @Override
+        public boolean isEmpty() {
+            return isJobRemoved;
+        }
+
+        @Override
+        public String getId() {
+            return job.getId();
+        }
+
+        @Override
+        public InfoType getType() {
+            return job.getType();
+        }
+
+        @Override
+        public Iterable<Job> getJobs() {
+            Collection<Job> c = new ArrayList<>();
+            c.add(job);
+            return c;
+        }
+
+        @Override
+        public String getTopic() {
+            return this.job.getParameters().getKafkaOutputTopic();
+        }
+    }
+
+    public static class JobGroupPm implements JobGroup {
+        @Getter
+        private final String topic;
 
-        void onJobRemoved(Job job);
+        private Map<String, Job> jobs = new HashMap<>();
+
+        @Getter
+        private PmReportFilter filter;
+
+        @Getter
+        private final InfoType type;
+
+        public JobGroupPm(InfoType type, String topic) {
+            this.topic = topic;
+            this.type = type;
+        }
+
+        public synchronized void add(Job job) {
+            this.jobs.put(job.getId(), job);
+            this.filter = createFilter();
+        }
+
+        public synchronized void remove(Job job) {
+            this.jobs.remove(job.getId());
+            if (!this.jobs.isEmpty()) {
+                this.filter = createFilter();
+            }
+        }
+
+        public boolean isEmpty() {
+            return jobs.isEmpty();
+        }
+
+        @Override
+        public Filter.FilteredData filter(DataFromTopic data) {
+            return filter.filter(data);
+        }
+
+        public Job getAJob() {
+            if (this.jobs.isEmpty()) {
+                return null;
+            }
+            return this.jobs.values().iterator().next();
+        }
+
+        private PmReportFilter createFilter() {
+            Collection<PmReportFilter> filterData = new ArrayList<>();
+            this.jobs.forEach((key, value) -> filterData.add((PmReportFilter) value.getFilter()));
+            return FilterFactory.createAggregateFilter(filterData);
+        }
+
+        @Override
+        public String getId() {
+            return topic;
+        }
+
+        @Override
+        public Iterable<Job> getJobs() {
+            return this.jobs.values();
+        }
     }
 
     private static final Logger logger = LoggerFactory.getLogger(Jobs.class);
 
     private Map<String, Job> allJobs = new HashMap<>();
     private MultiMap<Job> jobsByType = new MultiMap<>();
+    private Map<String, JobGroup> jobGroups = new HashMap<>();
     private final AsyncRestClientFactory restclientFactory;
     private final List<Observer> observers = new ArrayList<>();
     private final ApplicationConfig appConfig;
@@ -86,9 +218,6 @@ public class Jobs {
                 : restclientFactory.createRestClientNoHttpProxy(callbackUrl);
         Job job = new Job(id, callbackUrl, type, owner, lastUpdated, parameters, consumerRestClient, this.appConfig);
         this.put(job);
-        synchronized (observers) {
-            this.observers.forEach(obs -> obs.onJobbAdded(job));
-        }
     }
 
     public void addObserver(Observer obs) {
@@ -97,10 +226,40 @@ public class Jobs {
         }
     }
 
+    private String jobGroupId(Job job) {
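+        // PM jobs with a Kafka output topic are grouped per topic; all other jobs get a group of their own.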
+        if (Strings.isNullOrEmpty(job.getParameters().getKafkaOutputTopic())) {
+            return job.getId();
+        } else if (job.getParameters().getFilterType() == Filter.Type.PM_DATA) {
+            return job.getParameters().getKafkaOutputTopic();
+        } else {
+            return job.getId();
+        }
+    }
+
     private synchronized void put(Job job) {
         logger.debug("Put job: {}", job.getId());
+        remove(job.getId());
+
         allJobs.put(job.getId(), job);
         jobsByType.put(job.getType().getId(), job.getId(), job);
+
+        if (job.getParameters().getFilterType() == Filter.Type.PM_DATA
+                && job.getParameters().getKafkaOutputTopic() != null) {
+            String topic = job.getParameters().getKafkaOutputTopic();
+            if (!this.jobGroups.containsKey(topic)) {
+                final JobGroupPm group = new JobGroupPm(job.getType(), topic);
+                this.jobGroups.put(topic, group);
+                group.add(job);
+                this.observers.forEach(obs -> obs.onJobbGroupAdded(group));
+            } else {
+                JobGroupPm group = (JobGroupPm) this.jobGroups.get(topic);
+                group.add(job);
+            }
+        } else {
+            JobGroupSingle group = new JobGroupSingle(job);
+            this.jobGroups.put(job.getId(), group);
+            this.observers.forEach(obs -> obs.onJobbGroupAdded(group));
+        }
     }
 
     public synchronized Iterable<Job> getAll() {
@@ -116,15 +275,20 @@ public class Jobs {
     }
 
     public void remove(Job job) {
+        String groupId = this.jobGroupId(job);
+        JobGroup group = this.jobGroups.get(groupId);
         synchronized (this) {
             this.allJobs.remove(job.getId());
             jobsByType.remove(job.getType().getId(), job.getId());
+            group.remove(job);
+            if (group.isEmpty()) {
+                this.jobGroups.remove(groupId);
+            }
         }
-        notifyJobRemoved(job);
-    }
 
-    private synchronized void notifyJobRemoved(Job job) {
-        this.observers.forEach(obs -> obs.onJobRemoved(job));
+        if (group.isEmpty()) {
+            this.observers.forEach(obs -> obs.onJobGroupRemoved(group));
+        }
     }
 
     public synchronized int size() {
@@ -137,7 +301,7 @@ public class Jobs {
 
     public void clear() {
 
-        this.allJobs.forEach((id, job) -> notifyJobRemoved(job));
+        this.jobGroups.forEach((id, group) -> this.observers.forEach(obs -> obs.onJobGroupRemoved(group)));
 
         synchronized (this) {
             allJobs.clear();
index 719597a..a9123e6 100644 (file)
@@ -72,7 +72,7 @@ public class DmaapTopicListener implements TopicListener {
                 .doOnNext(input -> logger.debug("Received from DMaap: {} :{}", this.type.getDmaapTopicUrl(), input)) //
                 .doOnError(t -> logger.error("DmaapTopicListener error: {}", t.getMessage())) //
                 .doFinally(sig -> logger.error("DmaapTopicListener stopped, reason: {}", sig)) //
-                .map(input -> new DataFromTopic(null, input.getBytes(), false))
+                .map(input -> new DataFromTopic(this.type.getId(), null, null, input.getBytes()))
                 .flatMap(data -> KafkaTopicListener.getDataFromFileIfNewPmFileEvent(data, type, dataStore), 100)
                 .publish() //
                 .autoConnect();
index 71918db..6193af6 100644 (file)
@@ -23,6 +23,8 @@ package org.oran.dmaapadapter.tasks;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.filter.Filter;
 import org.oran.dmaapadapter.repository.Job;
+import org.oran.dmaapadapter.repository.Jobs.JobGroup;
+import org.oran.dmaapadapter.repository.Jobs.JobGroupSingle;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.http.MediaType;
@@ -36,14 +38,15 @@ import reactor.core.publisher.Mono;
 public class HttpJobDataDistributor extends JobDataDistributor {
     private static final Logger logger = LoggerFactory.getLogger(HttpJobDataDistributor.class);
 
-    public HttpJobDataDistributor(Job job, ApplicationConfig config) {
+    public HttpJobDataDistributor(JobGroup job, ApplicationConfig config) {
         super(job, config);
     }
 
     @Override
     protected Mono<String> sendToClient(Filter.FilteredData output) {
-        Job job = this.getJob();
-        logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), output);
+        JobGroupSingle group = (JobGroupSingle) this.getJobGroup();
+        Job job = group.getJob();
+        logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), output.getValueAString());
         MediaType contentType = job.isBuffered() || job.getType().isJson() ? MediaType.APPLICATION_JSON : null;
         return job.getConsumerRestClient().post("", output.getValueAString(), contentType);
     }
index f9353f1..ef493a5 100644 (file)
@@ -34,6 +34,9 @@ import org.oran.dmaapadapter.datastore.DataStore;
 import org.oran.dmaapadapter.filter.Filter;
 import org.oran.dmaapadapter.filter.PmReportFilter;
 import org.oran.dmaapadapter.repository.Job;
+import org.oran.dmaapadapter.repository.Jobs.JobGroup;
+import org.oran.dmaapadapter.repository.Jobs.JobGroupPm;
+import org.oran.dmaapadapter.repository.Jobs.JobGroupSingle;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,7 +53,7 @@ public abstract class JobDataDistributor {
     private static final Logger logger = LoggerFactory.getLogger(JobDataDistributor.class);
 
     @Getter
-    private final Job job;
+    private final JobGroup jobGroup;
     private Disposable subscription;
     private final ErrorStats errorStats = new ErrorStats();
 
@@ -71,50 +74,48 @@ public abstract class JobDataDistributor {
         }
     }
 
-    protected JobDataDistributor(Job job, ApplicationConfig applConfig) {
+    protected JobDataDistributor(JobGroup jobGroup, ApplicationConfig applConfig) {
         this.applConfig = applConfig;
-        this.job = job;
+        this.jobGroup = jobGroup;
         this.dataStore = DataStore.create(applConfig);
         this.dataStore.create(DataStore.Bucket.FILES).subscribe();
         this.dataStore.create(DataStore.Bucket.LOCKS).subscribe();
     }
 
     public void start(Flux<TopicListener.DataFromTopic> input) {
-        logger.debug("Starting distribution, job: {}, to topic: {}", this.job.getId(),
-                job.getParameters().getKafkaOutputTopic());
-        PmReportFilter filter = job.getFilter() instanceof PmReportFilter ? (PmReportFilter) job.getFilter() : null;
-
+        logger.debug("Starting distribution, to topic: {}", jobGroup.getId());
+        PmReportFilter filter = getPmReportFilter(this.jobGroup);
         if (filter == null || filter.getFilterData().getPmRopEndTime() == null) {
-            this.subscription = filterAndBuffer(input, this.job) //
+            this.subscription = filterAndBuffer(input, this.jobGroup) //
                     .flatMap(this::sendToClient) //
                     .onErrorResume(this::handleError) //
                     .subscribe(this::handleSentOk, //
                             this::handleExceptionInStream, //
-                            () -> logger.warn("JobDataDistributor stopped jobId: {}", job.getId()));
+                            () -> logger.warn("JobDataDistributor stopped jobId: {}", jobGroup.getId()));
         }
 
         if (filter != null && filter.getFilterData().getPmRopStartTime() != null) {
             this.dataStore.createLock(collectHistoricalDataLockName()) //
                     .doOnNext(isLockGranted -> {
                         if (isLockGranted.booleanValue()) {
-                            logger.debug("Checking historical PM ROP files, jobId: {}", this.job.getId());
+                            logger.debug("Checking historical PM ROP files, jobId: {}", this.jobGroup.getId());
                         } else {
                             logger.debug("Skipping check of historical PM ROP files, already done. jobId: {}",
-                                    this.job.getId());
+                                    this.jobGroup.getId());
                         }
                     }) //
                     .filter(isLockGranted -> isLockGranted) //
                     .flatMapMany(b -> Flux.fromIterable(filter.getFilterData().getSourceNames())) //
                     .doOnNext(sourceName -> logger.debug("Checking source name: {}, jobId: {}", sourceName,
-                            this.job.getId())) //
+                            this.jobGroup.getId())) //
                     .flatMap(sourceName -> dataStore.listObjects(DataStore.Bucket.FILES, sourceName), 1) //
                     .filter(this::isRopFile) //
                     .filter(fileName -> filterStartTime(filter.getFilterData(), fileName)) //
                     .filter(fileName -> filterEndTime(filter.getFilterData(), fileName)) //
                     .map(this::createFakeEvent) //
-                    .flatMap(data -> KafkaTopicListener.getDataFromFileIfNewPmFileEvent(data, this.job.getType(),
+                    .flatMap(data -> KafkaTopicListener.getDataFromFileIfNewPmFileEvent(data, this.jobGroup.getType(),
                             dataStore), 100)
-                    .map(job::filter) //
+                    .map(jobGroup::filter) //
                     .map(this::gzip) //
                     .flatMap(this::sendToClient, 1) //
                     .onErrorResume(this::handleCollectHistoricalDataError) //
@@ -122,6 +123,17 @@ public abstract class JobDataDistributor {
         }
     }
 
+    private static PmReportFilter getPmReportFilter(JobGroup jobGroup) {
+
+        if (jobGroup instanceof JobGroupPm) {
+            return ((JobGroupPm) jobGroup).getFilter();
+        } else if (jobGroup instanceof JobGroupSingle) {
+            Filter f = ((JobGroupSingle) jobGroup).getJob().getFilter();
+            return (f instanceof PmReportFilter) ? (PmReportFilter) f : null;
+        }
+        return null;
+    }
+
     private Filter.FilteredData gzip(Filter.FilteredData data) {
         if (this.applConfig.isZipOutput()) {
             try {
@@ -131,7 +143,7 @@ public abstract class JobDataDistributor {
                 gzip.flush();
                 gzip.close();
                 byte[] zipped = out.toByteArray();
-                return new Filter.FilteredData(data.key, zipped, true);
+                return new Filter.FilteredData(data.infoTypeId, data.key, zipped, true);
             } catch (IOException e) {
                 logger.error("Unexpected exception when zipping: {}", e.getMessage());
                 return data;
@@ -142,30 +154,28 @@ public abstract class JobDataDistributor {
     }
 
     private Mono<String> handleCollectHistoricalDataError(Throwable t) {
-        logger.error("Exception: {} job: {}", t.getMessage(), job.getId());
+        logger.error("Exception: {} job: {}", t.getMessage(), jobGroup.getId());
         return tryDeleteLockFile() //
                 .map(bool -> "OK");
     }
 
     private String collectHistoricalDataLockName() {
-        return "collectHistoricalDataLock" + this.job.getId();
+        return "collectHistoricalDataLock" + this.jobGroup.getId();
     }
 
     private TopicListener.DataFromTopic createFakeEvent(String fileName) {
         NewFileEvent ev = new NewFileEvent(fileName);
-        return new TopicListener.DataFromTopic(null, gson.toJson(ev).getBytes(), false);
+        return new TopicListener.DataFromTopic(this.jobGroup.getType().getId(), null, null, gson.toJson(ev).getBytes());
     }
 
     private static String fileTimePartFromRopFileName(String fileName) {
+        // "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.json"
         return fileName.substring(fileName.lastIndexOf("/") + 2);
     }
 
     private static boolean filterStartTime(PmReportFilter.FilterData filter, String fileName) {
-        // A20000626.2315+0200-2330+0200_HTTPS-6-73.json
         try {
-            String fileTimePart = fileTimePartFromRopFileName(fileName);
-            fileTimePart = fileTimePart.substring(0, 18);
-            OffsetDateTime fileStartTime = parseFileDate(fileTimePart);
+            OffsetDateTime fileStartTime = getStartTimeFromFileName(fileName);
             OffsetDateTime startTime = OffsetDateTime.parse(filter.getPmRopStartTime());
             boolean isMatch = fileStartTime.isAfter(startTime);
             logger.debug("Checking file: {}, fileStartTime: {}, filterStartTime: {}, isAfter: {}", fileName,
@@ -182,14 +192,11 @@ public abstract class JobDataDistributor {
     }
 
     private static boolean filterEndTime(PmReportFilter.FilterData filter, String fileName) {
-        // A20000626.2315+0200-2330+0200_HTTPS-6-73.json
         if (filter.getPmRopEndTime() == null) {
             return true;
         }
         try {
-            String fileTimePart = fileTimePartFromRopFileName(fileName);
-            fileTimePart = fileTimePart.substring(0, 9) + fileTimePart.substring(19, 28);
-            OffsetDateTime fileEndTime = parseFileDate(fileTimePart);
+            OffsetDateTime fileEndTime = getEndTimeFromFileName(fileName);
             OffsetDateTime endTime = OffsetDateTime.parse(filter.getPmRopEndTime());
             boolean isMatch = fileEndTime.isBefore(endTime);
             logger.debug("Checking file: {}, fileEndTime: {}, endTime: {}, isBefore: {}", fileName, fileEndTime,
@@ -202,6 +209,20 @@ public abstract class JobDataDistributor {
         }
     }
 
+    private static OffsetDateTime getStartTimeFromFileName(String fileName) {
+        String fileTimePart = fileTimePartFromRopFileName(fileName);
+        // A20000626.2315+0200-2330+0200_HTTPS-6-73.json
+        fileTimePart = fileTimePart.substring(0, 18);
+        return parseFileDate(fileTimePart);
+    }
+
+    private static OffsetDateTime getEndTimeFromFileName(String fileName) {
+        String fileTimePart = fileTimePartFromRopFileName(fileName);
+        // A20000626.2315+0200-2330+0200_HTTPS-6-73.json
+        fileTimePart = fileTimePart.substring(0, 9) + fileTimePart.substring(19, 28);
+        return parseFileDate(fileTimePart);
+    }
+
     private static OffsetDateTime parseFileDate(String timeStr) {
         DateTimeFormatter startTimeFormatter =
                 new DateTimeFormatterBuilder().appendPattern("yyyyMMdd.HHmmZ").toFormatter();
@@ -209,14 +230,14 @@ public abstract class JobDataDistributor {
     }
 
     private void handleExceptionInStream(Throwable t) {
-        logger.warn("JobDataDistributor exception: {}, jobId: {}", t.getMessage(), job.getId());
+        logger.warn("JobDataDistributor exception: {}, jobId: {}", t.getMessage(), jobGroup.getId());
     }
 
     protected abstract Mono<String> sendToClient(Filter.FilteredData output);
 
     public synchronized void stop() {
         if (this.subscription != null) {
-            logger.debug("Stopped, job: {}", job.getId());
+            logger.debug("Stopped, job: {}", jobGroup.getId());
             this.subscription.dispose();
             this.subscription = null;
         }
@@ -233,23 +254,26 @@ public abstract class JobDataDistributor {
         return this.subscription != null;
     }
 
-    private Flux<Filter.FilteredData> filterAndBuffer(Flux<TopicListener.DataFromTopic> inputFlux, Job job) {
+    private Flux<Filter.FilteredData> filterAndBuffer(Flux<TopicListener.DataFromTopic> inputFlux, JobGroup jobGroup) {
         Flux<Filter.FilteredData> filtered = //
                 inputFlux //
-                        .doOnNext(data -> logger.trace("Received data, job {}", job.getId())) //
-                        .doOnNext(data -> job.getStatistics().received(data.value)) //
-                        .map(job::filter) //
+                        .doOnNext(data -> logger.trace("Received data, job {}", jobGroup.getId())) //
+                        .doOnNext(data -> jobGroup.getJobs().forEach(job -> job.getStatistics().received(data.value))) //
+                        .map(jobGroup::filter) //
                         .map(this::gzip) //
                         .filter(f -> !f.isEmpty()) //
-                        .doOnNext(f -> job.getStatistics().filtered(f.value)) //
-                        .doOnNext(data -> logger.trace("Filtered data, job {}", job.getId())); //
+                        .doOnNext(f -> jobGroup.getJobs().forEach(job -> job.getStatistics().filtered(f.value))) //
+                        .doOnNext(data -> logger.trace("Filtered data, job {}", jobGroup.getId())); //
 
+        Job job = jobGroup.getJobs().iterator().next();
         if (job.isBuffered()) {
             filtered = filtered.map(input -> quoteNonJson(input.getValueAString(), job)) //
                     .bufferTimeout( //
                             job.getParameters().getBufferTimeout().getMaxSize(), //
                             job.getParameters().getBufferTimeout().getMaxTime()) //
-                    .map(buffered -> new Filter.FilteredData(null, buffered.toString().getBytes()));
+                    .map(buffered -> new Filter.FilteredData(this.getJobGroup().getType().getId(), null,
+                            buffered.toString().getBytes()));
         }
         return filtered;
     }
@@ -264,7 +288,7 @@ public abstract class JobDataDistributor {
     }
 
     private Mono<String> handleError(Throwable t) {
-        logger.warn("exception: {} job: {}", t.getMessage(), job.getId());
+        logger.warn("exception: {} job: {}", t.getMessage(), jobGroup.getId());
         this.errorStats.handleException(t);
         return Mono.empty(); // Ignore
     }
index 6b33c3b..c0b1d16 100644 (file)
@@ -29,7 +29,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.filter.Filter;
-import org.oran.dmaapadapter.repository.Job;
+import org.oran.dmaapadapter.repository.Jobs.JobGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,8 +48,8 @@ public class KafkaJobDataDistributor extends JobDataDistributor {
 
     private KafkaSender<byte[], byte[]> sender;
 
-    public KafkaJobDataDistributor(Job job, ApplicationConfig appConfig) {
-        super(job, appConfig);
+    public KafkaJobDataDistributor(JobGroup jobGroup, ApplicationConfig appConfig) {
+        super(jobGroup, appConfig);
 
         SenderOptions<byte[], byte[]> senderOptions = senderOptions(appConfig);
         this.sender = KafkaSender.create(senderOptions);
@@ -57,16 +57,16 @@ public class KafkaJobDataDistributor extends JobDataDistributor {
 
     @Override
     protected Mono<String> sendToClient(Filter.FilteredData data) {
-        Job job = this.getJob();
-        SenderRecord<byte[], byte[], Integer> senderRecord = senderRecord(data, job);
+
+        SenderRecord<byte[], byte[], Integer> senderRecord = senderRecord(data, this.getJobGroup().getTopic());
 
         logger.trace("Sending data '{}' to Kafka topic: {}", StringUtils.truncate(data.getValueAString(), 10),
-                job.getParameters().getKafkaOutputTopic());
+                this.getJobGroup().getTopic());
 
         return this.sender.send(Mono.just(senderRecord)) //
-                .doOnNext(n -> logger.debug("Sent data to Kafka topic: {}", job.getParameters().getKafkaOutputTopic())) //
-                .doOnError(
-                        t -> logger.warn("Failed to send to Kafka, job: {}, reason: {}", job.getId(), t.getMessage())) //
+                .doOnNext(n -> logger.debug("Sent data to Kafka topic: {}", this.getJobGroup().getTopic())) //
+                .doOnError(t -> logger.warn("Failed to send to Kafka, job: {}, reason: {}", this.getJobGroup().getId(),
+                        t.getMessage())) //
                 .onErrorResume(t -> Mono.empty()) //
                 .collectList() //
                 .map(x -> "ok");
@@ -93,9 +93,8 @@ public class KafkaJobDataDistributor extends JobDataDistributor {
         return SenderOptions.create(props);
     }
 
-    private SenderRecord<byte[], byte[], Integer> senderRecord(Filter.FilteredData output, Job infoJob) {
+    private SenderRecord<byte[], byte[], Integer> senderRecord(Filter.FilteredData output, String topic) {
         int correlationMetadata = 2;
-        String topic = infoJob.getParameters().getKafkaOutputTopic();
         var producerRecord = new ProducerRecord<>(topic, null, null, output.key, output.value, output.headers());
         return SenderRecord.create(producerRecord, correlationMetadata);
     }
index 1a20c68..021678d 100644 (file)
@@ -85,7 +85,7 @@ public class KafkaTopicListener implements TopicListener {
                 .doFinally(
                         sig -> logger.error("KafkaTopicListener stopped, type: {}, reason: {}", this.type.getId(), sig)) //
                 .filter(t -> t.value().length > 0 || t.key().length > 0) //
-                .map(input -> new DataFromTopic(input.key(), input.value(), DataFromTopic.findZipped(input.headers()))) //
+                .map(input -> new DataFromTopic(this.type.getId(), input.headers(), input.key(), input.value())) //
                 .flatMap(data -> getDataFromFileIfNewPmFileEvent(data, type, dataStore)) //
                 .publish() //
                 .autoConnect(1);
@@ -126,7 +126,7 @@ public class KafkaTopicListener implements TopicListener {
             logger.trace("Reading PM measurements, type: {}, inputTopic: {}", type.getId(), type.getKafkaInputTopic());
             return fileStore.readObject(DataStore.Bucket.FILES, ev.getFilename()) //
                     .map(bytes -> unzip(bytes, ev.getFilename())) //
-                    .map(bytes -> new DataFromTopic(data.key, bytes, false));
+                    .map(bytes -> new DataFromTopic(data.infoTypeId, data.headers, data.key, bytes));
 
         } catch (Exception e) {
             return Mono.just(data);
index 8b1afc8..debc783 100644 (file)
@@ -34,7 +34,10 @@ public interface TopicListener {
     public static class DataFromTopic {
         public final byte[] key;
         public final byte[] value;
-        public final boolean isZipped;
+
+        public final String infoTypeId;
+
+        public final Iterable<Header> headers;
 
         private static byte[] noBytes = new byte[0];
 
@@ -43,29 +46,43 @@ public interface TopicListener {
         @ToString.Exclude
         private PmReport cachedPmReport;
 
-        public DataFromTopic(byte[] key, byte[] value, boolean isZipped) {
+        public DataFromTopic(String typeId, Iterable<Header> headers, byte[] key, byte[] value) {
             this.key = key == null ? noBytes : key;
             this.value = value == null ? noBytes : value;
-            this.isZipped = isZipped;
+            this.infoTypeId = typeId;
+            this.headers = headers;
         }
 
         public String valueAsString() {
             return new String(this.value);
         }
 
-        public static final String ZIP_PROPERTY = "gzip";
+        public static final String ZIPPED_PROPERTY = "gzip";
+        public static final String TYPE_ID_PROPERTY = "type-id";
 
-        public static boolean findZipped(Iterable<Header> headers) {
+        public boolean isZipped() {
             if (headers == null) {
                 return false;
             }
             for (Header h : headers) {
-                if (h.key().equals(ZIP_PROPERTY)) {
+                if (h.key().equals(ZIPPED_PROPERTY)) {
                     return true;
                 }
             }
             return false;
         }
+
+        public String getTypeIdFromHeaders() {
+            if (headers == null) {
+                return "";
+            }
+            for (Header h : headers) {
+                if (h.key().equals(TYPE_ID_PROPERTY)) {
+                    return new String(h.value());
+                }
+            }
+            return "";
+        }
     }
 
     public Flux<DataFromTopic> getFlux();
index a3e3703..56180d7 100644 (file)
@@ -30,8 +30,8 @@ import org.oran.dmaapadapter.clients.SecurityContext;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.repository.InfoType;
 import org.oran.dmaapadapter.repository.InfoTypes;
-import org.oran.dmaapadapter.repository.Job;
 import org.oran.dmaapadapter.repository.Jobs;
+import org.oran.dmaapadapter.repository.Jobs.JobGroup;
 import org.oran.dmaapadapter.repository.MultiMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,52 +70,52 @@ public class TopicListeners {
 
         jobs.addObserver(new Jobs.Observer() {
             @Override
-            public void onJobbAdded(Job job) {
-                addJob(job);
+            public void onJobbGroupAdded(JobGroup jobGroup) {
+                addJob(jobGroup);
             }
 
             @Override
-            public void onJobRemoved(Job job) {
-                removeJob(job);
+            public void onJobGroupRemoved(JobGroup jobGroup) {
+                removeDistributor(jobGroup);
             }
         });
     }
 
-    public synchronized void addJob(Job job) {
-        removeJob(job);
-        logger.debug("Job added {}", job.getId());
-        if (job.getType().isKafkaTopicDefined()) {
-            addConsumer(job, dataDistributors, kafkaTopicListeners);
+    public synchronized void addJob(JobGroup jobGroup) {
+        removeDistributor(jobGroup);
+        logger.debug("Job added {}", jobGroup.getId());
+        if (jobGroup.getType().isKafkaTopicDefined()) {
+            addDistributor(jobGroup, dataDistributors, kafkaTopicListeners);
         }
 
-        if (job.getType().isDmaapTopicDefined()) {
-            addConsumer(job, dataDistributors, dmaapTopicListeners);
+        if (jobGroup.getType().isDmaapTopicDefined()) {
+            addDistributor(jobGroup, dataDistributors, dmaapTopicListeners);
         }
     }
 
-    private JobDataDistributor createConsumer(Job job) {
-        return !Strings.isEmpty(job.getParameters().getKafkaOutputTopic()) ? new KafkaJobDataDistributor(job, appConfig)
-                : new HttpJobDataDistributor(job, appConfig);
+    private JobDataDistributor createDistributor(JobGroup jobGroup) {
+        return !Strings.isEmpty(jobGroup.getTopic()) ? new KafkaJobDataDistributor(jobGroup, appConfig)
+                : new HttpJobDataDistributor(jobGroup, appConfig);
     }
 
-    private void addConsumer(Job job, MultiMap<JobDataDistributor> distributors,
+    private void addDistributor(JobGroup jobGroup, MultiMap<JobDataDistributor> distributors,
             Map<String, TopicListener> topicListeners) {
-        TopicListener topicListener = topicListeners.get(job.getType().getId());
-        JobDataDistributor distributor = createConsumer(job);
+        TopicListener topicListener = topicListeners.get(jobGroup.getType().getId());
+        JobDataDistributor distributor = createDistributor(jobGroup);
 
         distributor.start(topicListener.getFlux());
 
-        distributors.put(job.getType().getId(), job.getId(), distributor);
+        distributors.put(jobGroup.getType().getId(), jobGroup.getId(), distributor);
     }
 
-    public synchronized void removeJob(Job job) {
-        removeJob(job, dataDistributors);
+    public synchronized void removeDistributor(JobGroup jobGroup) {
+        removeDistributor(jobGroup, dataDistributors);
     }
 
-    private static void removeJob(Job job, MultiMap<JobDataDistributor> distributors) {
-        JobDataDistributor distributor = distributors.remove(job.getType().getId(), job.getId());
+    private static void removeDistributor(JobGroup jobGroup, MultiMap<JobDataDistributor> distributors) {
+        JobDataDistributor distributor = distributors.remove(jobGroup.getType().getId(), jobGroup.getId());
         if (distributor != null) {
-            logger.debug("Job removed {}", job.getId());
+            logger.debug("Job removed {}", jobGroup.getId());
             distributor.stop();
         }
     }
index a62ae7d..f1b0f21 100644 (file)
@@ -54,9 +54,9 @@ public class ConsumerController {
 
     public static class TestResults {
 
-        public List<String> receivedBodies = Collections.synchronizedList(new ArrayList<String>());
+        public final List<String> receivedBodies = Collections.synchronizedList(new ArrayList<String>());
 
-        public List<Map<String, String>> receivedHeaders =
+        public final List<Map<String, String>> receivedHeaders =
                 Collections.synchronizedList(new ArrayList<Map<String, String>>());
 
         public TestResults() {}
index 7e806e2..7317919 100644 (file)
@@ -203,16 +203,17 @@ class IntegrationWithKafka {
         }
 
         private TopicListener.DataFromTopic unzip(TopicListener.DataFromTopic receivedKafkaOutput) {
-            if (this.applicationConfig.isZipOutput() != receivedKafkaOutput.isZipped) {
-                logger.error("********* ERROR received zipped: {}, exp zipped: {}", receivedKafkaOutput.isZipped,
+            if (this.applicationConfig.isZipOutput() != receivedKafkaOutput.isZipped()) {
+                logger.error("********* ERROR received zipped: {}, exp zipped: {}", receivedKafkaOutput.isZipped(),
                         this.applicationConfig.isZipOutput());
             }
-            if (!receivedKafkaOutput.isZipped) {
+
+            if (!receivedKafkaOutput.isZipped()) {
                 return receivedKafkaOutput;
             }
             try {
                 byte[] unzipped = KafkaTopicListener.unzip(receivedKafkaOutput.value);
-                return new TopicListener.DataFromTopic(unzipped, receivedKafkaOutput.key, false);
+                return new TopicListener.DataFromTopic("typeId", receivedKafkaOutput.headers, receivedKafkaOutput.key, unzipped);
             } catch (IOException e) {
                logger.error("********* ERROR {}", e.getMessage());
                 return null;
@@ -222,7 +223,10 @@ class IntegrationWithKafka {
         private void set(TopicListener.DataFromTopic receivedKafkaOutput) {
             this.receivedKafkaOutput = receivedKafkaOutput;
             this.count++;
-            logger.debug("*** received data on topic: {}", OUTPUT_TOPIC);
+            if (logger.isDebugEnabled()) {
+                logger.debug("*** received data on topic: {}", OUTPUT_TOPIC);
+                logger.debug("*** received typeId: {}", receivedKafkaOutput.getTypeIdFromHeaders());
+            }
         }
 
         synchronized String lastKey() {
@@ -234,7 +238,7 @@ class IntegrationWithKafka {
         }
 
         void reset() {
-            this.receivedKafkaOutput = new TopicListener.DataFromTopic(null, null, false);
+            this.receivedKafkaOutput = new TopicListener.DataFromTopic("", null, null, null);
             this.count = 0;
         }
     }
@@ -494,9 +498,11 @@ class IntegrationWithKafka {
         assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
         // Create two jobs. One buffering and one with a filter
-        this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 10), JOB_ID1, restClient());
         this.icsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0), JOB_ID2, restClient());
+        this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 10), JOB_ID1, restClient());
+
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
+
         waitForKafkaListener();
 
         var dataToSend = Flux.range(1, 3).map(i -> kafkaSenderRecord("Message_" + i, "", KAFKA_TYPE_ID)); // Message_1,
@@ -622,7 +628,7 @@ class IntegrationWithKafka {
         assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
-        final int NO_OF_COUNTERS = 5;
+        final int NO_OF_COUNTERS = 2;
         for (int i = 0; i < NO_OF_COUNTERS; ++i) {
             filterData.getMeasTypes().add("pmCounterNumber" + i);
         }
@@ -678,34 +684,32 @@ class IntegrationWithKafka {
 
     @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
     @Test
-    void kafkaCharacteristics_onePmJobs_sharedTopic() throws Exception {
+    void kafkaCharacteristics_manyPmJobs_sharedTopic() throws Exception {
         // Filter PM reports and sent to many jobs over Kafka
 
-        this.applicationConfig.setZipOutput(true);
+        this.applicationConfig.setZipOutput(false);
 
         // Register producer, Register types
         await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
         assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
-        PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
-        final int NO_OF_COUNTERS = 0;
-        for (int i = 0; i < NO_OF_COUNTERS; ++i) {
-            filterData.getMeasTypes().add("pmCounterNumber" + i);
-        }
-        filterData.getMeasObjClass().add("NRCellCU");
+        final int NO_OF_JOBS = 150;
+        ArrayList<KafkaReceiver> receivers = new ArrayList<>();
+        for (int i = 0; i < NO_OF_JOBS; ++i) {
+            final String outputTopic = "kafkaCharacteristics_onePmJobs_manyReceivers";
+            final String jobId = "manyJobs_" + i;
+            PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
+            filterData.getMeasTypes().add("pmCounterNumber" + i); // all counters will be added
+            filterData.getMeasObjClass().add("NRCellCU");
 
-        final String outputTopic = "kafkaCharacteristics_onePmJobs_manyReceivers";
-        this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData), outputTopic, restClient());
+            this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData), jobId, restClient());
 
-        final int NO_OF_RECEIVERS = 150;
-        ArrayList<KafkaReceiver> receivers = new ArrayList<>();
-        for (int i = 0; i < NO_OF_RECEIVERS; ++i) {
             KafkaReceiver receiver =
                     new KafkaReceiver(this.applicationConfig, outputTopic, this.securityContext, "group_" + i);
             receivers.add(receiver);
         }
 
-        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(NO_OF_JOBS));
         waitForKafkaListener();
 
         final int NO_OF_OBJECTS = 1000;
index 6fa7ce8..7ad5a6c 100644 (file)
@@ -33,7 +33,7 @@ import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
 class JsltFilterTest {
 
     private String filterReport(JsltFilter filter) throws Exception {
-        DataFromTopic data = new DataFromTopic(null, loadReport().getBytes(), false);
+        DataFromTopic data = new DataFromTopic("type", null, null, loadReport().getBytes());
         FilteredData filtered = filter.filter(data);
         return filtered.getValueAString();
     }
index 6e75757..2f5b4b1 100644 (file)
@@ -36,7 +36,7 @@ class JsonPathFilterTest {
     void testJsonPath() throws Exception {
         String exp = ("$.event.perf3gppFields.measDataCollection.measInfoList[0].measTypes.sMeasTypesList[0]");
         JsonPathFilter filter = new JsonPathFilter(exp);
-        DataFromTopic data = new DataFromTopic(null, loadReport().getBytes(), false);
+        DataFromTopic data = new DataFromTopic("typeId", null, null, loadReport().getBytes());
         FilteredData filtered = filter.filter(data);
         String res = filtered.getValueAString();
         assertThat(res).isEqualTo("\"attTCHSeizures\"");
index 2f1f3a6..bb6f145 100644 (file)
@@ -99,7 +99,8 @@ class PmReportFilterTest {
 
     private String filterReport(PmReportFilter filter) throws Exception {
 
-        TopicListener.DataFromTopic data = new TopicListener.DataFromTopic(null, loadReport().getBytes(), false);
+        TopicListener.DataFromTopic data =
+                new TopicListener.DataFromTopic("typeId", null, null, loadReport().getBytes());
         FilteredData filtered = filter.filter(data);
 
         String reportAfterFilter = gson.toJson(data.getCachedPmReport());
@@ -156,7 +157,8 @@ class PmReportFilterTest {
         }
 
         {
-            TopicListener.DataFromTopic data = new TopicListener.DataFromTopic(null, loadReport().getBytes(), false);
+            TopicListener.DataFromTopic data =
+                    new TopicListener.DataFromTopic("typeId", null, null, loadReport().getBytes());
 
             PmReportFilter.FilterData utranCellFilter = new PmReportFilter.FilterData();
             utranCellFilter.measObjClass.add("UtranCell");
@@ -213,7 +215,7 @@ class PmReportFilterTest {
             filterData.getMeasTypes().add("pmCounterNumber0");
             filterData.getMeasObjClass().add("NRCellCU");
             PmReportFilter filter = new PmReportFilter(filterData);
-            DataFromTopic topicData = new DataFromTopic(null, pmReportJson.getBytes(), false);
+            DataFromTopic topicData = new DataFromTopic("typeId", null, null, pmReportJson.getBytes());
 
             Instant startTime = Instant.now();
             for (int i = 0; i < TIMES; ++i) {
@@ -266,10 +268,11 @@ class PmReportFilterTest {
         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
         PmReportFilter filter = new PmReportFilter(filterData);
 
-        FilteredData filtered = filter.filter(new TopicListener.DataFromTopic(null, "junk".getBytes(), false));
+        FilteredData filtered = filter.filter(new TopicListener.DataFromTopic("typeId", null, null, "junk".getBytes()));
         assertThat(filtered.isEmpty()).isTrue();
 
-        filtered = filter.filter(new TopicListener.DataFromTopic(null, reQuote("{'msg': 'test'}").getBytes(), false));
+        filtered = filter
+                .filter(new TopicListener.DataFromTopic("typeId", null, null, reQuote("{'msg': 'test'}").getBytes()));
         assertThat(filtered.isEmpty()).isTrue();
 
     }