This extends the filtering capabilities so that a special filter for PM data can be used. With this filter it is
possible to define which meas-types (counters) to get from which resources.
+"It is possible to both filter on new data that is collected from the traffical nodes and to query from data that is already collected.
+
The filterType parameter is extended to allow the value "pmdata", which can be used for PM data filtering. The following filter properties are available:
* sourceNames, an array of source names for the wanted PM reports.
* measuredEntityDns, partial match of measured entity DNs.
* measObjClass, matching of the class of the measObjInstId. The measObjInstId must follow the 3GPP naming conventions for Managed Objects (3GPP TS 32.106-8).
  For example, for the distinguished name "ManagedElement=RNC-Gbg-1,ENodeBFunction=1", the MO class is "ENodeBFunction"; see the sketch after this list.
+* measTypes, the meas-types (counters) to include in the reports.
+* pmRopStartTime, gives the possibility to query already collected PM files. This is the time from which the information shall be returned.
+  In this case, the query is only done for files from the given "sourceNames".
+  If this parameter is excluded, only "new" reports are delivered as they are collected from the traffic nodes.
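+
+The MO class is the class part of the last RDN of the distinguished name. A minimal sketch
+(an assumed helper, not part of the delivered code) of how it can be derived:
+
+.. code-block:: java
+
+   // Assumed helper: derive the MO class from the last RDN of a 3GPP distinguished name,
+   // e.g. "ManagedElement=RNC-Gbg-1,ENodeBFunction=1" -> "ENodeBFunction".
+   static String moClass(String measObjInstId) {
+       String lastRdn = measObjInstId.substring(measObjInstId.lastIndexOf(',') + 1); // "ENodeBFunction=1"
+       return lastRdn.substring(0, lastRdn.indexOf('=')); // "ENodeBFunction"
+   }
+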
-All PM filter properties are optional and a non given will result in "match all".
-The result of the filtering is still following the structure of a 3GPP PM report.
+All PM filter properties are optional, and an omitted property results in "match all" (except for pmRopStartTime).
Below follows an example of a PM filter.
],
"measuredEntityDns":[
"ManagedElement=RNC-Gbg-1"
+ ],
+ "measObjClass":[
+ "UtranCell"
+      ],
+      "pmRopStartTime": "2022-12-13T10:50:44.000-08:00"
+ }
+ }
+
+
+Here is an example of a filter that will match two counters from all cells in two traffic nodes.
+
+.. code-block:: javascript
+
+ {
+ "filterType":"pmdata",
+ "filter": {
+ "sourceNames":[
+ "O-DU-1122", "O-DU-1123"
+ ],
+ "measTypes":[
+ "succImmediateAssignProcs", "attTCHSeizures"
+ ],
+ "measObjClass":[
+ "UtranCell"
]
}
}
+
+
+********************
+Bulk PM subscription
+********************
+
+The sequence is as follows: a "new file event" is received from a Kafka topic and the corresponding file is read
+from local storage (file storage or S3 object store). For each job, the specified PM filter is applied to the data
+and the result is sent to the Kafka topic specified by the job (by the data consumer).
+
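+For illustration, the "new file event" is a small JSON object that points out the file to read.
+A minimal sketch of such an event (the exact field names are an assumption) could look like this:
+
+.. code-block:: javascript
+
+   {
+      "filename": "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.json"
+   }
+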
+.. image:: ./dedicatedTopics.png
+ :width: 500pt
+
+The result of the PM filtering still follows the structure of a 3GPP PM report.
+Here follows an example of a resulting delivered PM report.
+
+.. code-block:: javascript
+
+ {
+ "event":{
+ "commonEventHeader":{
+ "domain":"perf3gpp",
+ "eventId":"9efa1210-f285-455f-9c6a-3a659b1f1882",
+ "eventName":"perf3gpp_gnb-Ericsson_pmMeasResult",
+ "sourceName":"O-DU-1122",
+ "reportingEntityName":"",
+ "startEpochMicrosec":951912000000,
+ "lastEpochMicrosec":951912900000,
+ "timeZoneOffset":"+00:00"
+ },
+ "perf3gppFields":{
+ "perf3gppFieldsVersion":"1.0",
+ "measDataCollection":{
+ "granularityPeriod":900,
+ "measuredEntityUserName":"RNC Telecomville",
+ "measuredEntityDn":"SubNetwork=CountryNN,MeContext=MEC-Gbg-1,ManagedElement=RNC-Gbg-1",
+ "measuredEntitySoftwareVersion":"",
+ "measInfoList":[
+ {
+ "measInfoId":{
+ "sMeasInfoId":""
+ },
+ "measTypes":{
+ "map":{
+ "succImmediateAssignProcs":1
+ },
+ "sMeasTypesList":[
+ "succImmediateAssignProcs"
+ ]
+ },
+ "measValuesList":[
+ {
+ "measObjInstId":"RncFunction=RF-1,UtranCell=Gbg-997",
+ "suspectFlag":"false",
+ "measResults":[
+ {
+ "p":1,
+ "sValue":"1113"
+ }
+ ]
+ },
+ {
+ "measObjInstId":"RncFunction=RF-1,UtranCell=Gbg-998",
+ "suspectFlag":"false",
+ "measResults":[
+ {
+ "p":1,
+ "sValue":"234"
+ }
+ ]
+ },
+ {
+ "measObjInstId":"RncFunction=RF-1,UtranCell=Gbg-999",
+ "suspectFlag":"true",
+ "measResults":[
+ {
+ "p":1,
+ "sValue":"789"
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ }
+ }
+ }
+
+If several jobs publish to the same Kafka topic (a shared topic), the delivered output is produced by an aggregate of all the jobs' filters.
+Each consumer may therefore receive more data than its own job requested.
+
+.. image:: ./sharedTopics.png
+ :width: 500pt
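+
+For illustration, assume two jobs publish to the same topic; one selects the counter
+"succImmediateAssignProcs" and the other "attTCHSeizures", both from source "O-DU-1122".
+The data on the shared topic is then filtered as if one job had requested both counters,
+conceptually like the filter content sketched below. Note that an empty property in any of
+the job filters means "match all" for that property in the aggregate as well.
+
+.. code-block:: javascript
+
+   {
+      "sourceNames":[
+         "O-DU-1122"
+      ],
+      "measTypes":[
+         "succImmediateAssignProcs", "attTCHSeizures"
+      ]
+   }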
+
+==================
+Sent Kafka headers
+==================
+
+For each filtered result sent to a Kafka topic, the following properties are set in the Kafka headers:
+
+* type-id, this property indicates the ID of the information type. The value is a string.
+* gzip, if this property exists, the object is gzipped (otherwise it is not). The property has no value.
\ No newline at end of file
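+
+A minimal sketch of how a data consumer could use these headers when reading from the Kafka
+topic (standard Kafka client API; decompression is only needed when the gzip header is present):
+
+.. code-block:: java
+
+   import java.io.ByteArrayInputStream;
+   import java.util.zip.GZIPInputStream;
+   import org.apache.kafka.clients.consumer.ConsumerRecord;
+   import org.apache.kafka.common.header.Header;
+
+   static byte[] payload(ConsumerRecord<byte[], byte[]> record) throws Exception {
+       // The type-id header carries the ID of the information type as a string.
+       Header typeId = record.headers().lastHeader("type-id");
+       String type = typeId == null ? "" : new String(typeId.value());
+       // The gzip header has no value; its presence means the payload is gzipped.
+       byte[] value = record.value();
+       if (record.headers().lastHeader("gzip") != null) {
+           try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(value))) {
+               value = in.readAllBytes();
+           }
+       }
+       return value; // the (unzipped) filtered PM report, of information type 'type'
+   }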
logger.warn("jobCreatedCallback failed: {}", e.getMessage());
return ErrorResponse.create(e, e.getHttpStatus());
} catch (Exception e) {
- logger.warn("jobCreatedCallback failed: {}", e.getMessage());
+ logger.warn("jobCreatedCallback failed: {}", e.getMessage(), e);
return ErrorResponse.create(e, HttpStatus.BAD_REQUEST);
}
}
public static class FilteredData {
public final byte[] key;
public final byte[] value;
+ public final String infoTypeId;
@Getter
private final boolean isZipped;
- private static final FilteredData emptyData = new FilteredData(null, null);
+ private static final FilteredData emptyData = new FilteredData(null, null, null);
public boolean isEmpty() {
return (key == null || key.length == 0) && (value == null || value.length == 0);
}
- public FilteredData(byte[] key, byte[] value) {
- this(key, value, false);
+ public FilteredData(String type, byte[] key, byte[] value) {
+ this(type, key, value, false);
}
- public FilteredData(byte[] key, byte[] value, boolean isZipped) {
+ public FilteredData(String type, byte[] key, byte[] value, boolean isZipped) {
this.key = key;
this.value = value;
this.isZipped = isZipped;
+ this.infoTypeId = type;
}
public String getValueAString() {
public Iterable<Header> headers() {
ArrayList<Header> result = new ArrayList<>();
if (isZipped()) {
- Header h = new RecordHeader(DataFromTopic.ZIP_PROPERTY, null);
- result.add(h);
+ result.add(new RecordHeader(DataFromTopic.ZIPPED_PROPERTY, null));
}
+ result.add(new RecordHeader(DataFromTopic.TYPE_ID_PROPERTY, infoTypeId.getBytes()));
return result;
}
}
import com.google.gson.GsonBuilder;
import java.lang.invoke.MethodHandles;
+import java.util.Collection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
}
}
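+    /**
+     * Creates one aggregated filter that is the union of the given filters. An empty
+     * property set in any of the filters means "match all" and therefore makes the
+     * aggregate match all for that property as well.
+     */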
+ public static PmReportFilter createAggregateFilter(Collection<PmReportFilter> filters) {
+ PmReportFilter.FilterData resultFilterData = filters.iterator().next().getFilterData();
+ for (PmReportFilter filter : filters) {
+ resultFilterData.addAll(filter.getFilterData());
+ }
+ return new PmReportFilter(resultFilterData);
+ }
+
private static PmReportFilter.FilterData createPmFilterData(Object filter) {
String str = gson.toJson(filter);
return gson.fromJson(str, PmReportFilter.FilterData.class);
@Override
public FilteredData filter(DataFromTopic data) {
if (expression == null) {
- return new FilteredData(data.key, data.value);
+ return new FilteredData(data.infoTypeId, data.key, data.value);
}
try {
JsonFactory factory = mapper.getFactory();
if (filteredNode == NullNode.instance) {
return FilteredData.empty();
}
- return new FilteredData(data.key, mapper.writeValueAsBytes(filteredNode));
+ return new FilteredData(data.infoTypeId, data.key, mapper.writeValueAsBytes(filteredNode));
} catch (Exception e) {
return FilteredData.empty();
}
String str = new String(data.value);
Object o = JsonPath.parse(str).read(this.expression, Object.class);
String json = gson.toJson(o);
- return o == null ? FilteredData.empty() : new FilteredData(data.key, json.getBytes());
+ return o == null ? FilteredData.empty() : new FilteredData(data.infoTypeId, data.key, json.getBytes());
} catch (Exception e) {
return FilteredData.empty();
}
@Expose
private String eventId;
- @Expose
- private int sequence;
-
@Expose
private String eventName;
@Expose
private String reportingEntityName;
- @Expose
- private String priority;
-
@Expose
private long startEpochMicrosec;
@Expose
private long lastEpochMicrosec;
- @Expose
- private String version;
-
- @Expose
- private String vesEventListenerVersion;
-
@Expose
private String timeZoneOffset;
+
+ /* Not reported elements */
+ int sequence;
+ String priority;
+ String version;
+ String vesEventListenerVersion;
}
public static class MeasInfoId {
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import lombok.Getter;
import lombok.Setter;
@Getter
public static class FilterData {
- final Collection<String> sourceNames = new HashSet<>();
- final Collection<String> measObjInstIds = new ArrayList<>();
- final Collection<String> measTypes = new HashSet<>();
- final Collection<String> measuredEntityDns = new ArrayList<>();
- final Collection<String> measObjClass = new HashSet<>();
+ final Set<String> sourceNames = new HashSet<>();
+ final Set<String> measObjInstIds = new HashSet<>();
+ final Set<String> measTypes = new HashSet<>();
+ final Set<String> measuredEntityDns = new HashSet<>();
+ final Set<String> measObjClass = new HashSet<>();
@Setter
String pmRopStartTime;
@Setter
String pmRopEndTime;
+
+ public void addAll(FilterData other) {
+ addAll(other.sourceNames, sourceNames);
+ addAll(other.measObjInstIds, measObjInstIds);
+ addAll(other.measTypes, measTypes);
+ addAll(other.measuredEntityDns, measuredEntityDns);
+ addAll(other.measObjClass, measObjClass);
+ }
+
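+        // An empty set means "match all". If either the added set or the existing set is
+        // empty, the merged set must therefore also be empty.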
+ private void addAll(Set<String> source, Set<String> dst) {
+ if (source.isEmpty()) {
+ dst.clear();
+ } else if (dst.isEmpty()) {
+ // Nothing, this means 'match all'
+ } else {
+ dst.addAll(source);
+ }
+ }
}
private static class MeasTypesIndexed extends PmReport.MeasTypes {
if (reportFiltered == null) {
return FilteredData.empty();
}
- return new FilteredData(data.key, gson.toJson(reportFiltered).getBytes());
+ return new FilteredData(data.infoTypeId, data.key, gson.toJson(reportFiltered).getBytes());
} catch (Exception e) {
logger.warn("Could not parse PM data. {}, reason: {}", data, e.getMessage());
return FilteredData.empty();
@Override
public FilteredData filter(DataFromTopic data) {
if (regexp == null) {
- return new FilteredData(data.key, data.value);
+ return new FilteredData(data.infoTypeId, data.key, data.value);
}
Matcher matcher = regexp.matcher(data.valueAsString());
boolean match = matcher.find();
if (match) {
- return new FilteredData(data.key, data.value);
+ return new FilteredData(data.infoTypeId, data.key, data.value);
} else {
return FilteredData.empty();
}
public Filter.FilteredData filter(DataFromTopic data) {
if (filter == null) {
logger.debug("No filter used");
- return new Filter.FilteredData(data.key, data.value);
+ return new Filter.FilteredData(data.infoTypeId, data.key, data.value);
}
return filter.filter(data);
}
import java.util.Map;
import java.util.Vector;
+import lombok.Getter;
+
import org.oran.dmaapadapter.clients.AsyncRestClient;
import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
import org.oran.dmaapadapter.clients.SecurityContext;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.exceptions.ServiceException;
+import org.oran.dmaapadapter.filter.Filter;
+import org.oran.dmaapadapter.filter.FilterFactory;
+import org.oran.dmaapadapter.filter.PmReportFilter;
import org.oran.dmaapadapter.repository.Job.Parameters;
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@Component
public class Jobs {
public interface Observer {
- void onJobbAdded(Job job);
+ void onJobbGroupAdded(JobGroup jobGroup);
+
+ void onJobGroupRemoved(JobGroup jobGroup);
+ }
+
+ public interface JobGroup {
+ public String getId();
+
+ public InfoType getType();
+
+ public void remove(Job job);
+
+ public boolean isEmpty();
+
+ public Filter.FilteredData filter(DataFromTopic data);
+
+ public Iterable<Job> getJobs();
+
+ public String getTopic();
+ }
+
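+    // A group containing a single job; used for all jobs that are not grouped per Kafka topic.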
+ public static class JobGroupSingle implements JobGroup {
+ @Getter
+ private final Job job;
+ private boolean isJobRemoved = false;
+
+ public JobGroupSingle(Job job) {
+ this.job = job;
+ }
+
+ @Override
+ public Filter.FilteredData filter(DataFromTopic data) {
+ return job.filter(data);
+ }
+
+ @Override
+ public void remove(Job job) {
+ this.isJobRemoved = true;
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return isJobRemoved;
+ }
+
+ @Override
+ public String getId() {
+ return job.getId();
+ }
+
+ @Override
+ public InfoType getType() {
+ return job.getType();
+ }
+
+ @Override
+ public Iterable<Job> getJobs() {
+ Collection<Job> c = new ArrayList<>();
+ c.add(job);
+ return c;
+ }
+
+ @Override
+ public String getTopic() {
+ return this.job.getParameters().getKafkaOutputTopic();
+ }
+ }
+
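+    // A group of PM jobs that publish to the same Kafka output topic. The group holds one
+    // aggregated filter built from the filters of all jobs in the group.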
+ public static class JobGroupPm implements JobGroup {
+ @Getter
+ private final String topic;
- void onJobRemoved(Job job);
+ private Map<String, Job> jobs = new HashMap<>();
+
+ @Getter
+ private PmReportFilter filter;
+
+ @Getter
+ private final InfoType type;
+
+ public JobGroupPm(InfoType type, String topic) {
+ this.topic = topic;
+ this.type = type;
+ }
+
+ public synchronized void add(Job job) {
+ this.jobs.put(job.getId(), job);
+ this.filter = createFilter();
+ }
+
+ public synchronized void remove(Job job) {
+ this.jobs.remove(job.getId());
+ if (!this.jobs.isEmpty()) {
+ this.filter = createFilter();
+ }
+ }
+
+ public boolean isEmpty() {
+ return jobs.isEmpty();
+ }
+
+ @Override
+ public Filter.FilteredData filter(DataFromTopic data) {
+ return filter.filter(data);
+ }
+
+ public Job getAJob() {
+ if (this.jobs.isEmpty()) {
+ return null;
+ }
+ return this.jobs.values().iterator().next();
+ }
+
+ private PmReportFilter createFilter() {
+ Collection<PmReportFilter> filterData = new ArrayList<>();
+ this.jobs.forEach((key, value) -> filterData.add((PmReportFilter) value.getFilter()));
+ return FilterFactory.createAggregateFilter(filterData);
+ }
+
+ @Override
+ public String getId() {
+ return topic;
+ }
+
+ @Override
+ public Iterable<Job> getJobs() {
+ return this.jobs.values();
+ }
}
private static final Logger logger = LoggerFactory.getLogger(Jobs.class);
private Map<String, Job> allJobs = new HashMap<>();
private MultiMap<Job> jobsByType = new MultiMap<>();
+ private Map<String, JobGroup> jobGroups = new HashMap<>();
private final AsyncRestClientFactory restclientFactory;
private final List<Observer> observers = new ArrayList<>();
private final ApplicationConfig appConfig;
: restclientFactory.createRestClientNoHttpProxy(callbackUrl);
Job job = new Job(id, callbackUrl, type, owner, lastUpdated, parameters, consumerRestClient, this.appConfig);
this.put(job);
- synchronized (observers) {
- this.observers.forEach(obs -> obs.onJobbAdded(job));
- }
}
public void addObserver(Observer obs) {
}
}
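+    // PM jobs that publish to the same Kafka output topic share one group (identified by the
+    // topic); all other jobs get a group of their own (identified by the job ID).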
+ private String jobGroupId(Job job) {
+ if (Strings.isNullOrEmpty(job.getParameters().getKafkaOutputTopic())) {
+ return job.getId();
+ } else if (job.getParameters().getFilterType() == Filter.Type.PM_DATA) {
+ return job.getParameters().getKafkaOutputTopic();
+ } else {
+ return job.getId();
+ }
+ }
+
private synchronized void put(Job job) {
logger.debug("Put job: {}", job.getId());
+ remove(job.getId());
+
allJobs.put(job.getId(), job);
jobsByType.put(job.getType().getId(), job.getId(), job);
+
+ if (job.getParameters().getFilterType() == Filter.Type.PM_DATA
+ && job.getParameters().getKafkaOutputTopic() != null) {
+ String topic = job.getParameters().getKafkaOutputTopic();
+ if (!this.jobGroups.containsKey(topic)) {
+ final JobGroupPm group = new JobGroupPm(job.getType(), topic);
+ this.jobGroups.put(topic, group);
+ group.add(job);
+ this.observers.forEach(obs -> obs.onJobbGroupAdded(group));
+ } else {
+ JobGroupPm group = (JobGroupPm) this.jobGroups.get(topic);
+ group.add(job);
+ }
+ } else {
+ JobGroupSingle group = new JobGroupSingle(job);
+ this.jobGroups.put(job.getId(), group);
+ this.observers.forEach(obs -> obs.onJobbGroupAdded(group));
+ }
}
public synchronized Iterable<Job> getAll() {
}
public void remove(Job job) {
+ String groupId = this.jobGroupId(job);
+ JobGroup group = this.jobGroups.get(groupId);
synchronized (this) {
this.allJobs.remove(job.getId());
jobsByType.remove(job.getType().getId(), job.getId());
+ group.remove(job);
+ if (group.isEmpty()) {
+ this.jobGroups.remove(groupId);
+ }
}
- notifyJobRemoved(job);
- }
- private synchronized void notifyJobRemoved(Job job) {
- this.observers.forEach(obs -> obs.onJobRemoved(job));
+ if (group.isEmpty()) {
+ this.observers.forEach(obs -> obs.onJobGroupRemoved(group));
+ }
}
public synchronized int size() {
public void clear() {
- this.allJobs.forEach((id, job) -> notifyJobRemoved(job));
+ this.jobGroups.forEach((id, group) -> this.observers.forEach(obs -> obs.onJobGroupRemoved(group)));
synchronized (this) {
allJobs.clear();
.doOnNext(input -> logger.debug("Received from DMaap: {} :{}", this.type.getDmaapTopicUrl(), input)) //
.doOnError(t -> logger.error("DmaapTopicListener error: {}", t.getMessage())) //
.doFinally(sig -> logger.error("DmaapTopicListener stopped, reason: {}", sig)) //
- .map(input -> new DataFromTopic(null, input.getBytes(), false))
+ .map(input -> new DataFromTopic(this.type.getId(), null, null, input.getBytes()))
.flatMap(data -> KafkaTopicListener.getDataFromFileIfNewPmFileEvent(data, type, dataStore), 100)
.publish() //
.autoConnect();
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.filter.Filter;
import org.oran.dmaapadapter.repository.Job;
+import org.oran.dmaapadapter.repository.Jobs.JobGroup;
+import org.oran.dmaapadapter.repository.Jobs.JobGroupSingle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
public class HttpJobDataDistributor extends JobDataDistributor {
private static final Logger logger = LoggerFactory.getLogger(HttpJobDataDistributor.class);
- public HttpJobDataDistributor(Job job, ApplicationConfig config) {
+ public HttpJobDataDistributor(JobGroup job, ApplicationConfig config) {
super(job, config);
}
@Override
protected Mono<String> sendToClient(Filter.FilteredData output) {
- Job job = this.getJob();
- logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), output);
+ JobGroupSingle group = (JobGroupSingle) this.getJobGroup();
+ Job job = group.getJob();
+ logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), output.getValueAString());
MediaType contentType = job.isBuffered() || job.getType().isJson() ? MediaType.APPLICATION_JSON : null;
return job.getConsumerRestClient().post("", output.getValueAString(), contentType);
}
import org.oran.dmaapadapter.filter.Filter;
import org.oran.dmaapadapter.filter.PmReportFilter;
import org.oran.dmaapadapter.repository.Job;
+import org.oran.dmaapadapter.repository.Jobs.JobGroup;
+import org.oran.dmaapadapter.repository.Jobs.JobGroupPm;
+import org.oran.dmaapadapter.repository.Jobs.JobGroupSingle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final Logger logger = LoggerFactory.getLogger(JobDataDistributor.class);
@Getter
- private final Job job;
+ private final JobGroup jobGroup;
private Disposable subscription;
private final ErrorStats errorStats = new ErrorStats();
}
}
- protected JobDataDistributor(Job job, ApplicationConfig applConfig) {
+ protected JobDataDistributor(JobGroup jobGroup, ApplicationConfig applConfig) {
this.applConfig = applConfig;
- this.job = job;
+ this.jobGroup = jobGroup;
this.dataStore = DataStore.create(applConfig);
this.dataStore.create(DataStore.Bucket.FILES).subscribe();
this.dataStore.create(DataStore.Bucket.LOCKS).subscribe();
}
public void start(Flux<TopicListener.DataFromTopic> input) {
- logger.debug("Starting distribution, job: {}, to topic: {}", this.job.getId(),
- job.getParameters().getKafkaOutputTopic());
- PmReportFilter filter = job.getFilter() instanceof PmReportFilter ? (PmReportFilter) job.getFilter() : null;
-
+ logger.debug("Starting distribution, to topic: {}", jobGroup.getId());
+ PmReportFilter filter = getPmReportFilter(this.jobGroup);
if (filter == null || filter.getFilterData().getPmRopEndTime() == null) {
- this.subscription = filterAndBuffer(input, this.job) //
+ this.subscription = filterAndBuffer(input, this.jobGroup) //
.flatMap(this::sendToClient) //
.onErrorResume(this::handleError) //
.subscribe(this::handleSentOk, //
this::handleExceptionInStream, //
- () -> logger.warn("JobDataDistributor stopped jobId: {}", job.getId()));
+ () -> logger.warn("JobDataDistributor stopped jobId: {}", jobGroup.getId()));
}
if (filter != null && filter.getFilterData().getPmRopStartTime() != null) {
this.dataStore.createLock(collectHistoricalDataLockName()) //
.doOnNext(isLockGranted -> {
if (isLockGranted.booleanValue()) {
- logger.debug("Checking historical PM ROP files, jobId: {}", this.job.getId());
+ logger.debug("Checking historical PM ROP files, jobId: {}", this.jobGroup.getId());
} else {
logger.debug("Skipping check of historical PM ROP files, already done. jobId: {}",
- this.job.getId());
+ this.jobGroup.getId());
}
}) //
.filter(isLockGranted -> isLockGranted) //
.flatMapMany(b -> Flux.fromIterable(filter.getFilterData().getSourceNames())) //
.doOnNext(sourceName -> logger.debug("Checking source name: {}, jobId: {}", sourceName,
- this.job.getId())) //
+ this.jobGroup.getId())) //
.flatMap(sourceName -> dataStore.listObjects(DataStore.Bucket.FILES, sourceName), 1) //
.filter(this::isRopFile) //
.filter(fileName -> filterStartTime(filter.getFilterData(), fileName)) //
.filter(fileName -> filterEndTime(filter.getFilterData(), fileName)) //
.map(this::createFakeEvent) //
- .flatMap(data -> KafkaTopicListener.getDataFromFileIfNewPmFileEvent(data, this.job.getType(),
+ .flatMap(data -> KafkaTopicListener.getDataFromFileIfNewPmFileEvent(data, this.jobGroup.getType(),
dataStore), 100)
- .map(job::filter) //
+ .map(jobGroup::filter) //
.map(this::gzip) //
.flatMap(this::sendToClient, 1) //
.onErrorResume(this::handleCollectHistoricalDataError) //
}
}
+ private static PmReportFilter getPmReportFilter(JobGroup jobGroup) {
+
+ if (jobGroup instanceof JobGroupPm) {
+ return ((JobGroupPm) jobGroup).getFilter();
+ } else if (jobGroup instanceof JobGroupSingle) {
+ Filter f = ((JobGroupSingle) jobGroup).getJob().getFilter();
+ return (f instanceof PmReportFilter) ? (PmReportFilter) f : null;
+ }
+ return null;
+ }
+
private Filter.FilteredData gzip(Filter.FilteredData data) {
if (this.applConfig.isZipOutput()) {
try {
gzip.flush();
gzip.close();
byte[] zipped = out.toByteArray();
- return new Filter.FilteredData(data.key, zipped, true);
+ return new Filter.FilteredData(data.infoTypeId, data.key, zipped, true);
} catch (IOException e) {
logger.error("Unexpected exception when zipping: {}", e.getMessage());
return data;
}
private Mono<String> handleCollectHistoricalDataError(Throwable t) {
- logger.error("Exception: {} job: {}", t.getMessage(), job.getId());
+ logger.error("Exception: {} job: {}", t.getMessage(), jobGroup.getId());
return tryDeleteLockFile() //
.map(bool -> "OK");
}
private String collectHistoricalDataLockName() {
- return "collectHistoricalDataLock" + this.job.getId();
+ return "collectHistoricalDataLock" + this.jobGroup.getId();
}
private TopicListener.DataFromTopic createFakeEvent(String fileName) {
NewFileEvent ev = new NewFileEvent(fileName);
- return new TopicListener.DataFromTopic(null, gson.toJson(ev).getBytes(), false);
+ return new TopicListener.DataFromTopic(this.jobGroup.getType().getId(), null, null, gson.toJson(ev).getBytes());
}
private static String fileTimePartFromRopFileName(String fileName) {
+ // "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.json"
return fileName.substring(fileName.lastIndexOf("/") + 2);
}
private static boolean filterStartTime(PmReportFilter.FilterData filter, String fileName) {
- // A20000626.2315+0200-2330+0200_HTTPS-6-73.json
try {
- String fileTimePart = fileTimePartFromRopFileName(fileName);
- fileTimePart = fileTimePart.substring(0, 18);
- OffsetDateTime fileStartTime = parseFileDate(fileTimePart);
+ OffsetDateTime fileStartTime = getStartTimeFromFileName(fileName);
OffsetDateTime startTime = OffsetDateTime.parse(filter.getPmRopStartTime());
boolean isMatch = fileStartTime.isAfter(startTime);
logger.debug("Checking file: {}, fileStartTime: {}, filterStartTime: {}, isAfter: {}", fileName,
}
private static boolean filterEndTime(PmReportFilter.FilterData filter, String fileName) {
- // A20000626.2315+0200-2330+0200_HTTPS-6-73.json
if (filter.getPmRopEndTime() == null) {
return true;
}
try {
- String fileTimePart = fileTimePartFromRopFileName(fileName);
- fileTimePart = fileTimePart.substring(0, 9) + fileTimePart.substring(19, 28);
- OffsetDateTime fileEndTime = parseFileDate(fileTimePart);
+ OffsetDateTime fileEndTime = getEndTimeFromFileName(fileName);
OffsetDateTime endTime = OffsetDateTime.parse(filter.getPmRopEndTime());
boolean isMatch = fileEndTime.isBefore(endTime);
logger.debug("Checking file: {}, fileEndTime: {}, endTime: {}, isBefore: {}", fileName, fileEndTime,
}
}
+ private static OffsetDateTime getStartTimeFromFileName(String fileName) {
+ String fileTimePart = fileTimePartFromRopFileName(fileName);
+ // A20000626.2315+0200-2330+0200_HTTPS-6-73.json
+ fileTimePart = fileTimePart.substring(0, 18);
+ return parseFileDate(fileTimePart);
+ }
+
+ private static OffsetDateTime getEndTimeFromFileName(String fileName) {
+ String fileTimePart = fileTimePartFromRopFileName(fileName);
+ // A20000626.2315+0200-2330+0200_HTTPS-6-73.json
+ fileTimePart = fileTimePart.substring(0, 9) + fileTimePart.substring(19, 28);
+ return parseFileDate(fileTimePart);
+ }
+
private static OffsetDateTime parseFileDate(String timeStr) {
DateTimeFormatter startTimeFormatter =
new DateTimeFormatterBuilder().appendPattern("yyyyMMdd.HHmmZ").toFormatter();
}
private void handleExceptionInStream(Throwable t) {
- logger.warn("JobDataDistributor exception: {}, jobId: {}", t.getMessage(), job.getId());
+ logger.warn("JobDataDistributor exception: {}, jobId: {}", t.getMessage(), jobGroup.getId());
}
protected abstract Mono<String> sendToClient(Filter.FilteredData output);
public synchronized void stop() {
if (this.subscription != null) {
- logger.debug("Stopped, job: {}", job.getId());
+ logger.debug("Stopped, job: {}", jobGroup.getId());
this.subscription.dispose();
this.subscription = null;
}
return this.subscription != null;
}
- private Flux<Filter.FilteredData> filterAndBuffer(Flux<TopicListener.DataFromTopic> inputFlux, Job job) {
+ private Flux<Filter.FilteredData> filterAndBuffer(Flux<TopicListener.DataFromTopic> inputFlux, JobGroup jobGroup) {
Flux<Filter.FilteredData> filtered = //
inputFlux //
- .doOnNext(data -> logger.trace("Received data, job {}", job.getId())) //
- .doOnNext(data -> job.getStatistics().received(data.value)) //
- .map(job::filter) //
+ .doOnNext(data -> logger.trace("Received data, job {}", jobGroup.getId())) //
+ .doOnNext(data -> jobGroup.getJobs().forEach(job -> job.getStatistics().received(data.value))) //
+ .map(jobGroup::filter) //
.map(this::gzip) //
.filter(f -> !f.isEmpty()) //
- .doOnNext(f -> job.getStatistics().filtered(f.value)) //
- .doOnNext(data -> logger.trace("Filtered data, job {}", job.getId())); //
+ .doOnNext(f -> jobGroup.getJobs().forEach(job -> job.getStatistics().filtered(f.value))) //
+ .doOnNext(data -> logger.trace("Filtered data, job {}", jobGroup.getId())) //
+ ; //
+ Job job = jobGroup.getJobs().iterator().next();
if (job.isBuffered()) {
filtered = filtered.map(input -> quoteNonJson(input.getValueAString(), job)) //
.bufferTimeout( //
job.getParameters().getBufferTimeout().getMaxSize(), //
job.getParameters().getBufferTimeout().getMaxTime()) //
- .map(buffered -> new Filter.FilteredData(null, buffered.toString().getBytes()));
+ .map(buffered -> new Filter.FilteredData(this.getJobGroup().getType().getId(), null,
+ buffered.toString().getBytes()));
}
return filtered;
}
}
private Mono<String> handleError(Throwable t) {
- logger.warn("exception: {} job: {}", t.getMessage(), job.getId());
+ logger.warn("exception: {} job: {}", t.getMessage(), jobGroup.getId());
this.errorStats.handleException(t);
return Mono.empty(); // Ignore
}
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.filter.Filter;
-import org.oran.dmaapadapter.repository.Job;
+import org.oran.dmaapadapter.repository.Jobs.JobGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private KafkaSender<byte[], byte[]> sender;
- public KafkaJobDataDistributor(Job job, ApplicationConfig appConfig) {
- super(job, appConfig);
+ public KafkaJobDataDistributor(JobGroup jobGroup, ApplicationConfig appConfig) {
+ super(jobGroup, appConfig);
SenderOptions<byte[], byte[]> senderOptions = senderOptions(appConfig);
this.sender = KafkaSender.create(senderOptions);
@Override
protected Mono<String> sendToClient(Filter.FilteredData data) {
- Job job = this.getJob();
- SenderRecord<byte[], byte[], Integer> senderRecord = senderRecord(data, job);
+
+ SenderRecord<byte[], byte[], Integer> senderRecord = senderRecord(data, this.getJobGroup().getTopic());
logger.trace("Sending data '{}' to Kafka topic: {}", StringUtils.truncate(data.getValueAString(), 10),
- job.getParameters().getKafkaOutputTopic());
+ this.getJobGroup().getTopic());
return this.sender.send(Mono.just(senderRecord)) //
- .doOnNext(n -> logger.debug("Sent data to Kafka topic: {}", job.getParameters().getKafkaOutputTopic())) //
- .doOnError(
- t -> logger.warn("Failed to send to Kafka, job: {}, reason: {}", job.getId(), t.getMessage())) //
+ .doOnNext(n -> logger.debug("Sent data to Kafka topic: {}", this.getJobGroup().getTopic())) //
+ .doOnError(t -> logger.warn("Failed to send to Kafka, job: {}, reason: {}", this.getJobGroup().getId(),
+ t.getMessage())) //
.onErrorResume(t -> Mono.empty()) //
.collectList() //
.map(x -> "ok");
return SenderOptions.create(props);
}
- private SenderRecord<byte[], byte[], Integer> senderRecord(Filter.FilteredData output, Job infoJob) {
+ private SenderRecord<byte[], byte[], Integer> senderRecord(Filter.FilteredData output, String topic) {
int correlationMetadata = 2;
- String topic = infoJob.getParameters().getKafkaOutputTopic();
var producerRecord = new ProducerRecord<>(topic, null, null, output.key, output.value, output.headers());
return SenderRecord.create(producerRecord, correlationMetadata);
}
.doFinally(
sig -> logger.error("KafkaTopicListener stopped, type: {}, reason: {}", this.type.getId(), sig)) //
.filter(t -> t.value().length > 0 || t.key().length > 0) //
- .map(input -> new DataFromTopic(input.key(), input.value(), DataFromTopic.findZipped(input.headers()))) //
+ .map(input -> new DataFromTopic(this.type.getId(), input.headers(), input.key(), input.value())) //
.flatMap(data -> getDataFromFileIfNewPmFileEvent(data, type, dataStore)) //
.publish() //
.autoConnect(1);
logger.trace("Reading PM measurements, type: {}, inputTopic: {}", type.getId(), type.getKafkaInputTopic());
return fileStore.readObject(DataStore.Bucket.FILES, ev.getFilename()) //
.map(bytes -> unzip(bytes, ev.getFilename())) //
- .map(bytes -> new DataFromTopic(data.key, bytes, false));
+ .map(bytes -> new DataFromTopic(data.infoTypeId, data.headers, data.key, bytes));
} catch (Exception e) {
return Mono.just(data);
public static class DataFromTopic {
public final byte[] key;
public final byte[] value;
- public final boolean isZipped;
+
+ public final String infoTypeId;
+
+ public final Iterable<Header> headers;
private static byte[] noBytes = new byte[0];
@ToString.Exclude
private PmReport cachedPmReport;
- public DataFromTopic(byte[] key, byte[] value, boolean isZipped) {
+ public DataFromTopic(String typeId, Iterable<Header> headers, byte[] key, byte[] value) {
this.key = key == null ? noBytes : key;
this.value = value == null ? noBytes : value;
- this.isZipped = isZipped;
+ this.infoTypeId = typeId;
+ this.headers = headers;
}
public String valueAsString() {
return new String(this.value);
}
- public static final String ZIP_PROPERTY = "gzip";
+ public static final String ZIPPED_PROPERTY = "gzip";
+ public static final String TYPE_ID_PROPERTY = "type-id";
- public static boolean findZipped(Iterable<Header> headers) {
+ public boolean isZipped() {
if (headers == null) {
return false;
}
for (Header h : headers) {
- if (h.key().equals(ZIP_PROPERTY)) {
+ if (h.key().equals(ZIPPED_PROPERTY)) {
return true;
}
}
return false;
}
+
+ public String getTypeIdFromHeaders() {
+ if (headers == null) {
+ return "";
+ }
+ for (Header h : headers) {
+ if (h.key().equals(TYPE_ID_PROPERTY)) {
+ return new String(h.value());
+ }
+ }
+ return "";
+ }
}
public Flux<DataFromTopic> getFlux();
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.repository.InfoType;
import org.oran.dmaapadapter.repository.InfoTypes;
-import org.oran.dmaapadapter.repository.Job;
import org.oran.dmaapadapter.repository.Jobs;
+import org.oran.dmaapadapter.repository.Jobs.JobGroup;
import org.oran.dmaapadapter.repository.MultiMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
jobs.addObserver(new Jobs.Observer() {
@Override
- public void onJobbAdded(Job job) {
- addJob(job);
+ public void onJobbGroupAdded(JobGroup jobGroup) {
+ addJob(jobGroup);
}
@Override
- public void onJobRemoved(Job job) {
- removeJob(job);
+ public void onJobGroupRemoved(JobGroup jobGroup) {
+ removeDistributor(jobGroup);
}
});
}
- public synchronized void addJob(Job job) {
- removeJob(job);
- logger.debug("Job added {}", job.getId());
- if (job.getType().isKafkaTopicDefined()) {
- addConsumer(job, dataDistributors, kafkaTopicListeners);
+ public synchronized void addJob(JobGroup jobGroup) {
+ removeDistributor(jobGroup);
+ logger.debug("Job added {}", jobGroup.getId());
+ if (jobGroup.getType().isKafkaTopicDefined()) {
+ addDistributor(jobGroup, dataDistributors, kafkaTopicListeners);
}
- if (job.getType().isDmaapTopicDefined()) {
- addConsumer(job, dataDistributors, dmaapTopicListeners);
+ if (jobGroup.getType().isDmaapTopicDefined()) {
+ addDistributor(jobGroup, dataDistributors, dmaapTopicListeners);
}
}
- private JobDataDistributor createConsumer(Job job) {
- return !Strings.isEmpty(job.getParameters().getKafkaOutputTopic()) ? new KafkaJobDataDistributor(job, appConfig)
- : new HttpJobDataDistributor(job, appConfig);
+ private JobDataDistributor createDistributor(JobGroup jobGroup) {
+ return !Strings.isEmpty(jobGroup.getTopic()) ? new KafkaJobDataDistributor(jobGroup, appConfig)
+ : new HttpJobDataDistributor(jobGroup, appConfig);
}
- private void addConsumer(Job job, MultiMap<JobDataDistributor> distributors,
+ private void addDistributor(JobGroup jobGroup, MultiMap<JobDataDistributor> distributors,
Map<String, TopicListener> topicListeners) {
- TopicListener topicListener = topicListeners.get(job.getType().getId());
- JobDataDistributor distributor = createConsumer(job);
+ TopicListener topicListener = topicListeners.get(jobGroup.getType().getId());
+ JobDataDistributor distributor = createDistributor(jobGroup);
distributor.start(topicListener.getFlux());
- distributors.put(job.getType().getId(), job.getId(), distributor);
+ distributors.put(jobGroup.getType().getId(), jobGroup.getId(), distributor);
}
- public synchronized void removeJob(Job job) {
- removeJob(job, dataDistributors);
+ public synchronized void removeDistributor(JobGroup jobGroup) {
+ removeDistributor(jobGroup, dataDistributors);
}
- private static void removeJob(Job job, MultiMap<JobDataDistributor> distributors) {
- JobDataDistributor distributor = distributors.remove(job.getType().getId(), job.getId());
+ private static void removeDistributor(JobGroup jobGroup, MultiMap<JobDataDistributor> distributors) {
+ JobDataDistributor distributor = distributors.remove(jobGroup.getType().getId(), jobGroup.getId());
if (distributor != null) {
- logger.debug("Job removed {}", job.getId());
+ logger.debug("Job removed {}", jobGroup.getId());
distributor.stop();
}
}
public static class TestResults {
- public List<String> receivedBodies = Collections.synchronizedList(new ArrayList<String>());
+ public final List<String> receivedBodies = Collections.synchronizedList(new ArrayList<String>());
- public List<Map<String, String>> receivedHeaders =
+ public final List<Map<String, String>> receivedHeaders =
Collections.synchronizedList(new ArrayList<Map<String, String>>());
public TestResults() {}
}
private TopicListener.DataFromTopic unzip(TopicListener.DataFromTopic receivedKafkaOutput) {
- if (this.applicationConfig.isZipOutput() != receivedKafkaOutput.isZipped) {
- logger.error("********* ERROR received zipped: {}, exp zipped: {}", receivedKafkaOutput.isZipped,
+ if (this.applicationConfig.isZipOutput() != receivedKafkaOutput.isZipped()) {
+ logger.error("********* ERROR received zipped: {}, exp zipped: {}", receivedKafkaOutput.isZipped(),
this.applicationConfig.isZipOutput());
}
- if (!receivedKafkaOutput.isZipped) {
+
+ if (!receivedKafkaOutput.isZipped()) {
return receivedKafkaOutput;
}
try {
byte[] unzipped = KafkaTopicListener.unzip(receivedKafkaOutput.value);
- return new TopicListener.DataFromTopic(unzipped, receivedKafkaOutput.key, false);
+ return new TopicListener.DataFromTopic("typeId", null, unzipped, receivedKafkaOutput.key);
} catch (IOException e) {
logger.error("********* ERROR ", e.getMessage());
return null;
private void set(TopicListener.DataFromTopic receivedKafkaOutput) {
this.receivedKafkaOutput = receivedKafkaOutput;
this.count++;
- logger.debug("*** received data on topic: {}", OUTPUT_TOPIC);
+ if (logger.isDebugEnabled()) {
+ logger.debug("*** received data on topic: {}", OUTPUT_TOPIC);
+ logger.debug("*** received typeId: {}", receivedKafkaOutput.getTypeIdFromHeaders());
+ }
}
synchronized String lastKey() {
}
void reset() {
- this.receivedKafkaOutput = new TopicListener.DataFromTopic(null, null, false);
+ this.receivedKafkaOutput = new TopicListener.DataFromTopic("", null, null, null);
this.count = 0;
}
}
assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
// Create two jobs. One buffering and one with a filter
- this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 10), JOB_ID1, restClient());
this.icsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0), JOB_ID2, restClient());
+ this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 10), JOB_ID1, restClient());
+
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
+
waitForKafkaListener();
var dataToSend = Flux.range(1, 3).map(i -> kafkaSenderRecord("Message_" + i, "", KAFKA_TYPE_ID)); // Message_1,
assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
- final int NO_OF_COUNTERS = 5;
+ final int NO_OF_COUNTERS = 2;
for (int i = 0; i < NO_OF_COUNTERS; ++i) {
filterData.getMeasTypes().add("pmCounterNumber" + i);
}
@SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
@Test
- void kafkaCharacteristics_onePmJobs_sharedTopic() throws Exception {
+ void kafkaCharacteristics_manyPmJobs_sharedTopic() throws Exception {
// Filter PM reports and sent to many jobs over Kafka
- this.applicationConfig.setZipOutput(true);
+ this.applicationConfig.setZipOutput(false);
// Register producer, Register types
await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
- PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
- final int NO_OF_COUNTERS = 0;
- for (int i = 0; i < NO_OF_COUNTERS; ++i) {
- filterData.getMeasTypes().add("pmCounterNumber" + i);
- }
- filterData.getMeasObjClass().add("NRCellCU");
+ final int NO_OF_JOBS = 150;
+ ArrayList<KafkaReceiver> receivers = new ArrayList<>();
+ for (int i = 0; i < NO_OF_JOBS; ++i) {
+ final String outputTopic = "kafkaCharacteristics_onePmJobs_manyReceivers";
+ final String jobId = "manyJobs_" + i;
+ PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
+            filterData.getMeasTypes().add("pmCounterNumber" + i); // each job selects a different counter, so the aggregated filter includes them all
+ filterData.getMeasObjClass().add("NRCellCU");
- final String outputTopic = "kafkaCharacteristics_onePmJobs_manyReceivers";
- this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData), outputTopic, restClient());
+ this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData), jobId, restClient());
- final int NO_OF_RECEIVERS = 150;
- ArrayList<KafkaReceiver> receivers = new ArrayList<>();
- for (int i = 0; i < NO_OF_RECEIVERS; ++i) {
KafkaReceiver receiver =
new KafkaReceiver(this.applicationConfig, outputTopic, this.securityContext, "group_" + i);
receivers.add(receiver);
}
- await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(NO_OF_JOBS));
waitForKafkaListener();
final int NO_OF_OBJECTS = 1000;
class JsltFilterTest {
private String filterReport(JsltFilter filter) throws Exception {
- DataFromTopic data = new DataFromTopic(null, loadReport().getBytes(), false);
+ DataFromTopic data = new DataFromTopic("type", null, null, loadReport().getBytes());
FilteredData filtered = filter.filter(data);
return filtered.getValueAString();
}
void testJsonPath() throws Exception {
String exp = ("$.event.perf3gppFields.measDataCollection.measInfoList[0].measTypes.sMeasTypesList[0]");
JsonPathFilter filter = new JsonPathFilter(exp);
- DataFromTopic data = new DataFromTopic(null, loadReport().getBytes(), false);
+ DataFromTopic data = new DataFromTopic("typeId", null, null, loadReport().getBytes());
FilteredData filtered = filter.filter(data);
String res = filtered.getValueAString();
assertThat(res).isEqualTo("\"attTCHSeizures\"");
private String filterReport(PmReportFilter filter) throws Exception {
- TopicListener.DataFromTopic data = new TopicListener.DataFromTopic(null, loadReport().getBytes(), false);
+ TopicListener.DataFromTopic data =
+ new TopicListener.DataFromTopic("typeId", null, null, loadReport().getBytes());
FilteredData filtered = filter.filter(data);
String reportAfterFilter = gson.toJson(data.getCachedPmReport());
}
{
- TopicListener.DataFromTopic data = new TopicListener.DataFromTopic(null, loadReport().getBytes(), false);
+ TopicListener.DataFromTopic data =
+ new TopicListener.DataFromTopic("typeId", null, null, loadReport().getBytes());
PmReportFilter.FilterData utranCellFilter = new PmReportFilter.FilterData();
utranCellFilter.measObjClass.add("UtranCell");
filterData.getMeasTypes().add("pmCounterNumber0");
filterData.getMeasObjClass().add("NRCellCU");
PmReportFilter filter = new PmReportFilter(filterData);
- DataFromTopic topicData = new DataFromTopic(null, pmReportJson.getBytes(), false);
+ DataFromTopic topicData = new DataFromTopic("typeId", null, null, pmReportJson.getBytes());
Instant startTime = Instant.now();
for (int i = 0; i < TIMES; ++i) {
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
PmReportFilter filter = new PmReportFilter(filterData);
- FilteredData filtered = filter.filter(new TopicListener.DataFromTopic(null, "junk".getBytes(), false));
+ FilteredData filtered = filter.filter(new TopicListener.DataFromTopic("typeId", null, null, "junk".getBytes()));
assertThat(filtered.isEmpty()).isTrue();
- filtered = filter.filter(new TopicListener.DataFromTopic(null, reQuote("{'msg': 'test'}").getBytes(), false));
+ filtered = filter
+ .filter(new TopicListener.DataFromTopic("typeId", null, null, reQuote("{'msg': 'test'}").getBytes()));
assertThat(filtered.isEmpty()).isTrue();
}