Minor changes.
Step parent pom version
DMaaP adapter, updated job definition syntax
DMaaP adapter, support for shared Kafka topics
Issue-ID: NONRTRIC-838
Signed-off-by: ychacon <yennifer.chacon@est.tech>
Change-Id: Ia6a909a07304c79a0c2d53cdea0f857ebae7f96a
"maxSize": {
"type": "integer"
},
- "maxTimeMiliseconds": {
+ "maxTimeMilliseconds": {
"type": "integer"
}
},
"additionalProperties": false,
"required": [
"maxSize",
- "maxTimeMiliseconds"
+ "maxTimeMilliseconds"
]
}
},
Object1 and Object2 may be posted in one call --> ["Object1", "Object2"]
The bufferTimeout is a Json object and the parameters in the object are:
- maxSize the maximum number of buffered objects before posting
- - maxTimeMiliseconds the maximum delay time to buffer before posting
+ - maxTimeMilliseconds the maximum delay time to buffer before posting
If no bufferTimeout is specified, each object will be posted as received in separate calls (not quoted and put in a Json array).
}},
"tags": ["Actuator"]
}},
+ "/actuator/shutdown": {"post": {
+ "summary": "Actuator web endpoint 'shutdown'",
+ "operationId": "shutdown_2",
+ "responses": {"200": {
+ "description": "OK",
+ "content": {"*/*": {"schema": {"type": "object"}}}
+ }},
+ "tags": ["Actuator"]
+ }},
"/data-producer/v1/info-producers/{infoProducerId}": {
"get": {
"operationId": "getInfoProducer",
'*/*':
schema:
type: object
+ /actuator/shutdown:
+ post:
+ tags:
+ - Actuator
+ summary: Actuator web endpoint 'shutdown'
+ operationId: shutdown_2
+ responses:
+ 200:
+ description: OK
+ content:
+ '*/*':
+ schema:
+ type: object
/data-producer/v1/info-producers/{infoProducerId}:
get:
tags:
web:
exposure:
# Enabling of springboot actuator features. See springboot documentation.
- include: "loggers,logfile,health,info,metrics,threaddump,heapdump"
+ include: "loggers,logfile,health,info,metrics,threaddump,heapdump,shutdown"
+ endpoint:
+ shutdown:
+ enabled: true
+lifecycle:
+ timeout-per-shutdown-phase: "20s"
springdoc:
show-actuator: true
logging:
server:
# Configuration of the HTTP/REST server. The parameters are defined and handeled by the springboot framework.
# See springboot documentation.
- port : 8435
- http-port: 8084
- ssl:
- key-store-type: JKS
- key-store-password: policy_agent
- key-store: /opt/app/dmaap-adapter-service/etc/cert/keystore.jks
- key-password: policy_agent
- key-alias: policy_agent
+ port : 8435
+ http-port: 8084
+ ssl:
+ key-store-type: JKS
+ key-store-password: policy_agent
+ key-store: /opt/app/dmaap-adapter-service/etc/cert/keystore.jks
+ key-password: policy_agent
+ key-alias: policy_agent
+ shutdown: "graceful"
app:
webclient:
# Configuration of the trust store used for the HTTP client (outgoing requests)
===============
This schema will by default be registerred for the type. The following properties are defined:
-* kafkaOutputTopic, optional parameter which enables the information job to output data to a Kafka topic instead of a direct call to a data consumer. The output of a job can be directed to HTTP or to Kafka regardless if the input is retrieved from DMaaP or from Kafka.
+* outputTopic, optional parameter which enables the information job to output data to a Kafka topic instead of a direct call to a data consumer. The output of a job can be directed to HTTP or to Kafka regardless if the input is retrieved from DMaaP or from Kafka. This consists of the following properties:
+
+ * topic, the name of the kafka topic
+ * bootStrapServers, reference to the kafka bus to used. This is optional, if this is omitted the default configured kafka bus is used (which is configured in the application.yaml file).
* filterType, selects the type of filtering that will be done. This can be one of: "regexp", "json-path", "jslt".
* bufferTimeout can be used to buffer several json objects received from Kafka when kafkaInputTopic is defined into one json array. If bufferTimeout is used, the delivered data will be a Json array of the objects received. If not, each received object will be delivered in a separate call. This contains:
* maxSize, the maximum number of objects to collect before delivery to the consumer
- * maxTimeMiliseconds, the maximum time to delay delivery (to buffer).
+ * maxTimeMilliseconds, the maximum time to delay delivery (to buffer).
* maxConcurrency, defines max how many paralell REST calls the consumer wishes to receive. 1, which is default, means sequential. A higher value may increase throughput.
{
"bufferTimeout":{
"maxSize":123,
- "maxTimeMiliseconds":456
+ "maxTimeMilliseconds":456
},
"maxConcurrency":1
}
This will extend the filtering capabilities so that a special filter for PM data can be used. Here it is possible to
define which meas-types (counters) to get from which resources.
+"It is possible to both filter on new data that is collected from the traffical nodes and to query from data that is already collected.
+
The filterType parameter is extended to allow value "pmdata" which can be used for PM data filtering.
* sourceNames an array of source names for wanted PM reports.
* measObjInstIds an array of meas object instances for wanted PM reports. If a given filter value is contained in the filter definition, it will match (partial matching).
For instance a value like "NRCellCU" will match "ManagedElement=seliitdus00487,GNBCUCPFunction=1,NRCellCU=32".
-* measTypes selects the meas types to get
+* measTypeSpecs selects the meas types to get. This consist of:
+
+ * measObjClass matching of the class of the measObjInstId. The measObjInstId must follow the 3GPP naming conventions for Managed Objects (3GPP TS 32.106-8).
+ * measTypes the name of the measurement types (connected to the measObjClass).
* measuredEntityDns partial match of meas entity DNs.
-* measObjClass matching of the class of the measObjInstId. The measObjInstId must follow the 3GPP naming conventions for Managed Objects (3GPP TS 32.106-8).
Example, for a distinguished name "ManagedElement=RNC-Gbg-1,ENodeBFunction=1", the MO class will be "ENodeBFunction".
+* pmRopStartTime gives a possibility to query from already collected PM files. The start file is the time from when the information shall be returned.
+ In this case, the query is only done for files from the given "sourceNames".
+ If this parameter is excluded, only "new" reports will be delivered as they are collected from the traffical nodes.
-All PM filter properties are optional and a non given will result in "match all".
-The result of the filtering is still following the structure of a 3GPP PM report.
+All PM filter properties are optional and a non given will result in "match all" (except the pmRopStartTime).
Below follows an example of a PM filter.
"measObjInstIds":[
"UtranCell=dGbg-997"
],
- "measTypes":[
- "succImmediateAssignProcs"
- ],
+ "measTypeSpecs":[
+ {
+ "measuredObjClass":"UtranCell",
+ "measTypes":[
+ "succImmediateAssignProcs"
+ ]
+ }
+ ],
"measuredEntityDns":[
"ManagedElement=RNC-Gbg-1"
- ]
+ ],
+ "pmRopStartTime" : "2022-12-13T10:50:44.000-08:00"
+ }
+ }
+
+
+Here is an example of a filter that will
+match two counters from all cells in two traffical nodes.
+
+.. code-block:: javascript
+
+ {
+ "filterType":"pmdata",
+ "filter": {
+ "sourceNames":[
+ "O-DU-1122", "O-DU-1123"
+ ],
+ "measTypeSpecs":[
+ {
+ "measuredObjClass":"NRCellCU",
+ "measTypes":[
+ "pmCounterNumber0", "pmCounterNumber1"
+ ]
+ }
+ ],
+
}
}
+
+
+********************
+Bulk PM subscription
+********************
+
+The sequence is that a "new file event" is received (from a Kafka topic).
+The file is read from local storage (file storage or S3 object store). For each Job, the specified PM filter is applied to the data
+and the result is sent to the Kafka topic specified by the Job (by the data consumer).
+
+.. image:: ./dedicatedTopics.png
+ :width: 500pt
+
+The result of the PM filtering is still following the structure of a 3GPP PM report.
+Here follows an example of a resulting delivered PM report.
+
+.. code-block:: javascript
+
+ {
+ "event":{
+ "commonEventHeader":{
+ "domain":"perf3gpp",
+ "eventId":"9efa1210-f285-455f-9c6a-3a659b1f1882",
+ "eventName":"perf3gpp_gnb-Ericsson_pmMeasResult",
+ "sourceName":"O-DU-1122",
+ "reportingEntityName":"",
+ "startEpochMicrosec":951912000000,
+ "lastEpochMicrosec":951912900000,
+ "timeZoneOffset":"+00:00"
+ },
+ "perf3gppFields":{
+ "perf3gppFieldsVersion":"1.0",
+ "measDataCollection":{
+ "granularityPeriod":900,
+ "measuredEntityUserName":"RNC Telecomville",
+ "measuredEntityDn":"SubNetwork=CountryNN,MeContext=MEC-Gbg-1,ManagedElement=RNC-Gbg-1",
+ "measuredEntitySoftwareVersion":"",
+ "measInfoList":[
+ {
+ "measInfoId":{
+ "sMeasInfoId":""
+ },
+ "measTypes":{
+ "map":{
+ "succImmediateAssignProcs":1
+ },
+ "sMeasTypesList":[
+ "succImmediateAssignProcs"
+ ]
+ },
+ "measValuesList":[
+ {
+ "measObjInstId":"RncFunction=RF-1,UtranCell=Gbg-997",
+ "suspectFlag":"false",
+ "measResults":[
+ {
+ "p":1,
+ "sValue":"1113"
+ }
+ ]
+ },
+ {
+ "measObjInstId":"RncFunction=RF-1,UtranCell=Gbg-998",
+ "suspectFlag":"false",
+ "measResults":[
+ {
+ "p":1,
+ "sValue":"234"
+ }
+ ]
+ },
+ {
+ "measObjInstId":"RncFunction=RF-1,UtranCell=Gbg-999",
+ "suspectFlag":"true",
+ "measResults":[
+ {
+ "p":1,
+ "sValue":"789"
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ }
+ }
+ }
+
+If several jobs publish to the same Kafka topic (shared topic), the resulting filtered output will be an aggregate of all matching filters.
+So, each consumer will then get more data than requested.
+
+.. image:: ./sharedTopics.png
+ :width: 500pt
+
+==================
+Sent Kafka headers
+==================
+
+For each filtered result sent to a Kafka topic, there will the following proerties in the Kafa header:
+
+* type-id, this propery is used to indicate the ID of the information type. The value is a string.
+* gzip, if this property exists the object is gzipped (otherwise not). The property has no value.
\ No newline at end of file
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
- <version>2.5.14</version>
+ <version>2.7.8</version>
<relativePath />
</parent>
<groupId>org.o-ran-sc.nonrtric.plt</groupId>
<swagger-codegen-maven-plugin.version>3.0.11</swagger-codegen-maven-plugin.version>
<docker-maven-plugin>0.30.0</docker-maven-plugin>
<sonar-maven-plugin.version>3.7.0.1746</sonar-maven-plugin.version>
- <jacoco-maven-plugin.version>0.8.5</jacoco-maven-plugin.version>
+ <jacoco-maven-plugin.version>0.8.8</jacoco-maven-plugin.version>
<exec.skip>true</exec.skip>
<protobuf.version>4.0.0-rc-2</protobuf.version>
<protobuf-java-format.version>1.4</protobuf-java-format.version>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<version>2.17.292</version>
- </dependency>
+ </dependency>
</dependencies>
<build>
<plugins>
<system>JIRA</system>
<url>https://jira.o-ran-sc.org/</url>
</issueManagement>
-</project>
+</project>
\ No newline at end of file
public static void main(String[] args) {
applicationContext = SpringApplication.run(Application.class);
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ logger.warn("Shutting down, received signal SIGTERM");
+ SpringApplication.exit(applicationContext);
+ applicationContext = null;
+ }
+ });
}
@Scheduled(fixedRate = 10 * 1000)
.map(this::toBody);
}
+ public Mono<ResponseEntity<String>> postForEntity(String uri, @Nullable String body) {
+ Mono<String> bodyProducer = body != null ? Mono.just(body) : Mono.empty();
+
+ RequestHeadersSpec<?> request = getWebClient() //
+ .post() //
+ .uri(uri) //
+ .contentType(MediaType.APPLICATION_JSON) //
+ .body(bodyProducer, String.class);
+ return retrieve(request);
+ }
+
+ public Mono<String> post(String uri, @Nullable String body) {
+ return postForEntity(uri, body) //
+ .map(this::toBody);
+ }
+
private Mono<ResponseEntity<String>> retrieve(RequestHeadersSpec<?> request) {
if (securityContext.isConfigured()) {
request.headers(h -> h.setBearerAuth(securityContext.getBearerAuthToken()));
}
public Collection<InfoType> getTypes() {
- com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
+ com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
try {
String configJson = Files.readString(Path.of(getLocalConfigurationFilePath()), Charset.defaultCharset());
ConfigFile configData = gson.fromJson(configJson, ConfigFile.class);
logger.warn("jobCreatedCallback failed: {}", e.getMessage());
return ErrorResponse.create(e, e.getHttpStatus());
} catch (Exception e) {
- logger.warn("jobCreatedCallback failed: {}", e.getMessage());
+ logger.warn("jobCreatedCallback failed: {}", e.getMessage(), e);
return ErrorResponse.create(e, HttpStatus.BAD_REQUEST);
}
}
public static class FilteredData {
public final byte[] key;
public final byte[] value;
+ public final String infoTypeId;
@Getter
private final boolean isZipped;
- private static final FilteredData emptyData = new FilteredData(null, null);
+ private static final FilteredData emptyData = new FilteredData(null, null, null);
public boolean isEmpty() {
return (key == null || key.length == 0) && (value == null || value.length == 0);
}
- public FilteredData(byte[] key, byte[] value) {
- this(key, value, false);
+ public FilteredData(String type, byte[] key, byte[] value) {
+ this(type, key, value, false);
}
- public FilteredData(byte[] key, byte[] value, boolean isZipped) {
+ public FilteredData(String type, byte[] key, byte[] value, boolean isZipped) {
this.key = key;
this.value = value;
this.isZipped = isZipped;
+ this.infoTypeId = type;
}
public String getValueAString() {
public Iterable<Header> headers() {
ArrayList<Header> result = new ArrayList<>();
if (isZipped()) {
- Header h = new RecordHeader(DataFromTopic.ZIP_PROPERTY, null);
- result.add(h);
+ result.add(new RecordHeader(DataFromTopic.ZIPPED_PROPERTY, null));
}
+ result.add(new RecordHeader(DataFromTopic.TYPE_ID_PROPERTY, infoTypeId.getBytes()));
return result;
}
}
import com.google.gson.GsonBuilder;
import java.lang.invoke.MethodHandles;
+import java.util.Collection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
}
}
+ public static PmReportFilter createAggregateFilter(Collection<PmReportFilter> filters) {
+ PmReportFilter.FilterData resultFilterData = filters.iterator().next().getFilterData();
+ for (PmReportFilter filter : filters) {
+ resultFilterData.addAll(filter.getFilterData());
+ }
+ return new PmReportFilter(resultFilterData);
+ }
+
private static PmReportFilter.FilterData createPmFilterData(Object filter) {
String str = gson.toJson(filter);
return gson.fromJson(str, PmReportFilter.FilterData.class);
@Override
public FilteredData filter(DataFromTopic data) {
if (expression == null) {
- return new FilteredData(data.key, data.value);
+ return new FilteredData(data.infoTypeId, data.key, data.value);
}
try {
JsonFactory factory = mapper.getFactory();
if (filteredNode == NullNode.instance) {
return FilteredData.empty();
}
- return new FilteredData(data.key, mapper.writeValueAsBytes(filteredNode));
+ return new FilteredData(data.infoTypeId, data.key, mapper.writeValueAsBytes(filteredNode));
} catch (Exception e) {
return FilteredData.empty();
}
String str = new String(data.value);
Object o = JsonPath.parse(str).read(this.expression, Object.class);
String json = gson.toJson(o);
- return o == null ? FilteredData.empty() : new FilteredData(data.key, json.getBytes());
+ return o == null ? FilteredData.empty() : new FilteredData(data.infoTypeId, data.key, json.getBytes());
} catch (Exception e) {
return FilteredData.empty();
}
@Expose
private String eventId;
- @Expose
- private int sequence;
-
@Expose
private String eventName;
@Expose
private String reportingEntityName;
- @Expose
- private String priority;
-
@Expose
private long startEpochMicrosec;
@Expose
private long lastEpochMicrosec;
- @Expose
- private String version;
-
- @Expose
- private String vesEventListenerVersion;
-
@Expose
private String timeZoneOffset;
+
+ /* Not reported elements */
+ int sequence;
+ String priority;
+ String version;
+ String vesEventListenerVersion;
}
public static class MeasInfoId {
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import lombok.Getter;
import lombok.Setter;
@Getter
public static class FilterData {
- final Collection<String> sourceNames = new HashSet<>();
- final Collection<String> measObjInstIds = new ArrayList<>();
- final Collection<String> measTypes = new HashSet<>();
- final Collection<String> measuredEntityDns = new ArrayList<>();
- final Collection<String> measObjClass = new HashSet<>();
+
+ public static class MeasTypeSpec {
+ static MeasTypeSpec empty = new MeasTypeSpec();
+
+ static MeasTypeSpec empty() {
+ return empty;
+ }
+
+ @Getter
+ String measuredObjClass;
+
+ @Getter
+ final Set<String> measTypes = new HashSet<>();
+
+ @Override
+ public boolean equals(Object obj) {
+ return measuredObjClass.equals(obj);
+ }
+
+ @Override
+ public int hashCode() {
+ return measuredObjClass.hashCode();
+ }
+ }
+
+ final Set<String> sourceNames = new HashSet<>();
+ final Set<String> measObjInstIds = new HashSet<>();
+ final Collection<MeasTypeSpec> measTypeSpecs = new ArrayList<>();
+ final Set<String> measuredEntityDns = new HashSet<>();
+
+ public void addMeasTypes(String measObjClass, String... measTypes) {
+ MeasTypeSpec spec = this.findMeasTypeSpec(measObjClass);
+ if (spec == null) {
+ spec = new MeasTypeSpec();
+ spec.measuredObjClass = measObjClass;
+ this.measTypeSpecs.add(spec);
+ }
+ for (String measType : measTypes) {
+ spec.measTypes.add(measType);
+ }
+ }
+
+ public void addMeasTypes(String measObjClass, Collection<String> measTypes) {
+ for (String measType : measTypes) {
+ addMeasTypes(measObjClass, measType);
+ }
+ }
@Setter
String pmRopStartTime;
@Setter
String pmRopEndTime;
+
+ public void addAll(FilterData other) {
+ addAll(other.sourceNames, sourceNames);
+ addAll(other.measObjInstIds, measObjInstIds);
+ addAll(other.measTypeSpecs);
+ addAll(other.measuredEntityDns, measuredEntityDns);
+ }
+
+ public MeasTypeSpec getMeasTypeSpec(String measuredObjClass) {
+ if (measTypeSpecs.isEmpty()) {
+ return MeasTypeSpec.empty();
+ }
+ return findMeasTypeSpec(measuredObjClass);
+ }
+
+ private MeasTypeSpec findMeasTypeSpec(String measuredObjClass) {
+ for (MeasTypeSpec t : this.measTypeSpecs) {
+ if (t.measuredObjClass.equals(measuredObjClass)) {
+ return t;
+ }
+ }
+ return null;
+ }
+
+ private void addAll(Collection<MeasTypeSpec> measTypes) {
+ for (MeasTypeSpec s : measTypes) {
+ addMeasTypes(s.getMeasuredObjClass(), s.getMeasTypes());
+ }
+ }
+
+ private void addAll(Set<String> source, Set<String> dst) {
+ if (source.isEmpty()) {
+ dst.clear();
+ } else if (dst.isEmpty()) {
+ // Nothing, this means 'match all'
+ } else {
+ dst.addAll(source);
+ }
+ }
}
private static class MeasTypesIndexed extends PmReport.MeasTypes {
if (reportFiltered == null) {
return FilteredData.empty();
}
- return new FilteredData(data.key, gson.toJson(reportFiltered).getBytes());
+ return new FilteredData(data.infoTypeId, data.key, gson.toJson(reportFiltered).getBytes());
} catch (Exception e) {
logger.warn("Could not parse PM data. {}, reason: {}", data, e.getMessage());
return FilteredData.empty();
return false;
}
- private boolean isMeasResultMatch(MeasResult measResult, MeasTypes measTypes, FilterData filter) {
+ private boolean isMeasResultMatch(MeasResult measResult, MeasTypes measTypes,
+ FilterData.MeasTypeSpec measTypesSpec) {
String measType = measTypes.getMeasType(measResult.getP());
- return filter.measTypes.isEmpty() || filter.measTypes.contains(measType);
+ return measTypesSpec.measTypes.isEmpty() || measTypesSpec.measTypes.contains(measType);
}
private Collection<MeasResult> createMeasResults(Collection<MeasResult> oldMeasResults, MeasTypes measTypes,
- FilterData filter) {
+ FilterData.MeasTypeSpec measTypesSpec) {
Collection<MeasResult> newMeasResults = new ArrayList<>();
for (MeasResult measResult : oldMeasResults) {
- if (isMeasResultMatch(measResult, measTypes, filter)) {
+ if (isMeasResultMatch(measResult, measTypes, measTypesSpec)) {
newMeasResults.add(measResult.toBuilder().build());
}
}
return distinguishedName.substring(lastRdn + 1, lastEqualChar);
}
- private boolean isMeasInstClassMatch(String measObjInstId, FilterData filter) {
- if (filter.measObjClass.isEmpty()) {
- return true;
- }
-
+ private FilterData.MeasTypeSpec getMeasTypeSpec(String measObjInstId, FilterData filter) {
String measObjClass = managedObjectClass(measObjInstId);
- return filter.measObjClass.contains(measObjClass);
+ return filter.getMeasTypeSpec(measObjClass);
}
private MeasValuesList createMeasValuesList(MeasValuesList oldMeasValues, MeasTypes measTypes, FilterData filter) {
+ FilterData.MeasTypeSpec measTypesSpec = getMeasTypeSpec(oldMeasValues.getMeasObjInstId(), filter);
+ if (measTypesSpec == null) {
+ return MeasValuesList.empty();
+ }
- if (isMeasInstIdMatch(oldMeasValues.getMeasObjInstId(), filter)
- && isMeasInstClassMatch(oldMeasValues.getMeasObjInstId(), filter)) {
-
- Collection<MeasResult> newResults = createMeasResults(oldMeasValues.getMeasResults(), measTypes, filter);
- return oldMeasValues.toBuilder() //
- .measResults(newResults) //
- .build();
- } else {
+ if (!isMeasInstIdMatch(oldMeasValues.getMeasObjInstId(), filter)) {
return MeasValuesList.empty();
}
+
+ Collection<MeasResult> newResults = createMeasResults(oldMeasValues.getMeasResults(), measTypes, measTypesSpec);
+ return oldMeasValues.toBuilder() //
+ .measResults(newResults) //
+ .build();
}
private MeasTypes createMeasTypes(Collection<MeasValuesList> newMeasValues, MeasTypes oldMMeasTypes) {
@Override
public FilteredData filter(DataFromTopic data) {
if (regexp == null) {
- return new FilteredData(data.key, data.value);
+ return new FilteredData(data.infoTypeId, data.key, data.value);
}
Matcher matcher = regexp.matcher(data.valueAsString());
boolean match = matcher.find();
if (match) {
- return new FilteredData(data.key, data.value);
+ return new FilteredData(data.infoTypeId, data.key, data.value);
} else {
return FilteredData.empty();
}
import java.time.Duration;
import lombok.Builder;
+import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.filter.Filter;
import org.oran.dmaapadapter.filter.FilterFactory;
+import org.oran.dmaapadapter.repository.Job.Parameters.KafkaDeliveryInfo;
import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Schema(name = "job_statistics", description = "Statistics information for one job")
public static class Statistics {
- // @Schema(name = "jobId", description = "jobId", required = true)
- // @SerializedName("jobId")
@JsonProperty(value = "jobId", required = true)
String jobId;
@Getter
private BufferTimeout bufferTimeout;
+ @Builder
+ @EqualsAndHashCode
+ public static class KafkaDeliveryInfo {
+ @Getter
+ private String topic;
+
+ @Getter
+ private String bootStrapServers;
+ }
+
@Getter
- private String kafkaOutputTopic;
+ private KafkaDeliveryInfo deliveryInfo;
public Filter.Type getFilterType() {
if (filter == null || filterType == null) {
}
public static class BufferTimeout {
- public BufferTimeout(int maxSize, long maxTimeMiliseconds) {
+ public BufferTimeout(int maxSize, long maxTimeMilliseconds) {
this.maxSize = maxSize;
- this.maxTimeMiliseconds = maxTimeMiliseconds;
+ this.maxTimeMilliseconds = maxTimeMilliseconds;
}
public BufferTimeout() {}
@Getter
private int maxSize;
- private long maxTimeMiliseconds;
+ private long maxTimeMilliseconds;
public Duration getMaxTime() {
- return Duration.ofMillis(maxTimeMiliseconds);
+ return Duration.ofMillis(maxTimeMilliseconds);
}
}
.groupId(type.getKafkaGroupId()) //
.inputTopic(type.getKafkaInputTopic()) //
.jobId(id) //
- .outputTopic(parameters.getKafkaOutputTopic()) //
+ .outputTopic(parameters.getDeliveryInfo() == null ? "" : parameters.getDeliveryInfo().topic) //
.typeId(type.getId()) //
.clientId(type.getKafkaClientId(appConfig)) //
.build();
public Filter.FilteredData filter(DataFromTopic data) {
if (filter == null) {
logger.debug("No filter used");
- return new Filter.FilteredData(data.key, data.value);
+ return new Filter.FilteredData(data.infoTypeId, data.key, data.value);
}
return filter.filter(data);
}
public boolean isBuffered() {
return parameters != null && parameters.bufferTimeout != null && parameters.bufferTimeout.maxSize > 0
- && parameters.bufferTimeout.maxTimeMiliseconds > 0;
+ && parameters.bufferTimeout.maxTimeMilliseconds > 0;
}
}
import java.util.Map;
import java.util.Vector;
+import lombok.Getter;
+
import org.oran.dmaapadapter.clients.AsyncRestClient;
import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
import org.oran.dmaapadapter.clients.SecurityContext;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.exceptions.ServiceException;
+import org.oran.dmaapadapter.filter.Filter;
+import org.oran.dmaapadapter.filter.FilterFactory;
+import org.oran.dmaapadapter.filter.PmReportFilter;
import org.oran.dmaapadapter.repository.Job.Parameters;
+import org.oran.dmaapadapter.repository.Job.Parameters.KafkaDeliveryInfo;
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@Component
public class Jobs {
public interface Observer {
- void onJobbAdded(Job job);
+ void onJobbGroupAdded(JobGroup jobGroup);
+
+ void onJobGroupRemoved(JobGroup jobGroup);
+ }
+
+ public interface JobGroup {
+ public String getId();
+
+ public InfoType getType();
+
+ public void remove(Job job);
+
+ public boolean isEmpty();
+
+ public Filter.FilteredData filter(DataFromTopic data);
+
+ public Iterable<Job> getJobs();
+
+ public KafkaDeliveryInfo getDeliveryInfo();
+ }
+
+ public static class JobGroupSingle implements JobGroup {
+ @Getter
+ private final Job job;
+ private boolean isJobRemoved = false;
+
+ public JobGroupSingle(Job job) {
+ this.job = job;
+ }
+
+ @Override
+ public Filter.FilteredData filter(DataFromTopic data) {
+ return job.filter(data);
+ }
+
+ @Override
+ public void remove(Job job) {
+ this.isJobRemoved = true;
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return isJobRemoved;
+ }
+
+ @Override
+ public String getId() {
+ return job.getId();
+ }
+
+ @Override
+ public InfoType getType() {
+ return job.getType();
+ }
+
+ @Override
+ public Iterable<Job> getJobs() {
+ Collection<Job> c = new ArrayList<>();
+ c.add(job);
+ return c;
+ }
+
+ @Override
+ public KafkaDeliveryInfo getDeliveryInfo() {
+ return this.job.getParameters().getDeliveryInfo();
+ }
+ }
+
+ public static class JobGroupPm implements JobGroup {
+ @Getter
+ private final KafkaDeliveryInfo deliveryInfo;
- void onJobRemoved(Job job);
+ private Map<String, Job> jobs = new HashMap<>();
+
+ @Getter
+ private PmReportFilter filter;
+
+ @Getter
+ private final InfoType type;
+
+ public JobGroupPm(InfoType type, KafkaDeliveryInfo topic) {
+ this.deliveryInfo = topic;
+ this.type = type;
+ }
+
+ public synchronized void add(Job job) {
+ this.jobs.put(job.getId(), job);
+ this.filter = createFilter();
+ }
+
+ public synchronized void remove(Job job) {
+ this.jobs.remove(job.getId());
+ if (!this.jobs.isEmpty()) {
+ this.filter = createFilter();
+ }
+ }
+
+ public boolean isEmpty() {
+ return jobs.isEmpty();
+ }
+
+ @Override
+ public Filter.FilteredData filter(DataFromTopic data) {
+ return filter.filter(data);
+ }
+
+ public Job getAJob() {
+ if (this.jobs.isEmpty()) {
+ return null;
+ }
+ return this.jobs.values().iterator().next();
+ }
+
+ private PmReportFilter createFilter() {
+ Collection<PmReportFilter> filterData = new ArrayList<>();
+ this.jobs.forEach((key, value) -> filterData.add((PmReportFilter) value.getFilter()));
+ return FilterFactory.createAggregateFilter(filterData);
+ }
+
+ @Override
+ public String getId() {
+ return deliveryInfo.getTopic();
+ }
+
+ @Override
+ public Iterable<Job> getJobs() {
+ return this.jobs.values();
+ }
}
private static final Logger logger = LoggerFactory.getLogger(Jobs.class);
private Map<String, Job> allJobs = new HashMap<>();
private MultiMap<Job> jobsByType = new MultiMap<>();
+ private Map<String, JobGroup> jobGroups = new HashMap<>(); // Key is Topic or JobId
private final AsyncRestClientFactory restclientFactory;
private final List<Observer> observers = new ArrayList<>();
private final ApplicationConfig appConfig;
public void addJob(String id, String callbackUrl, InfoType type, String owner, String lastUpdated,
Parameters parameters) throws ServiceException {
- if (!Strings.isNullOrEmpty(parameters.getKafkaOutputTopic()) && !Strings.isNullOrEmpty(callbackUrl)) {
+ if ((parameters.getDeliveryInfo() != null) && !Strings.isNullOrEmpty(callbackUrl)) {
throw new ServiceException("Cannot deliver to both Kafka and HTTP in the same job", HttpStatus.BAD_REQUEST);
}
AsyncRestClient consumerRestClient = type.isUseHttpProxy() //
: restclientFactory.createRestClientNoHttpProxy(callbackUrl);
Job job = new Job(id, callbackUrl, type, owner, lastUpdated, parameters, consumerRestClient, this.appConfig);
this.put(job);
- synchronized (observers) {
- this.observers.forEach(obs -> obs.onJobbAdded(job));
- }
}
public void addObserver(Observer obs) {
}
}
+ private String jobGroupId(Job job) {
+ if (job.getParameters().getDeliveryInfo() == null) {
+ return job.getId();
+ } else if (job.getParameters().getFilterType() == Filter.Type.PM_DATA) {
+ return job.getParameters().getDeliveryInfo().getTopic();
+ } else {
+ return job.getId();
+ }
+ }
+
private synchronized void put(Job job) {
logger.debug("Put job: {}", job.getId());
+ remove(job.getId());
+
allJobs.put(job.getId(), job);
jobsByType.put(job.getType().getId(), job.getId(), job);
+
+ if (job.getParameters().getFilterType() == Filter.Type.PM_DATA
+ && job.getParameters().getDeliveryInfo() != null) {
+ String jobGroupId = jobGroupId(job);
+ if (!this.jobGroups.containsKey(jobGroupId)) {
+ final JobGroupPm group = new JobGroupPm(job.getType(), job.getParameters().getDeliveryInfo());
+ this.jobGroups.put(jobGroupId, group);
+ group.add(job);
+ this.observers.forEach(obs -> obs.onJobbGroupAdded(group));
+ } else {
+ JobGroupPm group = (JobGroupPm) this.jobGroups.get(jobGroupId);
+ group.add(job);
+ }
+ } else {
+ JobGroupSingle group = new JobGroupSingle(job);
+ this.jobGroups.put(jobGroupId(job), group);
+ this.observers.forEach(obs -> obs.onJobbGroupAdded(group));
+ }
}
public synchronized Iterable<Job> getAll() {
}
public void remove(Job job) {
+ String groupId = jobGroupId(job);
+ JobGroup group = this.jobGroups.get(groupId);
synchronized (this) {
this.allJobs.remove(job.getId());
jobsByType.remove(job.getType().getId(), job.getId());
+ group.remove(job);
+ if (group.isEmpty()) {
+ this.jobGroups.remove(groupId);
+ }
}
- notifyJobRemoved(job);
- }
- private synchronized void notifyJobRemoved(Job job) {
- this.observers.forEach(obs -> obs.onJobRemoved(job));
+ if (group.isEmpty()) {
+ this.observers.forEach(obs -> obs.onJobGroupRemoved(group));
+ }
}
public synchronized int size() {
public void clear() {
- this.allJobs.forEach((id, job) -> notifyJobRemoved(job));
+ this.jobGroups.forEach((id, group) -> this.observers.forEach(obs -> obs.onJobGroupRemoved(group)));
synchronized (this) {
allJobs.clear();
.doOnNext(input -> logger.debug("Received from DMaap: {} :{}", this.type.getDmaapTopicUrl(), input)) //
.doOnError(t -> logger.error("DmaapTopicListener error: {}", t.getMessage())) //
.doFinally(sig -> logger.error("DmaapTopicListener stopped, reason: {}", sig)) //
- .map(input -> new DataFromTopic(null, input.getBytes(), false))
+ .map(input -> new DataFromTopic(this.type.getId(), null, null, input.getBytes()))
.flatMap(data -> KafkaTopicListener.getDataFromFileIfNewPmFileEvent(data, type, dataStore), 100)
.publish() //
.autoConnect();
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.filter.Filter;
import org.oran.dmaapadapter.repository.Job;
+import org.oran.dmaapadapter.repository.Jobs.JobGroup;
+import org.oran.dmaapadapter.repository.Jobs.JobGroupSingle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
public class HttpJobDataDistributor extends JobDataDistributor {
private static final Logger logger = LoggerFactory.getLogger(HttpJobDataDistributor.class);
- public HttpJobDataDistributor(Job job, ApplicationConfig config) {
+ public HttpJobDataDistributor(JobGroup job, ApplicationConfig config) {
super(job, config);
}
@Override
protected Mono<String> sendToClient(Filter.FilteredData output) {
- Job job = this.getJob();
- logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), output);
+ JobGroupSingle group = (JobGroupSingle) this.getJobGroup();
+ Job job = group.getJob();
+ logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), output.getValueAString());
MediaType contentType = job.isBuffered() || job.getType().isJson() ? MediaType.APPLICATION_JSON : null;
return job.getConsumerRestClient().post("", output.getValueAString(), contentType);
}
import org.oran.dmaapadapter.filter.Filter;
import org.oran.dmaapadapter.filter.PmReportFilter;
import org.oran.dmaapadapter.repository.Job;
+import org.oran.dmaapadapter.repository.Jobs.JobGroup;
+import org.oran.dmaapadapter.repository.Jobs.JobGroupPm;
+import org.oran.dmaapadapter.repository.Jobs.JobGroupSingle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final Logger logger = LoggerFactory.getLogger(JobDataDistributor.class);
@Getter
- private final Job job;
+ private final JobGroup jobGroup;
private Disposable subscription;
private final ErrorStats errorStats = new ErrorStats();
}
}
- protected JobDataDistributor(Job job, ApplicationConfig applConfig) {
+ protected JobDataDistributor(JobGroup jobGroup, ApplicationConfig applConfig) {
this.applConfig = applConfig;
- this.job = job;
+ this.jobGroup = jobGroup;
this.dataStore = DataStore.create(applConfig);
this.dataStore.create(DataStore.Bucket.FILES).subscribe();
this.dataStore.create(DataStore.Bucket.LOCKS).subscribe();
}
public void start(Flux<TopicListener.DataFromTopic> input) {
- logger.debug("Starting distribution, job: {}, to topic: {}", this.job.getId(),
- job.getParameters().getKafkaOutputTopic());
- PmReportFilter filter = job.getFilter() instanceof PmReportFilter ? (PmReportFilter) job.getFilter() : null;
-
+ logger.debug("Starting distribution, to topic: {}", jobGroup.getId());
+ PmReportFilter filter = getPmReportFilter(this.jobGroup);
if (filter == null || filter.getFilterData().getPmRopEndTime() == null) {
- this.subscription = filterAndBuffer(input, this.job) //
+ this.subscription = filterAndBuffer(input, this.jobGroup) //
.flatMap(this::sendToClient) //
.onErrorResume(this::handleError) //
.subscribe(this::handleSentOk, //
this::handleExceptionInStream, //
- () -> logger.warn("JobDataDistributor stopped jobId: {}", job.getId()));
+ () -> logger.warn("JobDataDistributor stopped jobId: {}", jobGroup.getId()));
}
if (filter != null && filter.getFilterData().getPmRopStartTime() != null) {
this.dataStore.createLock(collectHistoricalDataLockName()) //
.doOnNext(isLockGranted -> {
if (isLockGranted.booleanValue()) {
- logger.debug("Checking historical PM ROP files, jobId: {}", this.job.getId());
+ logger.debug("Checking historical PM ROP files, jobId: {}", this.jobGroup.getId());
} else {
logger.debug("Skipping check of historical PM ROP files, already done. jobId: {}",
- this.job.getId());
+ this.jobGroup.getId());
}
}) //
.filter(isLockGranted -> isLockGranted) //
.flatMapMany(b -> Flux.fromIterable(filter.getFilterData().getSourceNames())) //
.doOnNext(sourceName -> logger.debug("Checking source name: {}, jobId: {}", sourceName,
- this.job.getId())) //
+ this.jobGroup.getId())) //
.flatMap(sourceName -> dataStore.listObjects(DataStore.Bucket.FILES, sourceName), 1) //
.filter(this::isRopFile) //
.filter(fileName -> filterStartTime(filter.getFilterData(), fileName)) //
.filter(fileName -> filterEndTime(filter.getFilterData(), fileName)) //
.map(this::createFakeEvent) //
- .flatMap(data -> KafkaTopicListener.getDataFromFileIfNewPmFileEvent(data, this.job.getType(),
+ .flatMap(data -> KafkaTopicListener.getDataFromFileIfNewPmFileEvent(data, this.jobGroup.getType(),
dataStore), 100)
- .map(job::filter) //
+ .map(jobGroup::filter) //
.map(this::gzip) //
.flatMap(this::sendToClient, 1) //
.onErrorResume(this::handleCollectHistoricalDataError) //
}
}
+ private static PmReportFilter getPmReportFilter(JobGroup jobGroup) {
+
+ if (jobGroup instanceof JobGroupPm) {
+ return ((JobGroupPm) jobGroup).getFilter();
+ } else if (jobGroup instanceof JobGroupSingle) {
+ Filter f = ((JobGroupSingle) jobGroup).getJob().getFilter();
+ return (f instanceof PmReportFilter) ? (PmReportFilter) f : null;
+ }
+ return null;
+ }
+
private Filter.FilteredData gzip(Filter.FilteredData data) {
if (this.applConfig.isZipOutput()) {
try {
gzip.flush();
gzip.close();
byte[] zipped = out.toByteArray();
- return new Filter.FilteredData(data.key, zipped, true);
+ return new Filter.FilteredData(data.infoTypeId, data.key, zipped, true);
} catch (IOException e) {
logger.error("Unexpected exception when zipping: {}", e.getMessage());
return data;
}
private Mono<String> handleCollectHistoricalDataError(Throwable t) {
- logger.error("Exception: {} job: {}", t.getMessage(), job.getId());
+ logger.error("Exception: {} job: {}", t.getMessage(), jobGroup.getId());
return tryDeleteLockFile() //
.map(bool -> "OK");
}
private String collectHistoricalDataLockName() {
- return "collectHistoricalDataLock" + this.job.getId();
+ return "collectHistoricalDataLock" + this.jobGroup.getId();
}
private TopicListener.DataFromTopic createFakeEvent(String fileName) {
NewFileEvent ev = new NewFileEvent(fileName);
- return new TopicListener.DataFromTopic(null, gson.toJson(ev).getBytes(), false);
+ return new TopicListener.DataFromTopic(this.jobGroup.getType().getId(), null, null, gson.toJson(ev).getBytes());
}
private static String fileTimePartFromRopFileName(String fileName) {
+ // "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.json"
return fileName.substring(fileName.lastIndexOf("/") + 2);
}
private static boolean filterStartTime(PmReportFilter.FilterData filter, String fileName) {
- // A20000626.2315+0200-2330+0200_HTTPS-6-73.json
try {
- String fileTimePart = fileTimePartFromRopFileName(fileName);
- fileTimePart = fileTimePart.substring(0, 18);
- OffsetDateTime fileStartTime = parseFileDate(fileTimePart);
+ OffsetDateTime fileStartTime = getStartTimeFromFileName(fileName);
OffsetDateTime startTime = OffsetDateTime.parse(filter.getPmRopStartTime());
boolean isMatch = fileStartTime.isAfter(startTime);
logger.debug("Checking file: {}, fileStartTime: {}, filterStartTime: {}, isAfter: {}", fileName,
}
private static boolean filterEndTime(PmReportFilter.FilterData filter, String fileName) {
- // A20000626.2315+0200-2330+0200_HTTPS-6-73.json
if (filter.getPmRopEndTime() == null) {
return true;
}
try {
- String fileTimePart = fileTimePartFromRopFileName(fileName);
- fileTimePart = fileTimePart.substring(0, 9) + fileTimePart.substring(19, 28);
- OffsetDateTime fileEndTime = parseFileDate(fileTimePart);
+ OffsetDateTime fileEndTime = getEndTimeFromFileName(fileName);
OffsetDateTime endTime = OffsetDateTime.parse(filter.getPmRopEndTime());
boolean isMatch = fileEndTime.isBefore(endTime);
logger.debug("Checking file: {}, fileEndTime: {}, endTime: {}, isBefore: {}", fileName, fileEndTime,
}
}
+ private static OffsetDateTime getStartTimeFromFileName(String fileName) {
+ String fileTimePart = fileTimePartFromRopFileName(fileName);
+ // A20000626.2315+0200-2330+0200_HTTPS-6-73.json
+ fileTimePart = fileTimePart.substring(0, 18);
+ return parseFileDate(fileTimePart);
+ }
+
+ private static OffsetDateTime getEndTimeFromFileName(String fileName) {
+ String fileTimePart = fileTimePartFromRopFileName(fileName);
+ // A20000626.2315+0200-2330+0200_HTTPS-6-73.json
+ fileTimePart = fileTimePart.substring(0, 9) + fileTimePart.substring(19, 28);
+ return parseFileDate(fileTimePart);
+ }
+
private static OffsetDateTime parseFileDate(String timeStr) {
DateTimeFormatter startTimeFormatter =
new DateTimeFormatterBuilder().appendPattern("yyyyMMdd.HHmmZ").toFormatter();
}
private void handleExceptionInStream(Throwable t) {
- logger.warn("JobDataDistributor exception: {}, jobId: {}", t.getMessage(), job.getId());
+ logger.warn("JobDataDistributor exception: {}, jobId: {}", t.getMessage(), jobGroup.getId());
}
protected abstract Mono<String> sendToClient(Filter.FilteredData output);
public synchronized void stop() {
if (this.subscription != null) {
- logger.debug("Stopped, job: {}", job.getId());
+ logger.debug("Stopped, job: {}", jobGroup.getId());
this.subscription.dispose();
this.subscription = null;
}
return this.subscription != null;
}
- private Flux<Filter.FilteredData> filterAndBuffer(Flux<TopicListener.DataFromTopic> inputFlux, Job job) {
+ private Flux<Filter.FilteredData> filterAndBuffer(Flux<TopicListener.DataFromTopic> inputFlux, JobGroup jobGroup) {
Flux<Filter.FilteredData> filtered = //
inputFlux //
- .doOnNext(data -> logger.trace("Received data, job {}", job.getId())) //
- .doOnNext(data -> job.getStatistics().received(data.value)) //
- .map(job::filter) //
+ .doOnNext(data -> logger.trace("Received data, job {}", jobGroup.getId())) //
+ .doOnNext(data -> jobGroup.getJobs().forEach(job -> job.getStatistics().received(data.value))) //
+ .map(jobGroup::filter) //
.map(this::gzip) //
.filter(f -> !f.isEmpty()) //
- .doOnNext(f -> job.getStatistics().filtered(f.value)) //
- .doOnNext(data -> logger.trace("Filtered data, job {}", job.getId())); //
+ .doOnNext(f -> jobGroup.getJobs().forEach(job -> job.getStatistics().filtered(f.value))) //
+ .doOnNext(data -> logger.trace("Filtered data, job {}", jobGroup.getId())) //
+ ; //
+ Job job = jobGroup.getJobs().iterator().next();
if (job.isBuffered()) {
filtered = filtered.map(input -> quoteNonJson(input.getValueAString(), job)) //
.bufferTimeout( //
job.getParameters().getBufferTimeout().getMaxSize(), //
job.getParameters().getBufferTimeout().getMaxTime()) //
- .map(buffered -> new Filter.FilteredData(null, buffered.toString().getBytes()));
+ .map(buffered -> new Filter.FilteredData(this.getJobGroup().getType().getId(), null,
+ buffered.toString().getBytes()));
}
return filtered;
}
}
private Mono<String> handleError(Throwable t) {
- logger.warn("exception: {} job: {}", t.getMessage(), job.getId());
+ logger.warn("exception: {} job: {}", t.getMessage(), jobGroup.getId());
this.errorStats.handleException(t);
return Mono.empty(); // Ignore
}
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.filter.Filter;
-import org.oran.dmaapadapter.repository.Job;
+import org.oran.dmaapadapter.repository.Job.Parameters.KafkaDeliveryInfo;
+import org.oran.dmaapadapter.repository.Jobs.JobGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private KafkaSender<byte[], byte[]> sender;
- public KafkaJobDataDistributor(Job job, ApplicationConfig appConfig) {
- super(job, appConfig);
+ public KafkaJobDataDistributor(JobGroup jobGroup, ApplicationConfig appConfig) {
+ super(jobGroup, appConfig);
- SenderOptions<byte[], byte[]> senderOptions = senderOptions(appConfig);
+ SenderOptions<byte[], byte[]> senderOptions = senderOptions(appConfig, jobGroup.getDeliveryInfo());
this.sender = KafkaSender.create(senderOptions);
}
@Override
protected Mono<String> sendToClient(Filter.FilteredData data) {
- Job job = this.getJob();
- SenderRecord<byte[], byte[], Integer> senderRecord = senderRecord(data, job);
+
+ SenderRecord<byte[], byte[], Integer> senderRecord = senderRecord(data, this.getJobGroup().getDeliveryInfo());
logger.trace("Sending data '{}' to Kafka topic: {}", StringUtils.truncate(data.getValueAString(), 10),
- job.getParameters().getKafkaOutputTopic());
+ this.getJobGroup().getDeliveryInfo());
return this.sender.send(Mono.just(senderRecord)) //
- .doOnNext(n -> logger.debug("Sent data to Kafka topic: {}", job.getParameters().getKafkaOutputTopic())) //
- .doOnError(
- t -> logger.warn("Failed to send to Kafka, job: {}, reason: {}", job.getId(), t.getMessage())) //
+ .doOnNext(n -> logger.debug("Sent data to Kafka topic: {}", this.getJobGroup().getDeliveryInfo())) //
+ .doOnError(t -> logger.warn("Failed to send to Kafka, job: {}, reason: {}", this.getJobGroup().getId(),
+ t.getMessage())) //
.onErrorResume(t -> Mono.empty()) //
.collectList() //
.map(x -> "ok");
}
}
- private static SenderOptions<byte[], byte[]> senderOptions(ApplicationConfig config) {
- String bootstrapServers = config.getKafkaBootStrapServers();
+ private static SenderOptions<byte[], byte[]> senderOptions(ApplicationConfig config,
+ KafkaDeliveryInfo deliveryInfo) {
+
+ String bootstrapServers = deliveryInfo.getBootStrapServers();
+ if (bootstrapServers == null || bootstrapServers.isEmpty()) {
+ bootstrapServers = config.getKafkaBootStrapServers();
+ }
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return SenderOptions.create(props);
}
- private SenderRecord<byte[], byte[], Integer> senderRecord(Filter.FilteredData output, Job infoJob) {
+ private SenderRecord<byte[], byte[], Integer> senderRecord(Filter.FilteredData output,
+ KafkaDeliveryInfo deliveryInfo) {
int correlationMetadata = 2;
- String topic = infoJob.getParameters().getKafkaOutputTopic();
- var producerRecord = new ProducerRecord<>(topic, null, null, output.key, output.value, output.headers());
+ var producerRecord =
+ new ProducerRecord<>(deliveryInfo.getTopic(), null, null, output.key, output.value, output.headers());
return SenderRecord.create(producerRecord, correlationMetadata);
}
.doFinally(
sig -> logger.error("KafkaTopicListener stopped, type: {}, reason: {}", this.type.getId(), sig)) //
.filter(t -> t.value().length > 0 || t.key().length > 0) //
- .map(input -> new DataFromTopic(input.key(), input.value(), DataFromTopic.findZipped(input.headers()))) //
+ .map(input -> new DataFromTopic(this.type.getId(), input.headers(), input.key(), input.value())) //
.flatMap(data -> getDataFromFileIfNewPmFileEvent(data, type, dataStore)) //
.publish() //
.autoConnect(1);
logger.trace("Reading PM measurements, type: {}, inputTopic: {}", type.getId(), type.getKafkaInputTopic());
return fileStore.readObject(DataStore.Bucket.FILES, ev.getFilename()) //
.map(bytes -> unzip(bytes, ev.getFilename())) //
- .map(bytes -> new DataFromTopic(data.key, bytes, false));
+ .map(bytes -> new DataFromTopic(data.infoTypeId, data.headers, data.key, bytes));
} catch (Exception e) {
return Mono.just(data);
public static class DataFromTopic {
public final byte[] key;
public final byte[] value;
- public final boolean isZipped;
+
+ public final String infoTypeId;
+
+ public final Iterable<Header> headers;
private static byte[] noBytes = new byte[0];
@ToString.Exclude
private PmReport cachedPmReport;
- public DataFromTopic(byte[] key, byte[] value, boolean isZipped) {
+ public DataFromTopic(String typeId, Iterable<Header> headers, byte[] key, byte[] value) {
this.key = key == null ? noBytes : key;
this.value = value == null ? noBytes : value;
- this.isZipped = isZipped;
+ this.infoTypeId = typeId;
+ this.headers = headers;
}
public String valueAsString() {
return new String(this.value);
}
- public static final String ZIP_PROPERTY = "gzip";
+ public static final String ZIPPED_PROPERTY = "gzip";
+ public static final String TYPE_ID_PROPERTY = "type-id";
- public static boolean findZipped(Iterable<Header> headers) {
+ public boolean isZipped() {
if (headers == null) {
return false;
}
for (Header h : headers) {
- if (h.key().equals(ZIP_PROPERTY)) {
+ if (h.key().equals(ZIPPED_PROPERTY)) {
return true;
}
}
return false;
}
+
+ public String getTypeIdFromHeaders() {
+ if (headers == null) {
+ return "";
+ }
+ for (Header h : headers) {
+ if (h.key().equals(TYPE_ID_PROPERTY)) {
+ return new String(h.value());
+ }
+ }
+ return "";
+ }
}
public Flux<DataFromTopic> getFlux();
import lombok.Getter;
-import org.apache.logging.log4j.util.Strings;
import org.oran.dmaapadapter.clients.SecurityContext;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.repository.InfoType;
import org.oran.dmaapadapter.repository.InfoTypes;
-import org.oran.dmaapadapter.repository.Job;
import org.oran.dmaapadapter.repository.Jobs;
+import org.oran.dmaapadapter.repository.Jobs.JobGroup;
import org.oran.dmaapadapter.repository.MultiMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
jobs.addObserver(new Jobs.Observer() {
@Override
- public void onJobbAdded(Job job) {
- addJob(job);
+ public void onJobbGroupAdded(JobGroup jobGroup) {
+ addJob(jobGroup);
}
@Override
- public void onJobRemoved(Job job) {
- removeJob(job);
+ public void onJobGroupRemoved(JobGroup jobGroup) {
+ removeDistributor(jobGroup);
}
});
}
- public synchronized void addJob(Job job) {
- removeJob(job);
- logger.debug("Job added {}", job.getId());
- if (job.getType().isKafkaTopicDefined()) {
- addConsumer(job, dataDistributors, kafkaTopicListeners);
+ public synchronized void addJob(JobGroup jobGroup) {
+ removeDistributor(jobGroup);
+ logger.debug("Job added {}", jobGroup.getId());
+ if (jobGroup.getType().isKafkaTopicDefined()) {
+ addDistributor(jobGroup, dataDistributors, kafkaTopicListeners);
}
- if (job.getType().isDmaapTopicDefined()) {
- addConsumer(job, dataDistributors, dmaapTopicListeners);
+ if (jobGroup.getType().isDmaapTopicDefined()) {
+ addDistributor(jobGroup, dataDistributors, dmaapTopicListeners);
}
}
- private JobDataDistributor createConsumer(Job job) {
- return !Strings.isEmpty(job.getParameters().getKafkaOutputTopic()) ? new KafkaJobDataDistributor(job, appConfig)
- : new HttpJobDataDistributor(job, appConfig);
+ private JobDataDistributor createDistributor(JobGroup jobGroup) {
+ return jobGroup.getDeliveryInfo() != null ? new KafkaJobDataDistributor(jobGroup, appConfig)
+ : new HttpJobDataDistributor(jobGroup, appConfig);
}
- private void addConsumer(Job job, MultiMap<JobDataDistributor> distributors,
+ private void addDistributor(JobGroup jobGroup, MultiMap<JobDataDistributor> distributors,
Map<String, TopicListener> topicListeners) {
- TopicListener topicListener = topicListeners.get(job.getType().getId());
- JobDataDistributor distributor = createConsumer(job);
+ TopicListener topicListener = topicListeners.get(jobGroup.getType().getId());
+ JobDataDistributor distributor = createDistributor(jobGroup);
distributor.start(topicListener.getFlux());
- distributors.put(job.getType().getId(), job.getId(), distributor);
+ distributors.put(jobGroup.getType().getId(), jobGroup.getId(), distributor);
}
- public synchronized void removeJob(Job job) {
- removeJob(job, dataDistributors);
+ public synchronized void removeDistributor(JobGroup jobGroup) {
+ removeDistributor(jobGroup, dataDistributors);
}
- private static void removeJob(Job job, MultiMap<JobDataDistributor> distributors) {
- JobDataDistributor distributor = distributors.remove(job.getType().getId(), job.getId());
+ private static void removeDistributor(JobGroup jobGroup, MultiMap<JobDataDistributor> distributors) {
+ JobDataDistributor distributor = distributors.remove(jobGroup.getType().getId(), jobGroup.getId());
if (distributor != null) {
- logger.debug("Job removed {}", job.getId());
+ logger.debug("Job removed {}", jobGroup.getId());
distributor.stop();
}
}
"type": "integer",
"minimum": 1
},
- "kafkaOutputTopic" : {
- "type": "string"
+ "deliveryInfo": {
+ "type": "object",
+ "properties": {
+ "topic": {
+ "type": "string"
+ },
+ "bootStrapServers": {
+ "type": "string"
+ }
+ },
+ "required": [
+ "topic"
+ ]
},
"bufferTimeout": {
"type": "object",
"type": "integer",
"minimum": 1
},
- "maxTimeMiliseconds": {
+ "maxTimeMilliseconds": {
"type": "integer",
"minimum": 0,
"maximum": 160000
"additionalProperties": false,
"required": [
"maxSize",
- "maxTimeMiliseconds"
+ "maxTimeMilliseconds"
]
}
},
"additionalProperties": false
-}
+}
\ No newline at end of file
}
]
},
- "measObjClass": {
+ "measTypeSpecs": {
"type": "array",
"items": [
{
- "type": "string"
- }
- ]
- },
- "measTypes": {
- "type": "array",
- "items": [
- {
- "type": "string"
+ "type": "object",
+ "properties": {
+ "measuredObjClass": {
+ "type": "string"
+ },
+ "measTypes": {
+ "type": "array",
+ "items": [
+ {
+ "type": "string"
+ }
+ ]
+ }
+ },
+ "required": [
+ "measuredObjClass"
+ ]
}
]
},
"pmdata"
]
},
- "kafkaOutputTopic": {
- "type": "string"
+ "deliveryInfo": {
+ "type": "object",
+ "additionalProperties": false,
+ "properties": {
+ "topic": {
+ "type": "string"
+ },
+ "bootStrapServers": {
+ "type": "string"
+ }
+ },
+ "required": [
+ "topic"
+ ]
}
- }
+ },
+ "required": [
+ "filter",
+ "filterType"
+ ]
}
\ No newline at end of file
import org.json.JSONObject;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
import org.oran.dmaapadapter.clients.AsyncRestClient;
import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
import org.oran.dmaapadapter.clients.SecurityContext;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.test.context.TestPropertySource;
+import org.springframework.web.reactive.function.client.WebClientRequestException;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
+@TestMethodOrder(MethodOrderer.MethodName.class)
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
@TestPropertySource(properties = { //
"server.ssl.key-store=./config/keystore.jks", //
@Autowired
private SecurityContext securityContext;
- private com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
+ private com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
@LocalServerPort
int localServerHttpPort;
// Create a job with a PM filter
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
- filterData.getMeasTypes().add("succImmediateAssignProcs");
+ filterData.addMeasTypes("UtranCell", "succImmediateAssignProcs");
filterData.getMeasObjInstIds().add("UtranCell=Gbg-997");
filterData.getSourceNames().add("O-DU-1122");
filterData.getMeasuredEntityDns().add("ManagedElement=RNC-Gbg-1");
// Create a job with a PM filter
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
- filterData.getMeasTypes().add("succImmediateAssignProcs");
+ filterData.addMeasTypes("ManagedElement", "succImmediateAssignProcs");
Job.Parameters param =
Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE).build();
String stats = restClient().get(targetUri).block();
assertThat(stats).contains(JOB_ID, "DmaapInformationType");
+ }
+
+ @Test
+ @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
+ void testZZActuator() throws Exception {
+ // The test must be run last, hence the "ZZ" in the name. All succeeding tests
+ // will fail.
+ AsyncRestClient client = restClient();
+ client.post("/actuator/loggers/org.oran.dmaapadapter", "{\"configuredLevel\":\"trace\"}").block();
+ String resp = client.get("/actuator/loggers/org.oran.dmaapadapter").block();
+ assertThat(resp).contains("TRACE");
+ client.post("/actuator/loggers/org.springframework.boot.actuate", "{\"configuredLevel\":\"trace\"}").block();
+ // This will stop the web server and all coming tests will fail.
+ client.post("/actuator/shutdown", "").block();
+ Thread.sleep(1000);
+ String url = "https://localhost:" + applicationConfig.getLocalServerHttpPort() + "/v3/api-docs";
+ StepVerifier.create(restClient().get(url)) // Any call
+ .expectSubscription() //
+ .expectErrorMatches(t -> t instanceof WebClientRequestException) //
+ .verify();
}
public static void testErrorCode(Mono<?> request, HttpStatus expStatus, String responseContains) {
public static class TestResults {
- public List<String> receivedBodies = Collections.synchronizedList(new ArrayList<String>());
+ public final List<String> receivedBodies = Collections.synchronizedList(new ArrayList<String>());
- public List<Map<String, String>> receivedHeaders =
+ public final List<Map<String, String>> receivedHeaders =
Collections.synchronizedList(new ArrayList<Map<String, String>>());
public TestResults() {}
import com.google.gson.GsonBuilder;
import com.google.gson.JsonParser;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.nio.file.Path;
-
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.oran.dmaapadapter.clients.AsyncRestClient;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.configuration.WebClientConfig;
import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig;
+import org.oran.dmaapadapter.filter.PmReportFilter;
import org.oran.dmaapadapter.r1.ConsumerJobInfo;
import org.oran.dmaapadapter.repository.Job;
import org.oran.dmaapadapter.repository.Jobs;
ConsumerController.TestResults results = this.consumerController.testResults;
await().untilAsserted(() -> assertThat(results.receivedBodies).hasSize(2));
assertThat(results.receivedBodies.get(0)).isEqualTo("DmaapResponse1");
- assertThat(results.receivedHeaders.get(0)).containsEntry("content-type", "text/plain;charset=UTF-8");
deleteInformationJobInIcs(DMAAP_JOB_ID);
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
}
+ private String pmJobParameters() {
+ PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
+
+ filterData.addMeasTypes("UtranCell", "succImmediateAssignProcs");
+ filterData.getMeasObjInstIds().add("UtranCell=Gbg-997");
+ filterData.getSourceNames().add("O-DU-1122");
+ filterData.getMeasuredEntityDns().add("ManagedElement=RNC-Gbg-1");
+ Job.Parameters param =
+ Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE).build();
+
+ return gson.toJson(param);
+ }
+
@Test
void testPmFilter() throws Exception {
await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue());
final String TYPE_ID = "PmDataOverRest";
- String jsonStr =
- reQuote("{ 'filterType' : 'pmdata', 'filter': { 'measTypes': [ 'succImmediateAssignProcs' ] } }");
+ String jsonStr = pmJobParameters();
ConsumerJobInfo jobInfo = new ConsumerJobInfo(TYPE_ID, jsonObject(jsonStr), "owner", consumerUri(), "");
createInformationJobInIcs(DMAAP_JOB_ID, jobInfo);
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
- String path = "./src/test/resources/pm_report.json";
- String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset());
- DmaapSimulatorController.addPmResponse(pmReportJson);
-
- ConsumerController.TestResults results = this.consumerController.testResults;
- await().untilAsserted(() -> assertThat(results.receivedBodies).hasSize(1));
-
- String filtered = results.receivedBodies.get(0);
- assertThat(filtered).contains("succImmediateAssignProcs").doesNotContain("attTCHSeizures");
-
- logger.info(filtered);
-
deleteInformationJobInIcs(DMAAP_JOB_ID);
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
}
}
private TopicListener.DataFromTopic unzip(TopicListener.DataFromTopic receivedKafkaOutput) {
- if (this.applicationConfig.isZipOutput() != receivedKafkaOutput.isZipped) {
- logger.error("********* ERROR received zipped: {}, exp zipped: {}", receivedKafkaOutput.isZipped,
+ if (this.applicationConfig.isZipOutput() != receivedKafkaOutput.isZipped()) {
+ logger.error("********* ERROR received zipped: {}, exp zipped: {}", receivedKafkaOutput.isZipped(),
this.applicationConfig.isZipOutput());
}
- if (!receivedKafkaOutput.isZipped) {
+
+ if (!receivedKafkaOutput.isZipped()) {
return receivedKafkaOutput;
}
try {
byte[] unzipped = KafkaTopicListener.unzip(receivedKafkaOutput.value);
- return new TopicListener.DataFromTopic(unzipped, receivedKafkaOutput.key, false);
+ return new TopicListener.DataFromTopic("typeId", null, unzipped, receivedKafkaOutput.key);
} catch (IOException e) {
logger.error("********* ERROR ", e.getMessage());
return null;
private void set(TopicListener.DataFromTopic receivedKafkaOutput) {
this.receivedKafkaOutput = receivedKafkaOutput;
this.count++;
- logger.debug("*** received data on topic: {}", OUTPUT_TOPIC);
+ if (logger.isDebugEnabled()) {
+ logger.debug("*** received data on topic: {}", OUTPUT_TOPIC);
+ logger.debug("*** received typeId: {}", receivedKafkaOutput.getTypeIdFromHeaders());
+ }
}
synchronized String lastKey() {
}
void reset() {
- this.receivedKafkaOutput = new TopicListener.DataFromTopic(null, null, false);
+ this.receivedKafkaOutput = new TopicListener.DataFromTopic("", null, null, null);
this.count = 0;
}
}
return "https://localhost:" + this.applicationConfig.getLocalServerHttpPort();
}
- private static Object jobParametersAsJsonObject(String filter, long maxTimeMiliseconds, int maxSize) {
- Job.BufferTimeout buffer = maxSize > 0 ? new Job.BufferTimeout(maxSize, maxTimeMiliseconds) : null;
+ private static Object jobParametersAsJsonObject(String filter, long maxTimeMilliseconds, int maxSize) {
+ Job.BufferTimeout buffer = maxSize > 0 ? new Job.BufferTimeout(maxSize, maxTimeMilliseconds) : null;
Job.Parameters param = Job.Parameters.builder().filter(filter).filterType(Job.Parameters.REGEXP_TYPE)
.bufferTimeout(buffer).build();
ConsumerJobInfo consumerJobInfoKafka(String topic, PmReportFilter.FilterData filterData) {
try {
- Job.Parameters param = Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE)
- .kafkaOutputTopic(topic).build();
+ Job.Parameters.KafkaDeliveryInfo deliveryInfo = Job.Parameters.KafkaDeliveryInfo.builder() //
+ .topic(topic) //
+ .bootStrapServers(this.applicationConfig.getKafkaBootStrapServers()) //
+ .build();
+ Job.Parameters param = Job.Parameters.builder() //
+ .filter(filterData) //
+ .filterType(Job.Parameters.PM_FILTER_TYPE).deliveryInfo(deliveryInfo) //
+ .build();
String str = gson.toJson(param);
Object parametersObj = jsonObject(str);
}
}
- ConsumerJobInfo consumerJobInfoKafka(String topic) {
+ private ConsumerJobInfo consumerJobInfoKafka(String topic) {
try {
- Job.Parameters param = Job.Parameters.builder().kafkaOutputTopic(topic).build();
+ Job.Parameters.KafkaDeliveryInfo deliveryInfo = Job.Parameters.KafkaDeliveryInfo.builder() //
+ .topic(topic) //
+ .bootStrapServers(this.applicationConfig.getKafkaBootStrapServers()) //
+ .build();
+ Job.Parameters param = Job.Parameters.builder().deliveryInfo(deliveryInfo).build();
String str = gson.toJson(param);
Object parametersObj = jsonObject(str);
assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
// Create two jobs. One buffering and one with a filter
- this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 10), JOB_ID1, restClient());
this.icsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0), JOB_ID2, restClient());
+ this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 10), JOB_ID1, restClient());
+
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
+
waitForKafkaListener();
var dataToSend = Flux.range(1, 3).map(i -> kafkaSenderRecord("Message_" + i, "", KAFKA_TYPE_ID)); // Message_1,
assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
- filterData.getMeasTypes().add("pmCounterNumber0");
- filterData.getMeasObjClass().add("NRCellCU");
+
+ filterData.addMeasTypes("NRCellCU", "pmCounterNumber0");
this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver.OUTPUT_TOPIC, filterData), JOB_ID,
restClient());
assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
- final int NO_OF_COUNTERS = 5;
+ final int NO_OF_COUNTERS = 2;
for (int i = 0; i < NO_OF_COUNTERS; ++i) {
- filterData.getMeasTypes().add("pmCounterNumber" + i);
+ filterData.addMeasTypes("NRCellCU", "pmCounterNumber" + i);
}
- filterData.getMeasObjClass().add("NRCellCU");
final int NO_OF_JOBS = 150;
@SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
@Test
- void kafkaCharacteristics_onePmJobs_sharedTopic() throws Exception {
+ void kafkaCharacteristics_manyPmJobs_sharedTopic() throws Exception {
// Filter PM reports and sent to many jobs over Kafka
- this.applicationConfig.setZipOutput(true);
+ this.applicationConfig.setZipOutput(false);
// Register producer, Register types
await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
- PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
- final int NO_OF_COUNTERS = 0;
- for (int i = 0; i < NO_OF_COUNTERS; ++i) {
- filterData.getMeasTypes().add("pmCounterNumber" + i);
- }
- filterData.getMeasObjClass().add("NRCellCU");
+ final int NO_OF_JOBS = 150;
+ ArrayList<KafkaReceiver> receivers = new ArrayList<>();
+ for (int i = 0; i < NO_OF_JOBS; ++i) {
+ final String outputTopic = "kafkaCharacteristics_onePmJobs_manyReceivers";
+ final String jobId = "manyJobs_" + i;
+ PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
+ filterData.addMeasTypes("NRCellCU", "pmCounterNumber" + i); // all counters will be added
- final String outputTopic = "kafkaCharacteristics_onePmJobs_manyReceivers";
- this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData), outputTopic, restClient());
+ this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData), jobId, restClient());
- final int NO_OF_RECEIVERS = 150;
- ArrayList<KafkaReceiver> receivers = new ArrayList<>();
- for (int i = 0; i < NO_OF_RECEIVERS; ++i) {
KafkaReceiver receiver =
new KafkaReceiver(this.applicationConfig, outputTopic, this.securityContext, "group_" + i);
receivers.add(receiver);
}
- await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(NO_OF_JOBS));
waitForKafkaListener();
final int NO_OF_OBJECTS = 1000;
class JsltFilterTest {
private String filterReport(JsltFilter filter) throws Exception {
- DataFromTopic data = new DataFromTopic(null, loadReport().getBytes(), false);
+ DataFromTopic data = new DataFromTopic("type", null, null, loadReport().getBytes());
FilteredData filtered = filter.filter(data);
return filtered.getValueAString();
}
void testJsonPath() throws Exception {
String exp = ("$.event.perf3gppFields.measDataCollection.measInfoList[0].measTypes.sMeasTypesList[0]");
JsonPathFilter filter = new JsonPathFilter(exp);
- DataFromTopic data = new DataFromTopic(null, loadReport().getBytes(), false);
+ DataFromTopic data = new DataFromTopic("typeId", null, null, loadReport().getBytes());
FilteredData filtered = filter.filter(data);
String res = filtered.getValueAString();
assertThat(res).isEqualTo("\"attTCHSeizures\"");
private String filterReport(PmReportFilter filter) throws Exception {
- TopicListener.DataFromTopic data = new TopicListener.DataFromTopic(null, loadReport().getBytes(), false);
+ TopicListener.DataFromTopic data =
+ new TopicListener.DataFromTopic("typeId", null, null, loadReport().getBytes());
FilteredData filtered = filter.filter(data);
String reportAfterFilter = gson.toJson(data.getCachedPmReport());
void testPmFilterMeasTypes() throws Exception {
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
- filterData.measTypes.add("succImmediateAssignProcs");
+ filterData.addMeasTypes("UtranCell", "succImmediateAssignProcs");
PmReportFilter filter = new PmReportFilter(filterData);
String filtered = filterReport(filter);
// Test that no report is returned if not meas types were found
filterData = new PmReportFilter.FilterData();
- filterData.measTypes.add("junk");
+ filterData.addMeasTypes("junk", "succImmediateAssignProcs");
+
filter = new PmReportFilter(filterData);
filtered = filterReport(filter);
assertThat(filtered).isEmpty();
void testMeasObjClass() throws Exception {
{
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
- filterData.measObjClass.add("junk");
+ filterData.addMeasTypes("junk");
PmReportFilter filter = new PmReportFilter(filterData);
String filtered = filterReport(filter);
assertThat(filtered).isEmpty();
}
{
- TopicListener.DataFromTopic data = new TopicListener.DataFromTopic(null, loadReport().getBytes(), false);
+ TopicListener.DataFromTopic data =
+ new TopicListener.DataFromTopic("typeId", null, null, loadReport().getBytes());
PmReportFilter.FilterData utranCellFilter = new PmReportFilter.FilterData();
- utranCellFilter.measObjClass.add("UtranCell");
+ utranCellFilter.addMeasTypes("UtranCell");
FilteredData filtered = new PmReportFilter(utranCellFilter).filter(data);
assertThat(filtered.getValueAString()).contains("UtranCell").doesNotContain("ENodeBFunction");
PmReportFilter.FilterData eNodeBFilter = new PmReportFilter.FilterData();
- eNodeBFilter.measObjClass.add("ENodeBFunction");
+ eNodeBFilter.addMeasTypes("ENodeBFunction");
filtered = new PmReportFilter(eNodeBFilter).filter(data);
assertThat(filtered.getValueAString()).contains("ENodeBFunction").doesNotContain("UtranCell");
}
{
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
- filterData.getMeasTypes().add("pmCounterNumber0");
- filterData.getMeasObjClass().add("NRCellCU");
+ filterData.addMeasTypes("NRCellCU", "pmCounterNumber0");
PmReportFilter filter = new PmReportFilter(filterData);
- DataFromTopic topicData = new DataFromTopic(null, pmReportJson.getBytes(), false);
+ DataFromTopic topicData = new DataFromTopic("typeId", null, null, pmReportJson.getBytes());
Instant startTime = Instant.now();
for (int i = 0; i < TIMES; ++i) {
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
PmReportFilter filter = new PmReportFilter(filterData);
- FilteredData filtered = filter.filter(new TopicListener.DataFromTopic(null, "junk".getBytes(), false));
+ FilteredData filtered = filter.filter(new TopicListener.DataFromTopic("typeId", null, null, "junk".getBytes()));
assertThat(filtered.isEmpty()).isTrue();
- filtered = filter.filter(new TopicListener.DataFromTopic(null, reQuote("{'msg': 'test'}").getBytes(), false));
+ filtered = filter
+ .filter(new TopicListener.DataFromTopic("typeId", null, null, reQuote("{'msg': 'test'}").getBytes()));
assertThat(filtered.isEmpty()).isTrue();
}
"event": {
"commonEventHeader": {
"domain": "perf3gpp",
- "eventId": "9efa1210-f285-455f-9c6a-3a659b1f1882",
- "sequence": 0,
"eventName": "perf3gpp_gnb-Ericsson_pmMeasResult",
"sourceName": "O-DU-1122",
"reportingEntityName": "",
- "priority": "Normal",
"startEpochMicrosec": 951912000000,
"lastEpochMicrosec": 951912900000,
- "version": "4.0",
- "vesEventListenerVersion": "7.1",
"timeZoneOffset": "+00:00"
},
"perf3gppFields": {