From: PatrikBuhr Date: Thu, 8 Dec 2022 13:28:23 +0000 (+0100) Subject: DMaaP adapter, support for shared Kafka topics X-Git-Tag: 1.3.0~9^2 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F57%2F10057%2F3;p=nonrtric%2Fplt%2Fdmaapadapter.git DMaaP adapter, support for shared Kafka topics If several PM subscribers specify the same Kafka output topic, the output objects will contain the sum of all filtered output data. Added info type id to the Kafka header Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-820 Change-Id: I33e6950f91fa0088658f463506a7c70670cc6e42 --- diff --git a/docs/Pictures.pptx b/docs/Pictures.pptx index e2cf47e..2ec5a51 100644 Binary files a/docs/Pictures.pptx and b/docs/Pictures.pptx differ diff --git a/docs/dedicatedTopics.png b/docs/dedicatedTopics.png new file mode 100644 index 0000000..0b91b75 Binary files /dev/null and b/docs/dedicatedTopics.png differ diff --git a/docs/overview.rst b/docs/overview.rst index 1174e44..cd80ce0 100644 --- a/docs/overview.rst +++ b/docs/overview.rst @@ -179,6 +179,8 @@ This schema will be registered when the configured dataType is "pmData". This will extend the filtering capabilities so that a special filter for PM data can be used. Here it is possible to define which meas-types (counters) to get from which resources. +It is possible both to filter on new data that is collected from the traffic nodes and to query data that is already collected. + The filterType parameter is extended to allow value "pmdata" which can be used for PM data filtering. * sourceNames an array of source names for wanted PM reports. @@ -188,9 +190,11 @@ The filterType parameter is extended to allow value "pmdata" which can be used f * measuredEntityDns partial match of meas entity DNs. * measObjClass matching of the class of the measObjInstId. The measObjInstId must follow the 3GPP naming conventions for Managed Objects (3GPP TS 32.106-8). Example, for a distinguished name "ManagedElement=RNC-Gbg-1,ENodeBFunction=1", the MO class will be "ENodeBFunction". +* pmRopStartTime makes it possible to query already collected PM files. The value is the start time from which the information shall be returned. + In this case, the query is only done for files from the given "sourceNames". + If this parameter is excluded, only "new" reports will be delivered as they are collected from the traffic nodes. -All PM filter properties are optional and a non given will result in "match all". -The result of the filtering is still following the structure of a 3GPP PM report. +All PM filter properties are optional and an omitted property will result in "match all" (except pmRopStartTime). Below follows an example of a PM filter. @@ -210,6 +214,134 @@ Below follows an example of a PM filter. ], "measuredEntityDns":[ "ManagedElement=RNC-Gbg-1" + ], + "measObjClass":[ + "UtranCell" + ], + "pmRopStartTime" : "2022-12-13T10:50:44.000-08:00" + } + } + + +Here is an example of a filter that will +match two counters from all cells in two traffic nodes. + +.. code-block:: javascript + + { + "filterType":"pmdata", + "filter": { + "sourceNames":[ + "O-DU-1122", "O-DU-1123" + ], + "measTypes":[ + "succImmediateAssignProcs", "attTCHSeizures" + ], + "measObjClass":[ + "UtranCell" ] } } + + +******************** +Bulk PM subscription +******************** + +The sequence is that a "new file event" is received (from a Kafka topic). +The file is read from local storage (file storage or S3 object store).
For each Job, the specified PM filter is applied to the data +and the result is sent to the Kafka topic specified by the Job (by the data consumer). + +.. image:: ./dedicatedTopics.png + :width: 500pt + +The result of the PM filtering still follows the structure of a 3GPP PM report. +Below follows an example of a delivered PM report. + +.. code-block:: javascript + + { + "event":{ + "commonEventHeader":{ + "domain":"perf3gpp", + "eventId":"9efa1210-f285-455f-9c6a-3a659b1f1882", + "eventName":"perf3gpp_gnb-Ericsson_pmMeasResult", + "sourceName":"O-DU-1122", + "reportingEntityName":"", + "startEpochMicrosec":951912000000, + "lastEpochMicrosec":951912900000, + "timeZoneOffset":"+00:00" + }, + "perf3gppFields":{ + "perf3gppFieldsVersion":"1.0", + "measDataCollection":{ + "granularityPeriod":900, + "measuredEntityUserName":"RNC Telecomville", + "measuredEntityDn":"SubNetwork=CountryNN,MeContext=MEC-Gbg-1,ManagedElement=RNC-Gbg-1", + "measuredEntitySoftwareVersion":"", + "measInfoList":[ + { + "measInfoId":{ + "sMeasInfoId":"" + }, + "measTypes":{ + "map":{ + "succImmediateAssignProcs":1 + }, + "sMeasTypesList":[ + "succImmediateAssignProcs" + ] + }, + "measValuesList":[ + { + "measObjInstId":"RncFunction=RF-1,UtranCell=Gbg-997", + "suspectFlag":"false", + "measResults":[ + { + "p":1, + "sValue":"1113" + } + ] + }, + { + "measObjInstId":"RncFunction=RF-1,UtranCell=Gbg-998", + "suspectFlag":"false", + "measResults":[ + { + "p":1, + "sValue":"234" + } + ] + }, + { + "measObjInstId":"RncFunction=RF-1,UtranCell=Gbg-999", + "suspectFlag":"true", + "measResults":[ + { + "p":1, + "sValue":"789" + } + ] + } + ] + } + ] + } + } + } + } + +If several jobs publish to the same Kafka topic (shared topic), the resulting filtered output will be an aggregate of all matching filters. +Each consumer will therefore receive more data than requested. + +.. image:: ./sharedTopics.png + :width: 500pt + +================== +Sent Kafka headers +================== + +For each filtered result sent to a Kafka topic, the following properties are set in the Kafka headers: + +* type-id, this property indicates the ID of the information type. The value is a string. +* gzip, if this property exists, the object is gzipped (otherwise not). The property has no value.
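+
+Below follows a minimal, illustrative sketch of how a data consumer could read these headers with
+the plain Apache Kafka consumer API. The class and method names are examples only and are not part
+of the adapter.
+
+.. code-block:: java
+
+   import java.io.ByteArrayInputStream;
+   import java.util.zip.GZIPInputStream;
+
+   import org.apache.kafka.clients.consumer.ConsumerRecord;
+   import org.apache.kafka.common.header.Header;
+
+   public class PmRecordReader {
+
+       // Returns the information type ID carried in the "type-id" header, or "" if the header is absent.
+       public static String typeId(ConsumerRecord<byte[], byte[]> record) {
+           Header header = record.headers().lastHeader("type-id");
+           return header == null ? "" : new String(header.value());
+       }
+
+       // Returns the record value as a string, gunzipping it when the "gzip" header is present.
+       public static String value(ConsumerRecord<byte[], byte[]> record) throws Exception {
+           byte[] value = record.value();
+           if (record.headers().lastHeader("gzip") != null) {
+               try (GZIPInputStream unzipper = new GZIPInputStream(new ByteArrayInputStream(value))) {
+                   value = unzipper.readAllBytes();
+               }
+           }
+           return new String(value);
+       }
+   }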
\ No newline at end of file diff --git a/docs/sharedTopics.png b/docs/sharedTopics.png new file mode 100644 index 0000000..55668c4 Binary files /dev/null and b/docs/sharedTopics.png differ diff --git a/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java b/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java index 1816dc3..c118da3 100644 --- a/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java +++ b/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java @@ -97,7 +97,7 @@ public class ProducerCallbacksController { logger.warn("jobCreatedCallback failed: {}", e.getMessage()); return ErrorResponse.create(e, e.getHttpStatus()); } catch (Exception e) { - logger.warn("jobCreatedCallback failed: {}", e.getMessage()); + logger.warn("jobCreatedCallback failed: {}", e.getMessage(), e); return ErrorResponse.create(e, HttpStatus.BAD_REQUEST); } } diff --git a/src/main/java/org/oran/dmaapadapter/filter/Filter.java b/src/main/java/org/oran/dmaapadapter/filter/Filter.java index 6c5fa25..e327fb6 100644 --- a/src/main/java/org/oran/dmaapadapter/filter/Filter.java +++ b/src/main/java/org/oran/dmaapadapter/filter/Filter.java @@ -39,24 +39,26 @@ public interface Filter { public static class FilteredData { public final byte[] key; public final byte[] value; + public final String infoTypeId; @Getter private final boolean isZipped; - private static final FilteredData emptyData = new FilteredData(null, null); + private static final FilteredData emptyData = new FilteredData(null, null, null); public boolean isEmpty() { return (key == null || key.length == 0) && (value == null || value.length == 0); } - public FilteredData(byte[] key, byte[] value) { - this(key, value, false); + public FilteredData(String type, byte[] key, byte[] value) { + this(type, key, value, false); } - public FilteredData(byte[] key, byte[] value, boolean isZipped) { + public FilteredData(String type, byte[] key, byte[] value, boolean isZipped) { this.key = key; this.value = value; this.isZipped = isZipped; + this.infoTypeId = type; } public String getValueAString() { @@ -70,9 +72,9 @@ public interface Filter { public Iterable
<Header> headers() { ArrayList<Header>
result = new ArrayList<>(); if (isZipped()) { - Header h = new RecordHeader(DataFromTopic.ZIP_PROPERTY, null); - result.add(h); + result.add(new RecordHeader(DataFromTopic.ZIPPED_PROPERTY, null)); } + result.add(new RecordHeader(DataFromTopic.TYPE_ID_PROPERTY, infoTypeId.getBytes())); return result; } } diff --git a/src/main/java/org/oran/dmaapadapter/filter/FilterFactory.java b/src/main/java/org/oran/dmaapadapter/filter/FilterFactory.java index 8d51727..358c093 100644 --- a/src/main/java/org/oran/dmaapadapter/filter/FilterFactory.java +++ b/src/main/java/org/oran/dmaapadapter/filter/FilterFactory.java @@ -23,6 +23,7 @@ package org.oran.dmaapadapter.filter; import com.google.gson.GsonBuilder; import java.lang.invoke.MethodHandles; +import java.util.Collection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,6 +52,14 @@ public class FilterFactory { } } + public static PmReportFilter createAggregateFilter(Collection filters) { + PmReportFilter.FilterData resultFilterData = filters.iterator().next().getFilterData(); + for (PmReportFilter filter : filters) { + resultFilterData.addAll(filter.getFilterData()); + } + return new PmReportFilter(resultFilterData); + } + private static PmReportFilter.FilterData createPmFilterData(Object filter) { String str = gson.toJson(filter); return gson.fromJson(str, PmReportFilter.FilterData.class); diff --git a/src/main/java/org/oran/dmaapadapter/filter/JsltFilter.java b/src/main/java/org/oran/dmaapadapter/filter/JsltFilter.java index 577f83d..1720517 100644 --- a/src/main/java/org/oran/dmaapadapter/filter/JsltFilter.java +++ b/src/main/java/org/oran/dmaapadapter/filter/JsltFilter.java @@ -49,7 +49,7 @@ class JsltFilter implements Filter { @Override public FilteredData filter(DataFromTopic data) { if (expression == null) { - return new FilteredData(data.key, data.value); + return new FilteredData(data.infoTypeId, data.key, data.value); } try { JsonFactory factory = mapper.getFactory(); @@ -60,7 +60,7 @@ class JsltFilter implements Filter { if (filteredNode == NullNode.instance) { return FilteredData.empty(); } - return new FilteredData(data.key, mapper.writeValueAsBytes(filteredNode)); + return new FilteredData(data.infoTypeId, data.key, mapper.writeValueAsBytes(filteredNode)); } catch (Exception e) { return FilteredData.empty(); } diff --git a/src/main/java/org/oran/dmaapadapter/filter/JsonPathFilter.java b/src/main/java/org/oran/dmaapadapter/filter/JsonPathFilter.java index 98f1b46..8c397b0 100644 --- a/src/main/java/org/oran/dmaapadapter/filter/JsonPathFilter.java +++ b/src/main/java/org/oran/dmaapadapter/filter/JsonPathFilter.java @@ -46,7 +46,7 @@ class JsonPathFilter implements Filter { String str = new String(data.value); Object o = JsonPath.parse(str).read(this.expression, Object.class); String json = gson.toJson(o); - return o == null ? FilteredData.empty() : new FilteredData(data.key, json.getBytes()); + return o == null ? 
FilteredData.empty() : new FilteredData(data.infoTypeId, data.key, json.getBytes()); } catch (Exception e) { return FilteredData.empty(); } diff --git a/src/main/java/org/oran/dmaapadapter/filter/PmReport.java b/src/main/java/org/oran/dmaapadapter/filter/PmReport.java index 46c90a1..3182d7b 100644 --- a/src/main/java/org/oran/dmaapadapter/filter/PmReport.java +++ b/src/main/java/org/oran/dmaapadapter/filter/PmReport.java @@ -42,9 +42,6 @@ public class PmReport { @Expose private String eventId; - @Expose - private int sequence; - @Expose private String eventName; @@ -55,23 +52,20 @@ public class PmReport { @Expose private String reportingEntityName; - @Expose - private String priority; - @Expose private long startEpochMicrosec; @Expose private long lastEpochMicrosec; - @Expose - private String version; - - @Expose - private String vesEventListenerVersion; - @Expose private String timeZoneOffset; + + /* Not reported elements */ + int sequence; + String priority; + String version; + String vesEventListenerVersion; } public static class MeasInfoId { diff --git a/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java b/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java index a545e90..04834c4 100644 --- a/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java +++ b/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java @@ -26,6 +26,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Set; import lombok.Getter; import lombok.Setter; @@ -61,17 +62,35 @@ public class PmReportFilter implements Filter { @Getter public static class FilterData { - final Collection sourceNames = new HashSet<>(); - final Collection measObjInstIds = new ArrayList<>(); - final Collection measTypes = new HashSet<>(); - final Collection measuredEntityDns = new ArrayList<>(); - final Collection measObjClass = new HashSet<>(); + final Set sourceNames = new HashSet<>(); + final Set measObjInstIds = new HashSet<>(); + final Set measTypes = new HashSet<>(); + final Set measuredEntityDns = new HashSet<>(); + final Set measObjClass = new HashSet<>(); @Setter String pmRopStartTime; @Setter String pmRopEndTime; + + public void addAll(FilterData other) { + addAll(other.sourceNames, sourceNames); + addAll(other.measObjInstIds, measObjInstIds); + addAll(other.measTypes, measTypes); + addAll(other.measuredEntityDns, measuredEntityDns); + addAll(other.measObjClass, measObjClass); + } + + private void addAll(Set source, Set dst) { + if (source.isEmpty()) { + dst.clear(); + } else if (dst.isEmpty()) { + // Nothing, this means 'match all' + } else { + dst.addAll(source); + } + } } private static class MeasTypesIndexed extends PmReport.MeasTypes { @@ -108,7 +127,7 @@ public class PmReportFilter implements Filter { if (reportFiltered == null) { return FilteredData.empty(); } - return new FilteredData(data.key, gson.toJson(reportFiltered).getBytes()); + return new FilteredData(data.infoTypeId, data.key, gson.toJson(reportFiltered).getBytes()); } catch (Exception e) { logger.warn("Could not parse PM data. 
{}, reason: {}", data, e.getMessage()); return FilteredData.empty(); diff --git a/src/main/java/org/oran/dmaapadapter/filter/RegexpFilter.java b/src/main/java/org/oran/dmaapadapter/filter/RegexpFilter.java index 4806604..682f15e 100644 --- a/src/main/java/org/oran/dmaapadapter/filter/RegexpFilter.java +++ b/src/main/java/org/oran/dmaapadapter/filter/RegexpFilter.java @@ -42,12 +42,12 @@ class RegexpFilter implements Filter { @Override public FilteredData filter(DataFromTopic data) { if (regexp == null) { - return new FilteredData(data.key, data.value); + return new FilteredData(data.infoTypeId, data.key, data.value); } Matcher matcher = regexp.matcher(data.valueAsString()); boolean match = matcher.find(); if (match) { - return new FilteredData(data.key, data.value); + return new FilteredData(data.infoTypeId, data.key, data.value); } else { return FilteredData.empty(); } diff --git a/src/main/java/org/oran/dmaapadapter/repository/Job.java b/src/main/java/org/oran/dmaapadapter/repository/Job.java index 8603507..8695524 100644 --- a/src/main/java/org/oran/dmaapadapter/repository/Job.java +++ b/src/main/java/org/oran/dmaapadapter/repository/Job.java @@ -207,7 +207,7 @@ public class Job { public Filter.FilteredData filter(DataFromTopic data) { if (filter == null) { logger.debug("No filter used"); - return new Filter.FilteredData(data.key, data.value); + return new Filter.FilteredData(data.infoTypeId, data.key, data.value); } return filter.filter(data); } diff --git a/src/main/java/org/oran/dmaapadapter/repository/Jobs.java b/src/main/java/org/oran/dmaapadapter/repository/Jobs.java index 2c6b329..f236bbe 100644 --- a/src/main/java/org/oran/dmaapadapter/repository/Jobs.java +++ b/src/main/java/org/oran/dmaapadapter/repository/Jobs.java @@ -29,12 +29,18 @@ import java.util.List; import java.util.Map; import java.util.Vector; +import lombok.Getter; + import org.oran.dmaapadapter.clients.AsyncRestClient; import org.oran.dmaapadapter.clients.AsyncRestClientFactory; import org.oran.dmaapadapter.clients.SecurityContext; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.exceptions.ServiceException; +import org.oran.dmaapadapter.filter.Filter; +import org.oran.dmaapadapter.filter.FilterFactory; +import org.oran.dmaapadapter.filter.PmReportFilter; import org.oran.dmaapadapter.repository.Job.Parameters; +import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -44,15 +50,141 @@ import org.springframework.stereotype.Component; @Component public class Jobs { public interface Observer { - void onJobbAdded(Job job); + void onJobbGroupAdded(JobGroup jobGroup); + + void onJobGroupRemoved(JobGroup jobGroup); + } + + public interface JobGroup { + public String getId(); + + public InfoType getType(); + + public void remove(Job job); + + public boolean isEmpty(); + + public Filter.FilteredData filter(DataFromTopic data); + + public Iterable getJobs(); + + public String getTopic(); + } + + public static class JobGroupSingle implements JobGroup { + @Getter + private final Job job; + private boolean isJobRemoved = false; + + public JobGroupSingle(Job job) { + this.job = job; + } + + @Override + public Filter.FilteredData filter(DataFromTopic data) { + return job.filter(data); + } + + @Override + public void remove(Job job) { + this.isJobRemoved = true; + } + + @Override + public boolean isEmpty() { + return isJobRemoved; + } + + @Override + public 
String getId() { + return job.getId(); + } + + @Override + public InfoType getType() { + return job.getType(); + } + + @Override + public Iterable getJobs() { + Collection c = new ArrayList<>(); + c.add(job); + return c; + } + + @Override + public String getTopic() { + return this.job.getParameters().getKafkaOutputTopic(); + } + } + + public static class JobGroupPm implements JobGroup { + @Getter + private final String topic; - void onJobRemoved(Job job); + private Map jobs = new HashMap<>(); + + @Getter + private PmReportFilter filter; + + @Getter + private final InfoType type; + + public JobGroupPm(InfoType type, String topic) { + this.topic = topic; + this.type = type; + } + + public synchronized void add(Job job) { + this.jobs.put(job.getId(), job); + this.filter = createFilter(); + } + + public synchronized void remove(Job job) { + this.jobs.remove(job.getId()); + if (!this.jobs.isEmpty()) { + this.filter = createFilter(); + } + } + + public boolean isEmpty() { + return jobs.isEmpty(); + } + + @Override + public Filter.FilteredData filter(DataFromTopic data) { + return filter.filter(data); + } + + public Job getAJob() { + if (this.jobs.isEmpty()) { + return null; + } + return this.jobs.values().iterator().next(); + } + + private PmReportFilter createFilter() { + Collection filterData = new ArrayList<>(); + this.jobs.forEach((key, value) -> filterData.add((PmReportFilter) value.getFilter())); + return FilterFactory.createAggregateFilter(filterData); + } + + @Override + public String getId() { + return topic; + } + + @Override + public Iterable getJobs() { + return this.jobs.values(); + } } private static final Logger logger = LoggerFactory.getLogger(Jobs.class); private Map allJobs = new HashMap<>(); private MultiMap jobsByType = new MultiMap<>(); + private Map jobGroups = new HashMap<>(); private final AsyncRestClientFactory restclientFactory; private final List observers = new ArrayList<>(); private final ApplicationConfig appConfig; @@ -86,9 +218,6 @@ public class Jobs { : restclientFactory.createRestClientNoHttpProxy(callbackUrl); Job job = new Job(id, callbackUrl, type, owner, lastUpdated, parameters, consumerRestClient, this.appConfig); this.put(job); - synchronized (observers) { - this.observers.forEach(obs -> obs.onJobbAdded(job)); - } } public void addObserver(Observer obs) { @@ -97,10 +226,40 @@ public class Jobs { } } + private String jobGroupId(Job job) { + if (Strings.isNullOrEmpty(job.getParameters().getKafkaOutputTopic())) { + return job.getId(); + } else if (job.getParameters().getFilterType() == Filter.Type.PM_DATA) { + return job.getParameters().getKafkaOutputTopic(); + } else { + return job.getId(); + } + } + private synchronized void put(Job job) { logger.debug("Put job: {}", job.getId()); + remove(job.getId()); + allJobs.put(job.getId(), job); jobsByType.put(job.getType().getId(), job.getId(), job); + + if (job.getParameters().getFilterType() == Filter.Type.PM_DATA + && job.getParameters().getKafkaOutputTopic() != null) { + String topic = job.getParameters().getKafkaOutputTopic(); + if (!this.jobGroups.containsKey(topic)) { + final JobGroupPm group = new JobGroupPm(job.getType(), topic); + this.jobGroups.put(topic, group); + group.add(job); + this.observers.forEach(obs -> obs.onJobbGroupAdded(group)); + } else { + JobGroupPm group = (JobGroupPm) this.jobGroups.get(topic); + group.add(job); + } + } else { + JobGroupSingle group = new JobGroupSingle(job); + this.jobGroups.put(job.getId(), group); + this.observers.forEach(obs -> obs.onJobbGroupAdded(group)); + } } 
public synchronized Iterable getAll() { @@ -116,15 +275,20 @@ public class Jobs { } public void remove(Job job) { + String groupId = this.jobGroupId(job); + JobGroup group = this.jobGroups.get(groupId); synchronized (this) { this.allJobs.remove(job.getId()); jobsByType.remove(job.getType().getId(), job.getId()); + group.remove(job); + if (group.isEmpty()) { + this.jobGroups.remove(groupId); + } } - notifyJobRemoved(job); - } - private synchronized void notifyJobRemoved(Job job) { - this.observers.forEach(obs -> obs.onJobRemoved(job)); + if (group.isEmpty()) { + this.observers.forEach(obs -> obs.onJobGroupRemoved(group)); + } } public synchronized int size() { @@ -137,7 +301,7 @@ public class Jobs { public void clear() { - this.allJobs.forEach((id, job) -> notifyJobRemoved(job)); + this.jobGroups.forEach((id, group) -> this.observers.forEach(obs -> obs.onJobGroupRemoved(group))); synchronized (this) { allJobs.clear(); diff --git a/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java index 719597a..a9123e6 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java @@ -72,7 +72,7 @@ public class DmaapTopicListener implements TopicListener { .doOnNext(input -> logger.debug("Received from DMaap: {} :{}", this.type.getDmaapTopicUrl(), input)) // .doOnError(t -> logger.error("DmaapTopicListener error: {}", t.getMessage())) // .doFinally(sig -> logger.error("DmaapTopicListener stopped, reason: {}", sig)) // - .map(input -> new DataFromTopic(null, input.getBytes(), false)) + .map(input -> new DataFromTopic(this.type.getId(), null, null, input.getBytes())) .flatMap(data -> KafkaTopicListener.getDataFromFileIfNewPmFileEvent(data, type, dataStore), 100) .publish() // .autoConnect(); diff --git a/src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java index 71918db..6193af6 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java @@ -23,6 +23,8 @@ package org.oran.dmaapadapter.tasks; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.filter.Filter; import org.oran.dmaapadapter.repository.Job; +import org.oran.dmaapadapter.repository.Jobs.JobGroup; +import org.oran.dmaapadapter.repository.Jobs.JobGroupSingle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.MediaType; @@ -36,14 +38,15 @@ import reactor.core.publisher.Mono; public class HttpJobDataDistributor extends JobDataDistributor { private static final Logger logger = LoggerFactory.getLogger(HttpJobDataDistributor.class); - public HttpJobDataDistributor(Job job, ApplicationConfig config) { + public HttpJobDataDistributor(JobGroup job, ApplicationConfig config) { super(job, config); } @Override protected Mono sendToClient(Filter.FilteredData output) { - Job job = this.getJob(); - logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), output); + JobGroupSingle group = (JobGroupSingle) this.getJobGroup(); + Job job = group.getJob(); + logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), output.getValueAString()); MediaType contentType = job.isBuffered() || job.getType().isJson() ? 
MediaType.APPLICATION_JSON : null; return job.getConsumerRestClient().post("", output.getValueAString(), contentType); } diff --git a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java index f9353f1..ef493a5 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java @@ -34,6 +34,9 @@ import org.oran.dmaapadapter.datastore.DataStore; import org.oran.dmaapadapter.filter.Filter; import org.oran.dmaapadapter.filter.PmReportFilter; import org.oran.dmaapadapter.repository.Job; +import org.oran.dmaapadapter.repository.Jobs.JobGroup; +import org.oran.dmaapadapter.repository.Jobs.JobGroupPm; +import org.oran.dmaapadapter.repository.Jobs.JobGroupSingle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +53,7 @@ public abstract class JobDataDistributor { private static final Logger logger = LoggerFactory.getLogger(JobDataDistributor.class); @Getter - private final Job job; + private final JobGroup jobGroup; private Disposable subscription; private final ErrorStats errorStats = new ErrorStats(); @@ -71,50 +74,48 @@ public abstract class JobDataDistributor { } } - protected JobDataDistributor(Job job, ApplicationConfig applConfig) { + protected JobDataDistributor(JobGroup jobGroup, ApplicationConfig applConfig) { this.applConfig = applConfig; - this.job = job; + this.jobGroup = jobGroup; this.dataStore = DataStore.create(applConfig); this.dataStore.create(DataStore.Bucket.FILES).subscribe(); this.dataStore.create(DataStore.Bucket.LOCKS).subscribe(); } public void start(Flux input) { - logger.debug("Starting distribution, job: {}, to topic: {}", this.job.getId(), - job.getParameters().getKafkaOutputTopic()); - PmReportFilter filter = job.getFilter() instanceof PmReportFilter ? (PmReportFilter) job.getFilter() : null; - + logger.debug("Starting distribution, to topic: {}", jobGroup.getId()); + PmReportFilter filter = getPmReportFilter(this.jobGroup); if (filter == null || filter.getFilterData().getPmRopEndTime() == null) { - this.subscription = filterAndBuffer(input, this.job) // + this.subscription = filterAndBuffer(input, this.jobGroup) // .flatMap(this::sendToClient) // .onErrorResume(this::handleError) // .subscribe(this::handleSentOk, // this::handleExceptionInStream, // - () -> logger.warn("JobDataDistributor stopped jobId: {}", job.getId())); + () -> logger.warn("JobDataDistributor stopped jobId: {}", jobGroup.getId())); } if (filter != null && filter.getFilterData().getPmRopStartTime() != null) { this.dataStore.createLock(collectHistoricalDataLockName()) // .doOnNext(isLockGranted -> { if (isLockGranted.booleanValue()) { - logger.debug("Checking historical PM ROP files, jobId: {}", this.job.getId()); + logger.debug("Checking historical PM ROP files, jobId: {}", this.jobGroup.getId()); } else { logger.debug("Skipping check of historical PM ROP files, already done. 
jobId: {}", - this.job.getId()); + this.jobGroup.getId()); } }) // .filter(isLockGranted -> isLockGranted) // .flatMapMany(b -> Flux.fromIterable(filter.getFilterData().getSourceNames())) // .doOnNext(sourceName -> logger.debug("Checking source name: {}, jobId: {}", sourceName, - this.job.getId())) // + this.jobGroup.getId())) // .flatMap(sourceName -> dataStore.listObjects(DataStore.Bucket.FILES, sourceName), 1) // .filter(this::isRopFile) // .filter(fileName -> filterStartTime(filter.getFilterData(), fileName)) // .filter(fileName -> filterEndTime(filter.getFilterData(), fileName)) // .map(this::createFakeEvent) // - .flatMap(data -> KafkaTopicListener.getDataFromFileIfNewPmFileEvent(data, this.job.getType(), + .flatMap(data -> KafkaTopicListener.getDataFromFileIfNewPmFileEvent(data, this.jobGroup.getType(), dataStore), 100) - .map(job::filter) // + .map(jobGroup::filter) // .map(this::gzip) // .flatMap(this::sendToClient, 1) // .onErrorResume(this::handleCollectHistoricalDataError) // @@ -122,6 +123,17 @@ public abstract class JobDataDistributor { } } + private static PmReportFilter getPmReportFilter(JobGroup jobGroup) { + + if (jobGroup instanceof JobGroupPm) { + return ((JobGroupPm) jobGroup).getFilter(); + } else if (jobGroup instanceof JobGroupSingle) { + Filter f = ((JobGroupSingle) jobGroup).getJob().getFilter(); + return (f instanceof PmReportFilter) ? (PmReportFilter) f : null; + } + return null; + } + private Filter.FilteredData gzip(Filter.FilteredData data) { if (this.applConfig.isZipOutput()) { try { @@ -131,7 +143,7 @@ public abstract class JobDataDistributor { gzip.flush(); gzip.close(); byte[] zipped = out.toByteArray(); - return new Filter.FilteredData(data.key, zipped, true); + return new Filter.FilteredData(data.infoTypeId, data.key, zipped, true); } catch (IOException e) { logger.error("Unexpected exception when zipping: {}", e.getMessage()); return data; @@ -142,30 +154,28 @@ public abstract class JobDataDistributor { } private Mono handleCollectHistoricalDataError(Throwable t) { - logger.error("Exception: {} job: {}", t.getMessage(), job.getId()); + logger.error("Exception: {} job: {}", t.getMessage(), jobGroup.getId()); return tryDeleteLockFile() // .map(bool -> "OK"); } private String collectHistoricalDataLockName() { - return "collectHistoricalDataLock" + this.job.getId(); + return "collectHistoricalDataLock" + this.jobGroup.getId(); } private TopicListener.DataFromTopic createFakeEvent(String fileName) { NewFileEvent ev = new NewFileEvent(fileName); - return new TopicListener.DataFromTopic(null, gson.toJson(ev).getBytes(), false); + return new TopicListener.DataFromTopic(this.jobGroup.getType().getId(), null, null, gson.toJson(ev).getBytes()); } private static String fileTimePartFromRopFileName(String fileName) { + // "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.json" return fileName.substring(fileName.lastIndexOf("/") + 2); } private static boolean filterStartTime(PmReportFilter.FilterData filter, String fileName) { - // A20000626.2315+0200-2330+0200_HTTPS-6-73.json try { - String fileTimePart = fileTimePartFromRopFileName(fileName); - fileTimePart = fileTimePart.substring(0, 18); - OffsetDateTime fileStartTime = parseFileDate(fileTimePart); + OffsetDateTime fileStartTime = getStartTimeFromFileName(fileName); OffsetDateTime startTime = OffsetDateTime.parse(filter.getPmRopStartTime()); boolean isMatch = fileStartTime.isAfter(startTime); logger.debug("Checking file: {}, fileStartTime: {}, filterStartTime: {}, isAfter: {}", fileName, @@ -182,14 +192,11 @@ 
public abstract class JobDataDistributor { } private static boolean filterEndTime(PmReportFilter.FilterData filter, String fileName) { - // A20000626.2315+0200-2330+0200_HTTPS-6-73.json if (filter.getPmRopEndTime() == null) { return true; } try { - String fileTimePart = fileTimePartFromRopFileName(fileName); - fileTimePart = fileTimePart.substring(0, 9) + fileTimePart.substring(19, 28); - OffsetDateTime fileEndTime = parseFileDate(fileTimePart); + OffsetDateTime fileEndTime = getEndTimeFromFileName(fileName); OffsetDateTime endTime = OffsetDateTime.parse(filter.getPmRopEndTime()); boolean isMatch = fileEndTime.isBefore(endTime); logger.debug("Checking file: {}, fileEndTime: {}, endTime: {}, isBefore: {}", fileName, fileEndTime, @@ -202,6 +209,20 @@ public abstract class JobDataDistributor { } } + private static OffsetDateTime getStartTimeFromFileName(String fileName) { + String fileTimePart = fileTimePartFromRopFileName(fileName); + // A20000626.2315+0200-2330+0200_HTTPS-6-73.json + fileTimePart = fileTimePart.substring(0, 18); + return parseFileDate(fileTimePart); + } + + private static OffsetDateTime getEndTimeFromFileName(String fileName) { + String fileTimePart = fileTimePartFromRopFileName(fileName); + // A20000626.2315+0200-2330+0200_HTTPS-6-73.json + fileTimePart = fileTimePart.substring(0, 9) + fileTimePart.substring(19, 28); + return parseFileDate(fileTimePart); + } + private static OffsetDateTime parseFileDate(String timeStr) { DateTimeFormatter startTimeFormatter = new DateTimeFormatterBuilder().appendPattern("yyyyMMdd.HHmmZ").toFormatter(); @@ -209,14 +230,14 @@ public abstract class JobDataDistributor { } private void handleExceptionInStream(Throwable t) { - logger.warn("JobDataDistributor exception: {}, jobId: {}", t.getMessage(), job.getId()); + logger.warn("JobDataDistributor exception: {}, jobId: {}", t.getMessage(), jobGroup.getId()); } protected abstract Mono sendToClient(Filter.FilteredData output); public synchronized void stop() { if (this.subscription != null) { - logger.debug("Stopped, job: {}", job.getId()); + logger.debug("Stopped, job: {}", jobGroup.getId()); this.subscription.dispose(); this.subscription = null; } @@ -233,23 +254,26 @@ public abstract class JobDataDistributor { return this.subscription != null; } - private Flux filterAndBuffer(Flux inputFlux, Job job) { + private Flux filterAndBuffer(Flux inputFlux, JobGroup jobGroup) { Flux filtered = // inputFlux // - .doOnNext(data -> logger.trace("Received data, job {}", job.getId())) // - .doOnNext(data -> job.getStatistics().received(data.value)) // - .map(job::filter) // + .doOnNext(data -> logger.trace("Received data, job {}", jobGroup.getId())) // + .doOnNext(data -> jobGroup.getJobs().forEach(job -> job.getStatistics().received(data.value))) // + .map(jobGroup::filter) // .map(this::gzip) // .filter(f -> !f.isEmpty()) // - .doOnNext(f -> job.getStatistics().filtered(f.value)) // - .doOnNext(data -> logger.trace("Filtered data, job {}", job.getId())); // + .doOnNext(f -> jobGroup.getJobs().forEach(job -> job.getStatistics().filtered(f.value))) // + .doOnNext(data -> logger.trace("Filtered data, job {}", jobGroup.getId())) // + ; // + Job job = jobGroup.getJobs().iterator().next(); if (job.isBuffered()) { filtered = filtered.map(input -> quoteNonJson(input.getValueAString(), job)) // .bufferTimeout( // job.getParameters().getBufferTimeout().getMaxSize(), // job.getParameters().getBufferTimeout().getMaxTime()) // - .map(buffered -> new Filter.FilteredData(null, buffered.toString().getBytes())); + 
.map(buffered -> new Filter.FilteredData(this.getJobGroup().getType().getId(), null, + buffered.toString().getBytes())); } return filtered; } @@ -264,7 +288,7 @@ public abstract class JobDataDistributor { } private Mono handleError(Throwable t) { - logger.warn("exception: {} job: {}", t.getMessage(), job.getId()); + logger.warn("exception: {} job: {}", t.getMessage(), jobGroup.getId()); this.errorStats.handleException(t); return Mono.empty(); // Ignore } diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java index 6b33c3b..c0b1d16 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java @@ -29,7 +29,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.filter.Filter; -import org.oran.dmaapadapter.repository.Job; +import org.oran.dmaapadapter.repository.Jobs.JobGroup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,8 +48,8 @@ public class KafkaJobDataDistributor extends JobDataDistributor { private KafkaSender sender; - public KafkaJobDataDistributor(Job job, ApplicationConfig appConfig) { - super(job, appConfig); + public KafkaJobDataDistributor(JobGroup jobGroup, ApplicationConfig appConfig) { + super(jobGroup, appConfig); SenderOptions senderOptions = senderOptions(appConfig); this.sender = KafkaSender.create(senderOptions); @@ -57,16 +57,16 @@ public class KafkaJobDataDistributor extends JobDataDistributor { @Override protected Mono sendToClient(Filter.FilteredData data) { - Job job = this.getJob(); - SenderRecord senderRecord = senderRecord(data, job); + + SenderRecord senderRecord = senderRecord(data, this.getJobGroup().getTopic()); logger.trace("Sending data '{}' to Kafka topic: {}", StringUtils.truncate(data.getValueAString(), 10), - job.getParameters().getKafkaOutputTopic()); + this.getJobGroup().getTopic()); return this.sender.send(Mono.just(senderRecord)) // - .doOnNext(n -> logger.debug("Sent data to Kafka topic: {}", job.getParameters().getKafkaOutputTopic())) // - .doOnError( - t -> logger.warn("Failed to send to Kafka, job: {}, reason: {}", job.getId(), t.getMessage())) // + .doOnNext(n -> logger.debug("Sent data to Kafka topic: {}", this.getJobGroup().getTopic())) // + .doOnError(t -> logger.warn("Failed to send to Kafka, job: {}, reason: {}", this.getJobGroup().getId(), + t.getMessage())) // .onErrorResume(t -> Mono.empty()) // .collectList() // .map(x -> "ok"); @@ -93,9 +93,8 @@ public class KafkaJobDataDistributor extends JobDataDistributor { return SenderOptions.create(props); } - private SenderRecord senderRecord(Filter.FilteredData output, Job infoJob) { + private SenderRecord senderRecord(Filter.FilteredData output, String topic) { int correlationMetadata = 2; - String topic = infoJob.getParameters().getKafkaOutputTopic(); var producerRecord = new ProducerRecord<>(topic, null, null, output.key, output.value, output.headers()); return SenderRecord.create(producerRecord, correlationMetadata); } diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java index 1a20c68..021678d 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java +++ 
b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java @@ -85,7 +85,7 @@ public class KafkaTopicListener implements TopicListener { .doFinally( sig -> logger.error("KafkaTopicListener stopped, type: {}, reason: {}", this.type.getId(), sig)) // .filter(t -> t.value().length > 0 || t.key().length > 0) // - .map(input -> new DataFromTopic(input.key(), input.value(), DataFromTopic.findZipped(input.headers()))) // + .map(input -> new DataFromTopic(this.type.getId(), input.headers(), input.key(), input.value())) // .flatMap(data -> getDataFromFileIfNewPmFileEvent(data, type, dataStore)) // .publish() // .autoConnect(1); @@ -126,7 +126,7 @@ public class KafkaTopicListener implements TopicListener { logger.trace("Reading PM measurements, type: {}, inputTopic: {}", type.getId(), type.getKafkaInputTopic()); return fileStore.readObject(DataStore.Bucket.FILES, ev.getFilename()) // .map(bytes -> unzip(bytes, ev.getFilename())) // - .map(bytes -> new DataFromTopic(data.key, bytes, false)); + .map(bytes -> new DataFromTopic(data.infoTypeId, data.headers, data.key, bytes)); } catch (Exception e) { return Mono.just(data); diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java index 8b1afc8..debc783 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java @@ -34,7 +34,10 @@ public interface TopicListener { public static class DataFromTopic { public final byte[] key; public final byte[] value; - public final boolean isZipped; + + public final String infoTypeId; + + public final Iterable
<Header> headers; private static byte[] noBytes = new byte[0]; @@ -43,29 +46,43 @@ public interface TopicListener { @ToString.Exclude private PmReport cachedPmReport; - public DataFromTopic(byte[] key, byte[] value, boolean isZipped) { + public DataFromTopic(String typeId, Iterable<Header>
headers, byte[] key, byte[] value) { this.key = key == null ? noBytes : key; this.value = value == null ? noBytes : value; - this.isZipped = isZipped; + this.infoTypeId = typeId; + this.headers = headers; } public String valueAsString() { return new String(this.value); } - public static final String ZIP_PROPERTY = "gzip"; + public static final String ZIPPED_PROPERTY = "gzip"; + public static final String TYPE_ID_PROPERTY = "type-id"; - public static boolean findZipped(Iterable<Header>
headers) { + public boolean isZipped() { if (headers == null) { return false; } for (Header h : headers) { - if (h.key().equals(ZIP_PROPERTY)) { + if (h.key().equals(ZIPPED_PROPERTY)) { return true; } } return false; } + + public String getTypeIdFromHeaders() { + if (headers == null) { + return ""; + } + for (Header h : headers) { + if (h.key().equals(TYPE_ID_PROPERTY)) { + return new String(h.value()); + } + } + return ""; + } } public Flux getFlux(); diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java index a3e3703..56180d7 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java @@ -30,8 +30,8 @@ import org.oran.dmaapadapter.clients.SecurityContext; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.repository.InfoType; import org.oran.dmaapadapter.repository.InfoTypes; -import org.oran.dmaapadapter.repository.Job; import org.oran.dmaapadapter.repository.Jobs; +import org.oran.dmaapadapter.repository.Jobs.JobGroup; import org.oran.dmaapadapter.repository.MultiMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,52 +70,52 @@ public class TopicListeners { jobs.addObserver(new Jobs.Observer() { @Override - public void onJobbAdded(Job job) { - addJob(job); + public void onJobbGroupAdded(JobGroup jobGroup) { + addJob(jobGroup); } @Override - public void onJobRemoved(Job job) { - removeJob(job); + public void onJobGroupRemoved(JobGroup jobGroup) { + removeDistributor(jobGroup); } }); } - public synchronized void addJob(Job job) { - removeJob(job); - logger.debug("Job added {}", job.getId()); - if (job.getType().isKafkaTopicDefined()) { - addConsumer(job, dataDistributors, kafkaTopicListeners); + public synchronized void addJob(JobGroup jobGroup) { + removeDistributor(jobGroup); + logger.debug("Job added {}", jobGroup.getId()); + if (jobGroup.getType().isKafkaTopicDefined()) { + addDistributor(jobGroup, dataDistributors, kafkaTopicListeners); } - if (job.getType().isDmaapTopicDefined()) { - addConsumer(job, dataDistributors, dmaapTopicListeners); + if (jobGroup.getType().isDmaapTopicDefined()) { + addDistributor(jobGroup, dataDistributors, dmaapTopicListeners); } } - private JobDataDistributor createConsumer(Job job) { - return !Strings.isEmpty(job.getParameters().getKafkaOutputTopic()) ? new KafkaJobDataDistributor(job, appConfig) - : new HttpJobDataDistributor(job, appConfig); + private JobDataDistributor createDistributor(JobGroup jobGroup) { + return !Strings.isEmpty(jobGroup.getTopic()) ? 
new KafkaJobDataDistributor(jobGroup, appConfig) + : new HttpJobDataDistributor(jobGroup, appConfig); } - private void addConsumer(Job job, MultiMap distributors, + private void addDistributor(JobGroup jobGroup, MultiMap distributors, Map topicListeners) { - TopicListener topicListener = topicListeners.get(job.getType().getId()); - JobDataDistributor distributor = createConsumer(job); + TopicListener topicListener = topicListeners.get(jobGroup.getType().getId()); + JobDataDistributor distributor = createDistributor(jobGroup); distributor.start(topicListener.getFlux()); - distributors.put(job.getType().getId(), job.getId(), distributor); + distributors.put(jobGroup.getType().getId(), jobGroup.getId(), distributor); } - public synchronized void removeJob(Job job) { - removeJob(job, dataDistributors); + public synchronized void removeDistributor(JobGroup jobGroup) { + removeDistributor(jobGroup, dataDistributors); } - private static void removeJob(Job job, MultiMap distributors) { - JobDataDistributor distributor = distributors.remove(job.getType().getId(), job.getId()); + private static void removeDistributor(JobGroup jobGroup, MultiMap distributors) { + JobDataDistributor distributor = distributors.remove(jobGroup.getType().getId(), jobGroup.getId()); if (distributor != null) { - logger.debug("Job removed {}", job.getId()); + logger.debug("Job removed {}", jobGroup.getId()); distributor.stop(); } } diff --git a/src/test/java/org/oran/dmaapadapter/ConsumerController.java b/src/test/java/org/oran/dmaapadapter/ConsumerController.java index a62ae7d..f1b0f21 100644 --- a/src/test/java/org/oran/dmaapadapter/ConsumerController.java +++ b/src/test/java/org/oran/dmaapadapter/ConsumerController.java @@ -54,9 +54,9 @@ public class ConsumerController { public static class TestResults { - public List receivedBodies = Collections.synchronizedList(new ArrayList()); + public final List receivedBodies = Collections.synchronizedList(new ArrayList()); - public List> receivedHeaders = + public final List> receivedHeaders = Collections.synchronizedList(new ArrayList>()); public TestResults() {} diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java index 7e806e2..7317919 100644 --- a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java +++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java @@ -203,16 +203,17 @@ class IntegrationWithKafka { } private TopicListener.DataFromTopic unzip(TopicListener.DataFromTopic receivedKafkaOutput) { - if (this.applicationConfig.isZipOutput() != receivedKafkaOutput.isZipped) { - logger.error("********* ERROR received zipped: {}, exp zipped: {}", receivedKafkaOutput.isZipped, + if (this.applicationConfig.isZipOutput() != receivedKafkaOutput.isZipped()) { + logger.error("********* ERROR received zipped: {}, exp zipped: {}", receivedKafkaOutput.isZipped(), this.applicationConfig.isZipOutput()); } - if (!receivedKafkaOutput.isZipped) { + + if (!receivedKafkaOutput.isZipped()) { return receivedKafkaOutput; } try { byte[] unzipped = KafkaTopicListener.unzip(receivedKafkaOutput.value); - return new TopicListener.DataFromTopic(unzipped, receivedKafkaOutput.key, false); + return new TopicListener.DataFromTopic("typeId", null, unzipped, receivedKafkaOutput.key); } catch (IOException e) { logger.error("********* ERROR ", e.getMessage()); return null; @@ -222,7 +223,10 @@ class IntegrationWithKafka { private void set(TopicListener.DataFromTopic receivedKafkaOutput) { 
this.receivedKafkaOutput = receivedKafkaOutput; this.count++; - logger.debug("*** received data on topic: {}", OUTPUT_TOPIC); + if (logger.isDebugEnabled()) { + logger.debug("*** received data on topic: {}", OUTPUT_TOPIC); + logger.debug("*** received typeId: {}", receivedKafkaOutput.getTypeIdFromHeaders()); + } } synchronized String lastKey() { @@ -234,7 +238,7 @@ class IntegrationWithKafka { } void reset() { - this.receivedKafkaOutput = new TopicListener.DataFromTopic(null, null, false); + this.receivedKafkaOutput = new TopicListener.DataFromTopic("", null, null, null); this.count = 0; } } @@ -494,9 +498,11 @@ class IntegrationWithKafka { assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); // Create two jobs. One buffering and one with a filter - this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 10), JOB_ID1, restClient()); this.icsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0), JOB_ID2, restClient()); + this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 10), JOB_ID1, restClient()); + await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2)); + waitForKafkaListener(); var dataToSend = Flux.range(1, 3).map(i -> kafkaSenderRecord("Message_" + i, "", KAFKA_TYPE_ID)); // Message_1, @@ -622,7 +628,7 @@ class IntegrationWithKafka { assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); - final int NO_OF_COUNTERS = 5; + final int NO_OF_COUNTERS = 2; for (int i = 0; i < NO_OF_COUNTERS; ++i) { filterData.getMeasTypes().add("pmCounterNumber" + i); } @@ -678,34 +684,32 @@ class IntegrationWithKafka { @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests. 
@Test - void kafkaCharacteristics_onePmJobs_sharedTopic() throws Exception { + void kafkaCharacteristics_manyPmJobs_sharedTopic() throws Exception { // Filter PM reports and sent to many jobs over Kafka - this.applicationConfig.setZipOutput(true); + this.applicationConfig.setZipOutput(false); // Register producer, Register types await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); - PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); - final int NO_OF_COUNTERS = 0; - for (int i = 0; i < NO_OF_COUNTERS; ++i) { - filterData.getMeasTypes().add("pmCounterNumber" + i); - } - filterData.getMeasObjClass().add("NRCellCU"); + final int NO_OF_JOBS = 150; + ArrayList receivers = new ArrayList<>(); + for (int i = 0; i < NO_OF_JOBS; ++i) { + final String outputTopic = "kafkaCharacteristics_onePmJobs_manyReceivers"; + final String jobId = "manyJobs_" + i; + PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); + filterData.getMeasTypes().add("pmCounterNumber" + i); // all counters will be added + filterData.getMeasObjClass().add("NRCellCU"); - final String outputTopic = "kafkaCharacteristics_onePmJobs_manyReceivers"; - this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData), outputTopic, restClient()); + this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData), jobId, restClient()); - final int NO_OF_RECEIVERS = 150; - ArrayList receivers = new ArrayList<>(); - for (int i = 0; i < NO_OF_RECEIVERS; ++i) { KafkaReceiver receiver = new KafkaReceiver(this.applicationConfig, outputTopic, this.securityContext, "group_" + i); receivers.add(receiver); } - await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); + await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(NO_OF_JOBS)); waitForKafkaListener(); final int NO_OF_OBJECTS = 1000; diff --git a/src/test/java/org/oran/dmaapadapter/filter/JsltFilterTest.java b/src/test/java/org/oran/dmaapadapter/filter/JsltFilterTest.java index 6fa7ce8..7ad5a6c 100644 --- a/src/test/java/org/oran/dmaapadapter/filter/JsltFilterTest.java +++ b/src/test/java/org/oran/dmaapadapter/filter/JsltFilterTest.java @@ -33,7 +33,7 @@ import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic; class JsltFilterTest { private String filterReport(JsltFilter filter) throws Exception { - DataFromTopic data = new DataFromTopic(null, loadReport().getBytes(), false); + DataFromTopic data = new DataFromTopic("type", null, null, loadReport().getBytes()); FilteredData filtered = filter.filter(data); return filtered.getValueAString(); } diff --git a/src/test/java/org/oran/dmaapadapter/filter/JsonPathFilterTest.java b/src/test/java/org/oran/dmaapadapter/filter/JsonPathFilterTest.java index 6e75757..2f5b4b1 100644 --- a/src/test/java/org/oran/dmaapadapter/filter/JsonPathFilterTest.java +++ b/src/test/java/org/oran/dmaapadapter/filter/JsonPathFilterTest.java @@ -36,7 +36,7 @@ class JsonPathFilterTest { void testJsonPath() throws Exception { String exp = ("$.event.perf3gppFields.measDataCollection.measInfoList[0].measTypes.sMeasTypesList[0]"); JsonPathFilter filter = new JsonPathFilter(exp); - DataFromTopic data = new DataFromTopic(null, loadReport().getBytes(), false); + DataFromTopic data = new DataFromTopic("typeId", null, null, loadReport().getBytes()); FilteredData filtered = filter.filter(data); String res = 
filtered.getValueAString(); assertThat(res).isEqualTo("\"attTCHSeizures\""); diff --git a/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java b/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java index 2f1f3a6..bb6f145 100644 --- a/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java +++ b/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java @@ -99,7 +99,8 @@ class PmReportFilterTest { private String filterReport(PmReportFilter filter) throws Exception { - TopicListener.DataFromTopic data = new TopicListener.DataFromTopic(null, loadReport().getBytes(), false); + TopicListener.DataFromTopic data = + new TopicListener.DataFromTopic("typeId", null, null, loadReport().getBytes()); FilteredData filtered = filter.filter(data); String reportAfterFilter = gson.toJson(data.getCachedPmReport()); @@ -156,7 +157,8 @@ class PmReportFilterTest { } { - TopicListener.DataFromTopic data = new TopicListener.DataFromTopic(null, loadReport().getBytes(), false); + TopicListener.DataFromTopic data = + new TopicListener.DataFromTopic("typeId", null, null, loadReport().getBytes()); PmReportFilter.FilterData utranCellFilter = new PmReportFilter.FilterData(); utranCellFilter.measObjClass.add("UtranCell"); @@ -213,7 +215,7 @@ class PmReportFilterTest { filterData.getMeasTypes().add("pmCounterNumber0"); filterData.getMeasObjClass().add("NRCellCU"); PmReportFilter filter = new PmReportFilter(filterData); - DataFromTopic topicData = new DataFromTopic(null, pmReportJson.getBytes(), false); + DataFromTopic topicData = new DataFromTopic("typeId", null, null, pmReportJson.getBytes()); Instant startTime = Instant.now(); for (int i = 0; i < TIMES; ++i) { @@ -266,10 +268,11 @@ class PmReportFilterTest { PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); PmReportFilter filter = new PmReportFilter(filterData); - FilteredData filtered = filter.filter(new TopicListener.DataFromTopic(null, "junk".getBytes(), false)); + FilteredData filtered = filter.filter(new TopicListener.DataFromTopic("typeId", null, null, "junk".getBytes())); assertThat(filtered.isEmpty()).isTrue(); - filtered = filter.filter(new TopicListener.DataFromTopic(null, reQuote("{'msg': 'test'}").getBytes(), false)); + filtered = filter + .filter(new TopicListener.DataFromTopic("typeId", null, null, reQuote("{'msg': 'test'}").getBytes())); assertThat(filtered.isEmpty()).isTrue(); }