From: ychacon Date: Fri, 10 Feb 2023 12:46:33 +0000 (+0100) Subject: Updates for G Maintenance release X-Git-Tag: 1.2.1 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=fb640b4ef01b0e2508f3b6fcdd83f56cf13a747b;p=nonrtric%2Fplt%2Fdmaapadapter.git Updates for G Maintenance release Minor changes. Step parent pom version DMaaP adapter, updated job definition syntax DMaaP adapter, support for shared Kafka topics Issue-ID: NONRTRIC-838 Signed-off-by: ychacon Change-Id: Ia6a909a07304c79a0c2d53cdea0f857ebae7f96a --- diff --git a/README.md b/README.md index 686135a..48f2f20 100644 --- a/README.md +++ b/README.md @@ -55,14 +55,14 @@ When an Information Job is created in the Information Coordinator Service Consum "maxSize": { "type": "integer" }, - "maxTimeMiliseconds": { + "maxTimeMilliseconds": { "type": "integer" } }, "additionalProperties": false, "required": [ "maxSize", - "maxTimeMiliseconds" + "maxTimeMilliseconds" ] } }, @@ -79,7 +79,7 @@ When an Information Job is created in the Information Coordinator Service Consum Object1 and Object2 may be posted in one call --> ["Object1", "Object2"] The bufferTimeout is a Json object and the parameters in the object are: - maxSize the maximum number of buffered objects before posting - - maxTimeMiliseconds the maximum delay time to buffer before posting + - maxTimeMilliseconds the maximum delay time to buffer before posting If no bufferTimeout is specified, each object will be posted as received in separate calls (not quoted and put in a Json array). diff --git a/api/api.json b/api/api.json index 3ec5a4d..6ba7582 100644 --- a/api/api.json +++ b/api/api.json @@ -296,6 +296,15 @@ }}, "tags": ["Actuator"] }}, + "/actuator/shutdown": {"post": { + "summary": "Actuator web endpoint 'shutdown'", + "operationId": "shutdown_2", + "responses": {"200": { + "description": "OK", + "content": {"*/*": {"schema": {"type": "object"}}} + }}, + "tags": ["Actuator"] + }}, "/data-producer/v1/info-producers/{infoProducerId}": { "get": { "operationId": "getInfoProducer", diff --git a/api/api.yaml b/api/api.yaml index b2eaf15..024b43f 100644 --- a/api/api.yaml +++ b/api/api.yaml @@ -174,6 +174,19 @@ paths: '*/*': schema: type: object + /actuator/shutdown: + post: + tags: + - Actuator + summary: Actuator web endpoint 'shutdown' + operationId: shutdown_2 + responses: + 200: + description: OK + content: + '*/*': + schema: + type: object /data-producer/v1/info-producers/{infoProducerId}: get: tags: diff --git a/config/application.yaml b/config/application.yaml index 8975849..57c6e31 100644 --- a/config/application.yaml +++ b/config/application.yaml @@ -27,7 +27,12 @@ management: web: exposure: # Enabling of springboot actuator features. See springboot documentation. - include: "loggers,logfile,health,info,metrics,threaddump,heapdump" + include: "loggers,logfile,health,info,metrics,threaddump,heapdump,shutdown" + endpoint: + shutdown: + enabled: true +lifecycle: + timeout-per-shutdown-phase: "20s" springdoc: show-actuator: true logging: @@ -48,14 +53,15 @@ logging: server: # Configuration of the HTTP/REST server. The parameters are defined and handeled by the springboot framework. # See springboot documentation. - port : 8435 - http-port: 8084 - ssl: - key-store-type: JKS - key-store-password: policy_agent - key-store: /opt/app/dmaap-adapter-service/etc/cert/keystore.jks - key-password: policy_agent - key-alias: policy_agent + port : 8435 + http-port: 8084 + ssl: + key-store-type: JKS + key-store-password: policy_agent + key-store: /opt/app/dmaap-adapter-service/etc/cert/keystore.jks + key-password: policy_agent + key-alias: policy_agent + shutdown: "graceful" app: webclient: # Configuration of the trust store used for the HTTP client (outgoing requests) diff --git a/docs/Pictures.pptx b/docs/Pictures.pptx index e2cf47e..2ec5a51 100644 Binary files a/docs/Pictures.pptx and b/docs/Pictures.pptx differ diff --git a/docs/dedicatedTopics.png b/docs/dedicatedTopics.png new file mode 100644 index 0000000..0b91b75 Binary files /dev/null and b/docs/dedicatedTopics.png differ diff --git a/docs/overview.rst b/docs/overview.rst index 1174e44..88a2606 100644 --- a/docs/overview.rst +++ b/docs/overview.rst @@ -116,7 +116,10 @@ typeSchema.json =============== This schema will by default be registerred for the type. The following properties are defined: -* kafkaOutputTopic, optional parameter which enables the information job to output data to a Kafka topic instead of a direct call to a data consumer. The output of a job can be directed to HTTP or to Kafka regardless if the input is retrieved from DMaaP or from Kafka. +* outputTopic, optional parameter which enables the information job to output data to a Kafka topic instead of a direct call to a data consumer. The output of a job can be directed to HTTP or to Kafka regardless if the input is retrieved from DMaaP or from Kafka. This consists of the following properties: + + * topic, the name of the kafka topic + * bootStrapServers, reference to the kafka bus to used. This is optional, if this is omitted the default configured kafka bus is used (which is configured in the application.yaml file). * filterType, selects the type of filtering that will be done. This can be one of: "regexp", "json-path", "jslt". @@ -128,7 +131,7 @@ This schema will by default be registerred for the type. The following propertie * bufferTimeout can be used to buffer several json objects received from Kafka when kafkaInputTopic is defined into one json array. If bufferTimeout is used, the delivered data will be a Json array of the objects received. If not, each received object will be delivered in a separate call. This contains: * maxSize, the maximum number of objects to collect before delivery to the consumer - * maxTimeMiliseconds, the maximum time to delay delivery (to buffer). + * maxTimeMilliseconds, the maximum time to delay delivery (to buffer). * maxConcurrency, defines max how many paralell REST calls the consumer wishes to receive. 1, which is default, means sequential. A higher value may increase throughput. @@ -165,7 +168,7 @@ Below follows an example of using bufferTimeout and maxConcurrency. { "bufferTimeout":{ "maxSize":123, - "maxTimeMiliseconds":456 + "maxTimeMilliseconds":456 }, "maxConcurrency":1 } @@ -179,18 +182,24 @@ This schema will be registered when the configured dataType is "pmData". This will extend the filtering capabilities so that a special filter for PM data can be used. Here it is possible to define which meas-types (counters) to get from which resources. +"It is possible to both filter on new data that is collected from the traffical nodes and to query from data that is already collected. + The filterType parameter is extended to allow value "pmdata" which can be used for PM data filtering. * sourceNames an array of source names for wanted PM reports. * measObjInstIds an array of meas object instances for wanted PM reports. If a given filter value is contained in the filter definition, it will match (partial matching). For instance a value like "NRCellCU" will match "ManagedElement=seliitdus00487,GNBCUCPFunction=1,NRCellCU=32". -* measTypes selects the meas types to get +* measTypeSpecs selects the meas types to get. This consist of: + + * measObjClass matching of the class of the measObjInstId. The measObjInstId must follow the 3GPP naming conventions for Managed Objects (3GPP TS 32.106-8). + * measTypes the name of the measurement types (connected to the measObjClass). * measuredEntityDns partial match of meas entity DNs. -* measObjClass matching of the class of the measObjInstId. The measObjInstId must follow the 3GPP naming conventions for Managed Objects (3GPP TS 32.106-8). Example, for a distinguished name "ManagedElement=RNC-Gbg-1,ENodeBFunction=1", the MO class will be "ENodeBFunction". +* pmRopStartTime gives a possibility to query from already collected PM files. The start file is the time from when the information shall be returned. + In this case, the query is only done for files from the given "sourceNames". + If this parameter is excluded, only "new" reports will be delivered as they are collected from the traffical nodes. -All PM filter properties are optional and a non given will result in "match all". -The result of the filtering is still following the structure of a 3GPP PM report. +All PM filter properties are optional and a non given will result in "match all" (except the pmRopStartTime). Below follows an example of a PM filter. @@ -205,11 +214,144 @@ Below follows an example of a PM filter. "measObjInstIds":[ "UtranCell=dGbg-997" ], - "measTypes":[ - "succImmediateAssignProcs" - ], + "measTypeSpecs":[ + { + "measuredObjClass":"UtranCell", + "measTypes":[ + "succImmediateAssignProcs" + ] + } + ], "measuredEntityDns":[ "ManagedElement=RNC-Gbg-1" - ] + ], + "pmRopStartTime" : "2022-12-13T10:50:44.000-08:00" + } + } + + +Here is an example of a filter that will +match two counters from all cells in two traffical nodes. + +.. code-block:: javascript + + { + "filterType":"pmdata", + "filter": { + "sourceNames":[ + "O-DU-1122", "O-DU-1123" + ], + "measTypeSpecs":[ + { + "measuredObjClass":"NRCellCU", + "measTypes":[ + "pmCounterNumber0", "pmCounterNumber1" + ] + } + ], + } } + + +******************** +Bulk PM subscription +******************** + +The sequence is that a "new file event" is received (from a Kafka topic). +The file is read from local storage (file storage or S3 object store). For each Job, the specified PM filter is applied to the data +and the result is sent to the Kafka topic specified by the Job (by the data consumer). + +.. image:: ./dedicatedTopics.png + :width: 500pt + +The result of the PM filtering is still following the structure of a 3GPP PM report. +Here follows an example of a resulting delivered PM report. + +.. code-block:: javascript + + { + "event":{ + "commonEventHeader":{ + "domain":"perf3gpp", + "eventId":"9efa1210-f285-455f-9c6a-3a659b1f1882", + "eventName":"perf3gpp_gnb-Ericsson_pmMeasResult", + "sourceName":"O-DU-1122", + "reportingEntityName":"", + "startEpochMicrosec":951912000000, + "lastEpochMicrosec":951912900000, + "timeZoneOffset":"+00:00" + }, + "perf3gppFields":{ + "perf3gppFieldsVersion":"1.0", + "measDataCollection":{ + "granularityPeriod":900, + "measuredEntityUserName":"RNC Telecomville", + "measuredEntityDn":"SubNetwork=CountryNN,MeContext=MEC-Gbg-1,ManagedElement=RNC-Gbg-1", + "measuredEntitySoftwareVersion":"", + "measInfoList":[ + { + "measInfoId":{ + "sMeasInfoId":"" + }, + "measTypes":{ + "map":{ + "succImmediateAssignProcs":1 + }, + "sMeasTypesList":[ + "succImmediateAssignProcs" + ] + }, + "measValuesList":[ + { + "measObjInstId":"RncFunction=RF-1,UtranCell=Gbg-997", + "suspectFlag":"false", + "measResults":[ + { + "p":1, + "sValue":"1113" + } + ] + }, + { + "measObjInstId":"RncFunction=RF-1,UtranCell=Gbg-998", + "suspectFlag":"false", + "measResults":[ + { + "p":1, + "sValue":"234" + } + ] + }, + { + "measObjInstId":"RncFunction=RF-1,UtranCell=Gbg-999", + "suspectFlag":"true", + "measResults":[ + { + "p":1, + "sValue":"789" + } + ] + } + ] + } + ] + } + } + } + } + +If several jobs publish to the same Kafka topic (shared topic), the resulting filtered output will be an aggregate of all matching filters. +So, each consumer will then get more data than requested. + +.. image:: ./sharedTopics.png + :width: 500pt + +================== +Sent Kafka headers +================== + +For each filtered result sent to a Kafka topic, there will the following proerties in the Kafa header: + +* type-id, this propery is used to indicate the ID of the information type. The value is a string. +* gzip, if this property exists the object is gzipped (otherwise not). The property has no value. \ No newline at end of file diff --git a/docs/sharedTopics.png b/docs/sharedTopics.png new file mode 100644 index 0000000..55668c4 Binary files /dev/null and b/docs/sharedTopics.png differ diff --git a/pom.xml b/pom.xml index 91319ff..981578b 100644 --- a/pom.xml +++ b/pom.xml @@ -26,7 +26,7 @@ org.springframework.boot spring-boot-starter-parent - 2.5.14 + 2.7.8 org.o-ran-sc.nonrtric.plt @@ -57,7 +57,7 @@ 3.0.11 0.30.0 3.7.0.1746 - 0.8.5 + 0.8.8 true 4.0.0-rc-2 1.4 @@ -208,7 +208,7 @@ software.amazon.awssdk s3 2.17.292 - + @@ -384,4 +384,4 @@ JIRA https://jira.o-ran-sc.org/ - + \ No newline at end of file diff --git a/src/main/java/org/oran/dmaapadapter/Application.java b/src/main/java/org/oran/dmaapadapter/Application.java index 2058202..7201d4f 100644 --- a/src/main/java/org/oran/dmaapadapter/Application.java +++ b/src/main/java/org/oran/dmaapadapter/Application.java @@ -48,6 +48,14 @@ public class Application { public static void main(String[] args) { applicationContext = SpringApplication.run(Application.class); + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + logger.warn("Shutting down, received signal SIGTERM"); + SpringApplication.exit(applicationContext); + applicationContext = null; + } + }); } @Scheduled(fixedRate = 10 * 1000) diff --git a/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java b/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java index a6e2444..3ba96ab 100644 --- a/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java +++ b/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java @@ -118,6 +118,22 @@ public class AsyncRestClient { .map(this::toBody); } + public Mono> postForEntity(String uri, @Nullable String body) { + Mono bodyProducer = body != null ? Mono.just(body) : Mono.empty(); + + RequestHeadersSpec request = getWebClient() // + .post() // + .uri(uri) // + .contentType(MediaType.APPLICATION_JSON) // + .body(bodyProducer, String.class); + return retrieve(request); + } + + public Mono post(String uri, @Nullable String body) { + return postForEntity(uri, body) // + .map(this::toBody); + } + private Mono> retrieve(RequestHeadersSpec request) { if (securityContext.isConfigured()) { request.headers(h -> h.setBearerAuth(securityContext.getBearerAuthToken())); diff --git a/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java b/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java index b53383a..d7561da 100644 --- a/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java +++ b/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java @@ -162,7 +162,7 @@ public class ApplicationConfig { } public Collection getTypes() { - com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create(); + com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create(); try { String configJson = Files.readString(Path.of(getLocalConfigurationFilePath()), Charset.defaultCharset()); ConfigFile configData = gson.fromJson(configJson, ConfigFile.class); diff --git a/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java b/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java index 1816dc3..c118da3 100644 --- a/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java +++ b/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java @@ -97,7 +97,7 @@ public class ProducerCallbacksController { logger.warn("jobCreatedCallback failed: {}", e.getMessage()); return ErrorResponse.create(e, e.getHttpStatus()); } catch (Exception e) { - logger.warn("jobCreatedCallback failed: {}", e.getMessage()); + logger.warn("jobCreatedCallback failed: {}", e.getMessage(), e); return ErrorResponse.create(e, HttpStatus.BAD_REQUEST); } } diff --git a/src/main/java/org/oran/dmaapadapter/filter/Filter.java b/src/main/java/org/oran/dmaapadapter/filter/Filter.java index 6c5fa25..e327fb6 100644 --- a/src/main/java/org/oran/dmaapadapter/filter/Filter.java +++ b/src/main/java/org/oran/dmaapadapter/filter/Filter.java @@ -39,24 +39,26 @@ public interface Filter { public static class FilteredData { public final byte[] key; public final byte[] value; + public final String infoTypeId; @Getter private final boolean isZipped; - private static final FilteredData emptyData = new FilteredData(null, null); + private static final FilteredData emptyData = new FilteredData(null, null, null); public boolean isEmpty() { return (key == null || key.length == 0) && (value == null || value.length == 0); } - public FilteredData(byte[] key, byte[] value) { - this(key, value, false); + public FilteredData(String type, byte[] key, byte[] value) { + this(type, key, value, false); } - public FilteredData(byte[] key, byte[] value, boolean isZipped) { + public FilteredData(String type, byte[] key, byte[] value, boolean isZipped) { this.key = key; this.value = value; this.isZipped = isZipped; + this.infoTypeId = type; } public String getValueAString() { @@ -70,9 +72,9 @@ public interface Filter { public Iterable
headers() { ArrayList
result = new ArrayList<>(); if (isZipped()) { - Header h = new RecordHeader(DataFromTopic.ZIP_PROPERTY, null); - result.add(h); + result.add(new RecordHeader(DataFromTopic.ZIPPED_PROPERTY, null)); } + result.add(new RecordHeader(DataFromTopic.TYPE_ID_PROPERTY, infoTypeId.getBytes())); return result; } } diff --git a/src/main/java/org/oran/dmaapadapter/filter/FilterFactory.java b/src/main/java/org/oran/dmaapadapter/filter/FilterFactory.java index 8d51727..358c093 100644 --- a/src/main/java/org/oran/dmaapadapter/filter/FilterFactory.java +++ b/src/main/java/org/oran/dmaapadapter/filter/FilterFactory.java @@ -23,6 +23,7 @@ package org.oran.dmaapadapter.filter; import com.google.gson.GsonBuilder; import java.lang.invoke.MethodHandles; +import java.util.Collection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,6 +52,14 @@ public class FilterFactory { } } + public static PmReportFilter createAggregateFilter(Collection filters) { + PmReportFilter.FilterData resultFilterData = filters.iterator().next().getFilterData(); + for (PmReportFilter filter : filters) { + resultFilterData.addAll(filter.getFilterData()); + } + return new PmReportFilter(resultFilterData); + } + private static PmReportFilter.FilterData createPmFilterData(Object filter) { String str = gson.toJson(filter); return gson.fromJson(str, PmReportFilter.FilterData.class); diff --git a/src/main/java/org/oran/dmaapadapter/filter/JsltFilter.java b/src/main/java/org/oran/dmaapadapter/filter/JsltFilter.java index 577f83d..1720517 100644 --- a/src/main/java/org/oran/dmaapadapter/filter/JsltFilter.java +++ b/src/main/java/org/oran/dmaapadapter/filter/JsltFilter.java @@ -49,7 +49,7 @@ class JsltFilter implements Filter { @Override public FilteredData filter(DataFromTopic data) { if (expression == null) { - return new FilteredData(data.key, data.value); + return new FilteredData(data.infoTypeId, data.key, data.value); } try { JsonFactory factory = mapper.getFactory(); @@ -60,7 +60,7 @@ class JsltFilter implements Filter { if (filteredNode == NullNode.instance) { return FilteredData.empty(); } - return new FilteredData(data.key, mapper.writeValueAsBytes(filteredNode)); + return new FilteredData(data.infoTypeId, data.key, mapper.writeValueAsBytes(filteredNode)); } catch (Exception e) { return FilteredData.empty(); } diff --git a/src/main/java/org/oran/dmaapadapter/filter/JsonPathFilter.java b/src/main/java/org/oran/dmaapadapter/filter/JsonPathFilter.java index 98f1b46..8c397b0 100644 --- a/src/main/java/org/oran/dmaapadapter/filter/JsonPathFilter.java +++ b/src/main/java/org/oran/dmaapadapter/filter/JsonPathFilter.java @@ -46,7 +46,7 @@ class JsonPathFilter implements Filter { String str = new String(data.value); Object o = JsonPath.parse(str).read(this.expression, Object.class); String json = gson.toJson(o); - return o == null ? FilteredData.empty() : new FilteredData(data.key, json.getBytes()); + return o == null ? FilteredData.empty() : new FilteredData(data.infoTypeId, data.key, json.getBytes()); } catch (Exception e) { return FilteredData.empty(); } diff --git a/src/main/java/org/oran/dmaapadapter/filter/PmReport.java b/src/main/java/org/oran/dmaapadapter/filter/PmReport.java index 46c90a1..3182d7b 100644 --- a/src/main/java/org/oran/dmaapadapter/filter/PmReport.java +++ b/src/main/java/org/oran/dmaapadapter/filter/PmReport.java @@ -42,9 +42,6 @@ public class PmReport { @Expose private String eventId; - @Expose - private int sequence; - @Expose private String eventName; @@ -55,23 +52,20 @@ public class PmReport { @Expose private String reportingEntityName; - @Expose - private String priority; - @Expose private long startEpochMicrosec; @Expose private long lastEpochMicrosec; - @Expose - private String version; - - @Expose - private String vesEventListenerVersion; - @Expose private String timeZoneOffset; + + /* Not reported elements */ + int sequence; + String priority; + String version; + String vesEventListenerVersion; } public static class MeasInfoId { diff --git a/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java b/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java index a545e90..db1a018 100644 --- a/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java +++ b/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java @@ -26,6 +26,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Set; import lombok.Getter; import lombok.Setter; @@ -61,17 +62,98 @@ public class PmReportFilter implements Filter { @Getter public static class FilterData { - final Collection sourceNames = new HashSet<>(); - final Collection measObjInstIds = new ArrayList<>(); - final Collection measTypes = new HashSet<>(); - final Collection measuredEntityDns = new ArrayList<>(); - final Collection measObjClass = new HashSet<>(); + + public static class MeasTypeSpec { + static MeasTypeSpec empty = new MeasTypeSpec(); + + static MeasTypeSpec empty() { + return empty; + } + + @Getter + String measuredObjClass; + + @Getter + final Set measTypes = new HashSet<>(); + + @Override + public boolean equals(Object obj) { + return measuredObjClass.equals(obj); + } + + @Override + public int hashCode() { + return measuredObjClass.hashCode(); + } + } + + final Set sourceNames = new HashSet<>(); + final Set measObjInstIds = new HashSet<>(); + final Collection measTypeSpecs = new ArrayList<>(); + final Set measuredEntityDns = new HashSet<>(); + + public void addMeasTypes(String measObjClass, String... measTypes) { + MeasTypeSpec spec = this.findMeasTypeSpec(measObjClass); + if (spec == null) { + spec = new MeasTypeSpec(); + spec.measuredObjClass = measObjClass; + this.measTypeSpecs.add(spec); + } + for (String measType : measTypes) { + spec.measTypes.add(measType); + } + } + + public void addMeasTypes(String measObjClass, Collection measTypes) { + for (String measType : measTypes) { + addMeasTypes(measObjClass, measType); + } + } @Setter String pmRopStartTime; @Setter String pmRopEndTime; + + public void addAll(FilterData other) { + addAll(other.sourceNames, sourceNames); + addAll(other.measObjInstIds, measObjInstIds); + addAll(other.measTypeSpecs); + addAll(other.measuredEntityDns, measuredEntityDns); + } + + public MeasTypeSpec getMeasTypeSpec(String measuredObjClass) { + if (measTypeSpecs.isEmpty()) { + return MeasTypeSpec.empty(); + } + return findMeasTypeSpec(measuredObjClass); + } + + private MeasTypeSpec findMeasTypeSpec(String measuredObjClass) { + for (MeasTypeSpec t : this.measTypeSpecs) { + if (t.measuredObjClass.equals(measuredObjClass)) { + return t; + } + } + return null; + } + + private void addAll(Collection measTypes) { + for (MeasTypeSpec s : measTypes) { + addMeasTypes(s.getMeasuredObjClass(), s.getMeasTypes()); + } + } + + private void addAll(Set source, Set dst) { + if (source.isEmpty()) { + dst.clear(); + } else if (dst.isEmpty()) { + // Nothing, this means 'match all' + } else { + dst.addAll(source); + } + } } private static class MeasTypesIndexed extends PmReport.MeasTypes { @@ -108,7 +190,7 @@ public class PmReportFilter implements Filter { if (reportFiltered == null) { return FilteredData.empty(); } - return new FilteredData(data.key, gson.toJson(reportFiltered).getBytes()); + return new FilteredData(data.infoTypeId, data.key, gson.toJson(reportFiltered).getBytes()); } catch (Exception e) { logger.warn("Could not parse PM data. {}, reason: {}", data, e.getMessage()); return FilteredData.empty(); @@ -166,17 +248,18 @@ public class PmReportFilter implements Filter { return false; } - private boolean isMeasResultMatch(MeasResult measResult, MeasTypes measTypes, FilterData filter) { + private boolean isMeasResultMatch(MeasResult measResult, MeasTypes measTypes, + FilterData.MeasTypeSpec measTypesSpec) { String measType = measTypes.getMeasType(measResult.getP()); - return filter.measTypes.isEmpty() || filter.measTypes.contains(measType); + return measTypesSpec.measTypes.isEmpty() || measTypesSpec.measTypes.contains(measType); } private Collection createMeasResults(Collection oldMeasResults, MeasTypes measTypes, - FilterData filter) { + FilterData.MeasTypeSpec measTypesSpec) { Collection newMeasResults = new ArrayList<>(); for (MeasResult measResult : oldMeasResults) { - if (isMeasResultMatch(measResult, measTypes, filter)) { + if (isMeasResultMatch(measResult, measTypes, measTypesSpec)) { newMeasResults.add(measResult.toBuilder().build()); } } @@ -199,27 +282,25 @@ public class PmReportFilter implements Filter { return distinguishedName.substring(lastRdn + 1, lastEqualChar); } - private boolean isMeasInstClassMatch(String measObjInstId, FilterData filter) { - if (filter.measObjClass.isEmpty()) { - return true; - } - + private FilterData.MeasTypeSpec getMeasTypeSpec(String measObjInstId, FilterData filter) { String measObjClass = managedObjectClass(measObjInstId); - return filter.measObjClass.contains(measObjClass); + return filter.getMeasTypeSpec(measObjClass); } private MeasValuesList createMeasValuesList(MeasValuesList oldMeasValues, MeasTypes measTypes, FilterData filter) { + FilterData.MeasTypeSpec measTypesSpec = getMeasTypeSpec(oldMeasValues.getMeasObjInstId(), filter); + if (measTypesSpec == null) { + return MeasValuesList.empty(); + } - if (isMeasInstIdMatch(oldMeasValues.getMeasObjInstId(), filter) - && isMeasInstClassMatch(oldMeasValues.getMeasObjInstId(), filter)) { - - Collection newResults = createMeasResults(oldMeasValues.getMeasResults(), measTypes, filter); - return oldMeasValues.toBuilder() // - .measResults(newResults) // - .build(); - } else { + if (!isMeasInstIdMatch(oldMeasValues.getMeasObjInstId(), filter)) { return MeasValuesList.empty(); } + + Collection newResults = createMeasResults(oldMeasValues.getMeasResults(), measTypes, measTypesSpec); + return oldMeasValues.toBuilder() // + .measResults(newResults) // + .build(); } private MeasTypes createMeasTypes(Collection newMeasValues, MeasTypes oldMMeasTypes) { diff --git a/src/main/java/org/oran/dmaapadapter/filter/RegexpFilter.java b/src/main/java/org/oran/dmaapadapter/filter/RegexpFilter.java index 4806604..682f15e 100644 --- a/src/main/java/org/oran/dmaapadapter/filter/RegexpFilter.java +++ b/src/main/java/org/oran/dmaapadapter/filter/RegexpFilter.java @@ -42,12 +42,12 @@ class RegexpFilter implements Filter { @Override public FilteredData filter(DataFromTopic data) { if (regexp == null) { - return new FilteredData(data.key, data.value); + return new FilteredData(data.infoTypeId, data.key, data.value); } Matcher matcher = regexp.matcher(data.valueAsString()); boolean match = matcher.find(); if (match) { - return new FilteredData(data.key, data.value); + return new FilteredData(data.infoTypeId, data.key, data.value); } else { return FilteredData.empty(); } diff --git a/src/main/java/org/oran/dmaapadapter/repository/Job.java b/src/main/java/org/oran/dmaapadapter/repository/Job.java index 8603507..b235bdc 100644 --- a/src/main/java/org/oran/dmaapadapter/repository/Job.java +++ b/src/main/java/org/oran/dmaapadapter/repository/Job.java @@ -28,6 +28,7 @@ import java.lang.invoke.MethodHandles; import java.time.Duration; import lombok.Builder; +import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Setter; import lombok.ToString; @@ -36,6 +37,7 @@ import org.oran.dmaapadapter.clients.AsyncRestClient; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.filter.Filter; import org.oran.dmaapadapter.filter.FilterFactory; +import org.oran.dmaapadapter.repository.Job.Parameters.KafkaDeliveryInfo; import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,8 +51,6 @@ public class Job { @Schema(name = "job_statistics", description = "Statistics information for one job") public static class Statistics { - // @Schema(name = "jobId", description = "jobId", required = true) - // @SerializedName("jobId") @JsonProperty(value = "jobId", required = true) String jobId; @@ -115,8 +115,18 @@ public class Job { @Getter private BufferTimeout bufferTimeout; + @Builder + @EqualsAndHashCode + public static class KafkaDeliveryInfo { + @Getter + private String topic; + + @Getter + private String bootStrapServers; + } + @Getter - private String kafkaOutputTopic; + private KafkaDeliveryInfo deliveryInfo; public Filter.Type getFilterType() { if (filter == null || filterType == null) { @@ -137,9 +147,9 @@ public class Job { } public static class BufferTimeout { - public BufferTimeout(int maxSize, long maxTimeMiliseconds) { + public BufferTimeout(int maxSize, long maxTimeMilliseconds) { this.maxSize = maxSize; - this.maxTimeMiliseconds = maxTimeMiliseconds; + this.maxTimeMilliseconds = maxTimeMilliseconds; } public BufferTimeout() {} @@ -147,10 +157,10 @@ public class Job { @Getter private int maxSize; - private long maxTimeMiliseconds; + private long maxTimeMilliseconds; public Duration getMaxTime() { - return Duration.ofMillis(maxTimeMiliseconds); + return Duration.ofMillis(maxTimeMilliseconds); } } @@ -197,7 +207,7 @@ public class Job { .groupId(type.getKafkaGroupId()) // .inputTopic(type.getKafkaInputTopic()) // .jobId(id) // - .outputTopic(parameters.getKafkaOutputTopic()) // + .outputTopic(parameters.getDeliveryInfo() == null ? "" : parameters.getDeliveryInfo().topic) // .typeId(type.getId()) // .clientId(type.getKafkaClientId(appConfig)) // .build(); @@ -207,14 +217,14 @@ public class Job { public Filter.FilteredData filter(DataFromTopic data) { if (filter == null) { logger.debug("No filter used"); - return new Filter.FilteredData(data.key, data.value); + return new Filter.FilteredData(data.infoTypeId, data.key, data.value); } return filter.filter(data); } public boolean isBuffered() { return parameters != null && parameters.bufferTimeout != null && parameters.bufferTimeout.maxSize > 0 - && parameters.bufferTimeout.maxTimeMiliseconds > 0; + && parameters.bufferTimeout.maxTimeMilliseconds > 0; } } diff --git a/src/main/java/org/oran/dmaapadapter/repository/Jobs.java b/src/main/java/org/oran/dmaapadapter/repository/Jobs.java index 2c6b329..9a0559c 100644 --- a/src/main/java/org/oran/dmaapadapter/repository/Jobs.java +++ b/src/main/java/org/oran/dmaapadapter/repository/Jobs.java @@ -29,12 +29,19 @@ import java.util.List; import java.util.Map; import java.util.Vector; +import lombok.Getter; + import org.oran.dmaapadapter.clients.AsyncRestClient; import org.oran.dmaapadapter.clients.AsyncRestClientFactory; import org.oran.dmaapadapter.clients.SecurityContext; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.exceptions.ServiceException; +import org.oran.dmaapadapter.filter.Filter; +import org.oran.dmaapadapter.filter.FilterFactory; +import org.oran.dmaapadapter.filter.PmReportFilter; import org.oran.dmaapadapter.repository.Job.Parameters; +import org.oran.dmaapadapter.repository.Job.Parameters.KafkaDeliveryInfo; +import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -44,15 +51,141 @@ import org.springframework.stereotype.Component; @Component public class Jobs { public interface Observer { - void onJobbAdded(Job job); + void onJobbGroupAdded(JobGroup jobGroup); + + void onJobGroupRemoved(JobGroup jobGroup); + } + + public interface JobGroup { + public String getId(); + + public InfoType getType(); + + public void remove(Job job); + + public boolean isEmpty(); + + public Filter.FilteredData filter(DataFromTopic data); + + public Iterable getJobs(); + + public KafkaDeliveryInfo getDeliveryInfo(); + } + + public static class JobGroupSingle implements JobGroup { + @Getter + private final Job job; + private boolean isJobRemoved = false; + + public JobGroupSingle(Job job) { + this.job = job; + } + + @Override + public Filter.FilteredData filter(DataFromTopic data) { + return job.filter(data); + } + + @Override + public void remove(Job job) { + this.isJobRemoved = true; + } + + @Override + public boolean isEmpty() { + return isJobRemoved; + } + + @Override + public String getId() { + return job.getId(); + } + + @Override + public InfoType getType() { + return job.getType(); + } + + @Override + public Iterable getJobs() { + Collection c = new ArrayList<>(); + c.add(job); + return c; + } + + @Override + public KafkaDeliveryInfo getDeliveryInfo() { + return this.job.getParameters().getDeliveryInfo(); + } + } + + public static class JobGroupPm implements JobGroup { + @Getter + private final KafkaDeliveryInfo deliveryInfo; - void onJobRemoved(Job job); + private Map jobs = new HashMap<>(); + + @Getter + private PmReportFilter filter; + + @Getter + private final InfoType type; + + public JobGroupPm(InfoType type, KafkaDeliveryInfo topic) { + this.deliveryInfo = topic; + this.type = type; + } + + public synchronized void add(Job job) { + this.jobs.put(job.getId(), job); + this.filter = createFilter(); + } + + public synchronized void remove(Job job) { + this.jobs.remove(job.getId()); + if (!this.jobs.isEmpty()) { + this.filter = createFilter(); + } + } + + public boolean isEmpty() { + return jobs.isEmpty(); + } + + @Override + public Filter.FilteredData filter(DataFromTopic data) { + return filter.filter(data); + } + + public Job getAJob() { + if (this.jobs.isEmpty()) { + return null; + } + return this.jobs.values().iterator().next(); + } + + private PmReportFilter createFilter() { + Collection filterData = new ArrayList<>(); + this.jobs.forEach((key, value) -> filterData.add((PmReportFilter) value.getFilter())); + return FilterFactory.createAggregateFilter(filterData); + } + + @Override + public String getId() { + return deliveryInfo.getTopic(); + } + + @Override + public Iterable getJobs() { + return this.jobs.values(); + } } private static final Logger logger = LoggerFactory.getLogger(Jobs.class); private Map allJobs = new HashMap<>(); private MultiMap jobsByType = new MultiMap<>(); + private Map jobGroups = new HashMap<>(); // Key is Topic or JobId private final AsyncRestClientFactory restclientFactory; private final List observers = new ArrayList<>(); private final ApplicationConfig appConfig; @@ -78,7 +211,7 @@ public class Jobs { public void addJob(String id, String callbackUrl, InfoType type, String owner, String lastUpdated, Parameters parameters) throws ServiceException { - if (!Strings.isNullOrEmpty(parameters.getKafkaOutputTopic()) && !Strings.isNullOrEmpty(callbackUrl)) { + if ((parameters.getDeliveryInfo() != null) && !Strings.isNullOrEmpty(callbackUrl)) { throw new ServiceException("Cannot deliver to both Kafka and HTTP in the same job", HttpStatus.BAD_REQUEST); } AsyncRestClient consumerRestClient = type.isUseHttpProxy() // @@ -86,9 +219,6 @@ public class Jobs { : restclientFactory.createRestClientNoHttpProxy(callbackUrl); Job job = new Job(id, callbackUrl, type, owner, lastUpdated, parameters, consumerRestClient, this.appConfig); this.put(job); - synchronized (observers) { - this.observers.forEach(obs -> obs.onJobbAdded(job)); - } } public void addObserver(Observer obs) { @@ -97,10 +227,40 @@ public class Jobs { } } + private String jobGroupId(Job job) { + if (job.getParameters().getDeliveryInfo() == null) { + return job.getId(); + } else if (job.getParameters().getFilterType() == Filter.Type.PM_DATA) { + return job.getParameters().getDeliveryInfo().getTopic(); + } else { + return job.getId(); + } + } + private synchronized void put(Job job) { logger.debug("Put job: {}", job.getId()); + remove(job.getId()); + allJobs.put(job.getId(), job); jobsByType.put(job.getType().getId(), job.getId(), job); + + if (job.getParameters().getFilterType() == Filter.Type.PM_DATA + && job.getParameters().getDeliveryInfo() != null) { + String jobGroupId = jobGroupId(job); + if (!this.jobGroups.containsKey(jobGroupId)) { + final JobGroupPm group = new JobGroupPm(job.getType(), job.getParameters().getDeliveryInfo()); + this.jobGroups.put(jobGroupId, group); + group.add(job); + this.observers.forEach(obs -> obs.onJobbGroupAdded(group)); + } else { + JobGroupPm group = (JobGroupPm) this.jobGroups.get(jobGroupId); + group.add(job); + } + } else { + JobGroupSingle group = new JobGroupSingle(job); + this.jobGroups.put(jobGroupId(job), group); + this.observers.forEach(obs -> obs.onJobbGroupAdded(group)); + } } public synchronized Iterable getAll() { @@ -116,15 +276,20 @@ public class Jobs { } public void remove(Job job) { + String groupId = jobGroupId(job); + JobGroup group = this.jobGroups.get(groupId); synchronized (this) { this.allJobs.remove(job.getId()); jobsByType.remove(job.getType().getId(), job.getId()); + group.remove(job); + if (group.isEmpty()) { + this.jobGroups.remove(groupId); + } } - notifyJobRemoved(job); - } - private synchronized void notifyJobRemoved(Job job) { - this.observers.forEach(obs -> obs.onJobRemoved(job)); + if (group.isEmpty()) { + this.observers.forEach(obs -> obs.onJobGroupRemoved(group)); + } } public synchronized int size() { @@ -137,7 +302,7 @@ public class Jobs { public void clear() { - this.allJobs.forEach((id, job) -> notifyJobRemoved(job)); + this.jobGroups.forEach((id, group) -> this.observers.forEach(obs -> obs.onJobGroupRemoved(group))); synchronized (this) { allJobs.clear(); diff --git a/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java index 719597a..a9123e6 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java @@ -72,7 +72,7 @@ public class DmaapTopicListener implements TopicListener { .doOnNext(input -> logger.debug("Received from DMaap: {} :{}", this.type.getDmaapTopicUrl(), input)) // .doOnError(t -> logger.error("DmaapTopicListener error: {}", t.getMessage())) // .doFinally(sig -> logger.error("DmaapTopicListener stopped, reason: {}", sig)) // - .map(input -> new DataFromTopic(null, input.getBytes(), false)) + .map(input -> new DataFromTopic(this.type.getId(), null, null, input.getBytes())) .flatMap(data -> KafkaTopicListener.getDataFromFileIfNewPmFileEvent(data, type, dataStore), 100) .publish() // .autoConnect(); diff --git a/src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java index 71918db..6193af6 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java @@ -23,6 +23,8 @@ package org.oran.dmaapadapter.tasks; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.filter.Filter; import org.oran.dmaapadapter.repository.Job; +import org.oran.dmaapadapter.repository.Jobs.JobGroup; +import org.oran.dmaapadapter.repository.Jobs.JobGroupSingle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.MediaType; @@ -36,14 +38,15 @@ import reactor.core.publisher.Mono; public class HttpJobDataDistributor extends JobDataDistributor { private static final Logger logger = LoggerFactory.getLogger(HttpJobDataDistributor.class); - public HttpJobDataDistributor(Job job, ApplicationConfig config) { + public HttpJobDataDistributor(JobGroup job, ApplicationConfig config) { super(job, config); } @Override protected Mono sendToClient(Filter.FilteredData output) { - Job job = this.getJob(); - logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), output); + JobGroupSingle group = (JobGroupSingle) this.getJobGroup(); + Job job = group.getJob(); + logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), output.getValueAString()); MediaType contentType = job.isBuffered() || job.getType().isJson() ? MediaType.APPLICATION_JSON : null; return job.getConsumerRestClient().post("", output.getValueAString(), contentType); } diff --git a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java index f9353f1..ef493a5 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java @@ -34,6 +34,9 @@ import org.oran.dmaapadapter.datastore.DataStore; import org.oran.dmaapadapter.filter.Filter; import org.oran.dmaapadapter.filter.PmReportFilter; import org.oran.dmaapadapter.repository.Job; +import org.oran.dmaapadapter.repository.Jobs.JobGroup; +import org.oran.dmaapadapter.repository.Jobs.JobGroupPm; +import org.oran.dmaapadapter.repository.Jobs.JobGroupSingle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +53,7 @@ public abstract class JobDataDistributor { private static final Logger logger = LoggerFactory.getLogger(JobDataDistributor.class); @Getter - private final Job job; + private final JobGroup jobGroup; private Disposable subscription; private final ErrorStats errorStats = new ErrorStats(); @@ -71,50 +74,48 @@ public abstract class JobDataDistributor { } } - protected JobDataDistributor(Job job, ApplicationConfig applConfig) { + protected JobDataDistributor(JobGroup jobGroup, ApplicationConfig applConfig) { this.applConfig = applConfig; - this.job = job; + this.jobGroup = jobGroup; this.dataStore = DataStore.create(applConfig); this.dataStore.create(DataStore.Bucket.FILES).subscribe(); this.dataStore.create(DataStore.Bucket.LOCKS).subscribe(); } public void start(Flux input) { - logger.debug("Starting distribution, job: {}, to topic: {}", this.job.getId(), - job.getParameters().getKafkaOutputTopic()); - PmReportFilter filter = job.getFilter() instanceof PmReportFilter ? (PmReportFilter) job.getFilter() : null; - + logger.debug("Starting distribution, to topic: {}", jobGroup.getId()); + PmReportFilter filter = getPmReportFilter(this.jobGroup); if (filter == null || filter.getFilterData().getPmRopEndTime() == null) { - this.subscription = filterAndBuffer(input, this.job) // + this.subscription = filterAndBuffer(input, this.jobGroup) // .flatMap(this::sendToClient) // .onErrorResume(this::handleError) // .subscribe(this::handleSentOk, // this::handleExceptionInStream, // - () -> logger.warn("JobDataDistributor stopped jobId: {}", job.getId())); + () -> logger.warn("JobDataDistributor stopped jobId: {}", jobGroup.getId())); } if (filter != null && filter.getFilterData().getPmRopStartTime() != null) { this.dataStore.createLock(collectHistoricalDataLockName()) // .doOnNext(isLockGranted -> { if (isLockGranted.booleanValue()) { - logger.debug("Checking historical PM ROP files, jobId: {}", this.job.getId()); + logger.debug("Checking historical PM ROP files, jobId: {}", this.jobGroup.getId()); } else { logger.debug("Skipping check of historical PM ROP files, already done. jobId: {}", - this.job.getId()); + this.jobGroup.getId()); } }) // .filter(isLockGranted -> isLockGranted) // .flatMapMany(b -> Flux.fromIterable(filter.getFilterData().getSourceNames())) // .doOnNext(sourceName -> logger.debug("Checking source name: {}, jobId: {}", sourceName, - this.job.getId())) // + this.jobGroup.getId())) // .flatMap(sourceName -> dataStore.listObjects(DataStore.Bucket.FILES, sourceName), 1) // .filter(this::isRopFile) // .filter(fileName -> filterStartTime(filter.getFilterData(), fileName)) // .filter(fileName -> filterEndTime(filter.getFilterData(), fileName)) // .map(this::createFakeEvent) // - .flatMap(data -> KafkaTopicListener.getDataFromFileIfNewPmFileEvent(data, this.job.getType(), + .flatMap(data -> KafkaTopicListener.getDataFromFileIfNewPmFileEvent(data, this.jobGroup.getType(), dataStore), 100) - .map(job::filter) // + .map(jobGroup::filter) // .map(this::gzip) // .flatMap(this::sendToClient, 1) // .onErrorResume(this::handleCollectHistoricalDataError) // @@ -122,6 +123,17 @@ public abstract class JobDataDistributor { } } + private static PmReportFilter getPmReportFilter(JobGroup jobGroup) { + + if (jobGroup instanceof JobGroupPm) { + return ((JobGroupPm) jobGroup).getFilter(); + } else if (jobGroup instanceof JobGroupSingle) { + Filter f = ((JobGroupSingle) jobGroup).getJob().getFilter(); + return (f instanceof PmReportFilter) ? (PmReportFilter) f : null; + } + return null; + } + private Filter.FilteredData gzip(Filter.FilteredData data) { if (this.applConfig.isZipOutput()) { try { @@ -131,7 +143,7 @@ public abstract class JobDataDistributor { gzip.flush(); gzip.close(); byte[] zipped = out.toByteArray(); - return new Filter.FilteredData(data.key, zipped, true); + return new Filter.FilteredData(data.infoTypeId, data.key, zipped, true); } catch (IOException e) { logger.error("Unexpected exception when zipping: {}", e.getMessage()); return data; @@ -142,30 +154,28 @@ public abstract class JobDataDistributor { } private Mono handleCollectHistoricalDataError(Throwable t) { - logger.error("Exception: {} job: {}", t.getMessage(), job.getId()); + logger.error("Exception: {} job: {}", t.getMessage(), jobGroup.getId()); return tryDeleteLockFile() // .map(bool -> "OK"); } private String collectHistoricalDataLockName() { - return "collectHistoricalDataLock" + this.job.getId(); + return "collectHistoricalDataLock" + this.jobGroup.getId(); } private TopicListener.DataFromTopic createFakeEvent(String fileName) { NewFileEvent ev = new NewFileEvent(fileName); - return new TopicListener.DataFromTopic(null, gson.toJson(ev).getBytes(), false); + return new TopicListener.DataFromTopic(this.jobGroup.getType().getId(), null, null, gson.toJson(ev).getBytes()); } private static String fileTimePartFromRopFileName(String fileName) { + // "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.json" return fileName.substring(fileName.lastIndexOf("/") + 2); } private static boolean filterStartTime(PmReportFilter.FilterData filter, String fileName) { - // A20000626.2315+0200-2330+0200_HTTPS-6-73.json try { - String fileTimePart = fileTimePartFromRopFileName(fileName); - fileTimePart = fileTimePart.substring(0, 18); - OffsetDateTime fileStartTime = parseFileDate(fileTimePart); + OffsetDateTime fileStartTime = getStartTimeFromFileName(fileName); OffsetDateTime startTime = OffsetDateTime.parse(filter.getPmRopStartTime()); boolean isMatch = fileStartTime.isAfter(startTime); logger.debug("Checking file: {}, fileStartTime: {}, filterStartTime: {}, isAfter: {}", fileName, @@ -182,14 +192,11 @@ public abstract class JobDataDistributor { } private static boolean filterEndTime(PmReportFilter.FilterData filter, String fileName) { - // A20000626.2315+0200-2330+0200_HTTPS-6-73.json if (filter.getPmRopEndTime() == null) { return true; } try { - String fileTimePart = fileTimePartFromRopFileName(fileName); - fileTimePart = fileTimePart.substring(0, 9) + fileTimePart.substring(19, 28); - OffsetDateTime fileEndTime = parseFileDate(fileTimePart); + OffsetDateTime fileEndTime = getEndTimeFromFileName(fileName); OffsetDateTime endTime = OffsetDateTime.parse(filter.getPmRopEndTime()); boolean isMatch = fileEndTime.isBefore(endTime); logger.debug("Checking file: {}, fileEndTime: {}, endTime: {}, isBefore: {}", fileName, fileEndTime, @@ -202,6 +209,20 @@ public abstract class JobDataDistributor { } } + private static OffsetDateTime getStartTimeFromFileName(String fileName) { + String fileTimePart = fileTimePartFromRopFileName(fileName); + // A20000626.2315+0200-2330+0200_HTTPS-6-73.json + fileTimePart = fileTimePart.substring(0, 18); + return parseFileDate(fileTimePart); + } + + private static OffsetDateTime getEndTimeFromFileName(String fileName) { + String fileTimePart = fileTimePartFromRopFileName(fileName); + // A20000626.2315+0200-2330+0200_HTTPS-6-73.json + fileTimePart = fileTimePart.substring(0, 9) + fileTimePart.substring(19, 28); + return parseFileDate(fileTimePart); + } + private static OffsetDateTime parseFileDate(String timeStr) { DateTimeFormatter startTimeFormatter = new DateTimeFormatterBuilder().appendPattern("yyyyMMdd.HHmmZ").toFormatter(); @@ -209,14 +230,14 @@ public abstract class JobDataDistributor { } private void handleExceptionInStream(Throwable t) { - logger.warn("JobDataDistributor exception: {}, jobId: {}", t.getMessage(), job.getId()); + logger.warn("JobDataDistributor exception: {}, jobId: {}", t.getMessage(), jobGroup.getId()); } protected abstract Mono sendToClient(Filter.FilteredData output); public synchronized void stop() { if (this.subscription != null) { - logger.debug("Stopped, job: {}", job.getId()); + logger.debug("Stopped, job: {}", jobGroup.getId()); this.subscription.dispose(); this.subscription = null; } @@ -233,23 +254,26 @@ public abstract class JobDataDistributor { return this.subscription != null; } - private Flux filterAndBuffer(Flux inputFlux, Job job) { + private Flux filterAndBuffer(Flux inputFlux, JobGroup jobGroup) { Flux filtered = // inputFlux // - .doOnNext(data -> logger.trace("Received data, job {}", job.getId())) // - .doOnNext(data -> job.getStatistics().received(data.value)) // - .map(job::filter) // + .doOnNext(data -> logger.trace("Received data, job {}", jobGroup.getId())) // + .doOnNext(data -> jobGroup.getJobs().forEach(job -> job.getStatistics().received(data.value))) // + .map(jobGroup::filter) // .map(this::gzip) // .filter(f -> !f.isEmpty()) // - .doOnNext(f -> job.getStatistics().filtered(f.value)) // - .doOnNext(data -> logger.trace("Filtered data, job {}", job.getId())); // + .doOnNext(f -> jobGroup.getJobs().forEach(job -> job.getStatistics().filtered(f.value))) // + .doOnNext(data -> logger.trace("Filtered data, job {}", jobGroup.getId())) // + ; // + Job job = jobGroup.getJobs().iterator().next(); if (job.isBuffered()) { filtered = filtered.map(input -> quoteNonJson(input.getValueAString(), job)) // .bufferTimeout( // job.getParameters().getBufferTimeout().getMaxSize(), // job.getParameters().getBufferTimeout().getMaxTime()) // - .map(buffered -> new Filter.FilteredData(null, buffered.toString().getBytes())); + .map(buffered -> new Filter.FilteredData(this.getJobGroup().getType().getId(), null, + buffered.toString().getBytes())); } return filtered; } @@ -264,7 +288,7 @@ public abstract class JobDataDistributor { } private Mono handleError(Throwable t) { - logger.warn("exception: {} job: {}", t.getMessage(), job.getId()); + logger.warn("exception: {} job: {}", t.getMessage(), jobGroup.getId()); this.errorStats.handleException(t); return Mono.empty(); // Ignore } diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java index 6b33c3b..24730a1 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java @@ -29,7 +29,8 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.filter.Filter; -import org.oran.dmaapadapter.repository.Job; +import org.oran.dmaapadapter.repository.Job.Parameters.KafkaDeliveryInfo; +import org.oran.dmaapadapter.repository.Jobs.JobGroup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,25 +49,25 @@ public class KafkaJobDataDistributor extends JobDataDistributor { private KafkaSender sender; - public KafkaJobDataDistributor(Job job, ApplicationConfig appConfig) { - super(job, appConfig); + public KafkaJobDataDistributor(JobGroup jobGroup, ApplicationConfig appConfig) { + super(jobGroup, appConfig); - SenderOptions senderOptions = senderOptions(appConfig); + SenderOptions senderOptions = senderOptions(appConfig, jobGroup.getDeliveryInfo()); this.sender = KafkaSender.create(senderOptions); } @Override protected Mono sendToClient(Filter.FilteredData data) { - Job job = this.getJob(); - SenderRecord senderRecord = senderRecord(data, job); + + SenderRecord senderRecord = senderRecord(data, this.getJobGroup().getDeliveryInfo()); logger.trace("Sending data '{}' to Kafka topic: {}", StringUtils.truncate(data.getValueAString(), 10), - job.getParameters().getKafkaOutputTopic()); + this.getJobGroup().getDeliveryInfo()); return this.sender.send(Mono.just(senderRecord)) // - .doOnNext(n -> logger.debug("Sent data to Kafka topic: {}", job.getParameters().getKafkaOutputTopic())) // - .doOnError( - t -> logger.warn("Failed to send to Kafka, job: {}, reason: {}", job.getId(), t.getMessage())) // + .doOnNext(n -> logger.debug("Sent data to Kafka topic: {}", this.getJobGroup().getDeliveryInfo())) // + .doOnError(t -> logger.warn("Failed to send to Kafka, job: {}, reason: {}", this.getJobGroup().getId(), + t.getMessage())) // .onErrorResume(t -> Mono.empty()) // .collectList() // .map(x -> "ok"); @@ -82,8 +83,13 @@ public class KafkaJobDataDistributor extends JobDataDistributor { } } - private static SenderOptions senderOptions(ApplicationConfig config) { - String bootstrapServers = config.getKafkaBootStrapServers(); + private static SenderOptions senderOptions(ApplicationConfig config, + KafkaDeliveryInfo deliveryInfo) { + + String bootstrapServers = deliveryInfo.getBootStrapServers(); + if (bootstrapServers == null || bootstrapServers.isEmpty()) { + bootstrapServers = config.getKafkaBootStrapServers(); + } Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); @@ -93,10 +99,11 @@ public class KafkaJobDataDistributor extends JobDataDistributor { return SenderOptions.create(props); } - private SenderRecord senderRecord(Filter.FilteredData output, Job infoJob) { + private SenderRecord senderRecord(Filter.FilteredData output, + KafkaDeliveryInfo deliveryInfo) { int correlationMetadata = 2; - String topic = infoJob.getParameters().getKafkaOutputTopic(); - var producerRecord = new ProducerRecord<>(topic, null, null, output.key, output.value, output.headers()); + var producerRecord = + new ProducerRecord<>(deliveryInfo.getTopic(), null, null, output.key, output.value, output.headers()); return SenderRecord.create(producerRecord, correlationMetadata); } diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java index 1a20c68..021678d 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java @@ -85,7 +85,7 @@ public class KafkaTopicListener implements TopicListener { .doFinally( sig -> logger.error("KafkaTopicListener stopped, type: {}, reason: {}", this.type.getId(), sig)) // .filter(t -> t.value().length > 0 || t.key().length > 0) // - .map(input -> new DataFromTopic(input.key(), input.value(), DataFromTopic.findZipped(input.headers()))) // + .map(input -> new DataFromTopic(this.type.getId(), input.headers(), input.key(), input.value())) // .flatMap(data -> getDataFromFileIfNewPmFileEvent(data, type, dataStore)) // .publish() // .autoConnect(1); @@ -126,7 +126,7 @@ public class KafkaTopicListener implements TopicListener { logger.trace("Reading PM measurements, type: {}, inputTopic: {}", type.getId(), type.getKafkaInputTopic()); return fileStore.readObject(DataStore.Bucket.FILES, ev.getFilename()) // .map(bytes -> unzip(bytes, ev.getFilename())) // - .map(bytes -> new DataFromTopic(data.key, bytes, false)); + .map(bytes -> new DataFromTopic(data.infoTypeId, data.headers, data.key, bytes)); } catch (Exception e) { return Mono.just(data); diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java index 8b1afc8..debc783 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java @@ -34,7 +34,10 @@ public interface TopicListener { public static class DataFromTopic { public final byte[] key; public final byte[] value; - public final boolean isZipped; + + public final String infoTypeId; + + public final Iterable
headers; private static byte[] noBytes = new byte[0]; @@ -43,29 +46,43 @@ public interface TopicListener { @ToString.Exclude private PmReport cachedPmReport; - public DataFromTopic(byte[] key, byte[] value, boolean isZipped) { + public DataFromTopic(String typeId, Iterable
headers, byte[] key, byte[] value) { this.key = key == null ? noBytes : key; this.value = value == null ? noBytes : value; - this.isZipped = isZipped; + this.infoTypeId = typeId; + this.headers = headers; } public String valueAsString() { return new String(this.value); } - public static final String ZIP_PROPERTY = "gzip"; + public static final String ZIPPED_PROPERTY = "gzip"; + public static final String TYPE_ID_PROPERTY = "type-id"; - public static boolean findZipped(Iterable
headers) { + public boolean isZipped() { if (headers == null) { return false; } for (Header h : headers) { - if (h.key().equals(ZIP_PROPERTY)) { + if (h.key().equals(ZIPPED_PROPERTY)) { return true; } } return false; } + + public String getTypeIdFromHeaders() { + if (headers == null) { + return ""; + } + for (Header h : headers) { + if (h.key().equals(TYPE_ID_PROPERTY)) { + return new String(h.value()); + } + } + return ""; + } } public Flux getFlux(); diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java index a3e3703..38035c6 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java @@ -25,13 +25,12 @@ import java.util.Map; import lombok.Getter; -import org.apache.logging.log4j.util.Strings; import org.oran.dmaapadapter.clients.SecurityContext; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.repository.InfoType; import org.oran.dmaapadapter.repository.InfoTypes; -import org.oran.dmaapadapter.repository.Job; import org.oran.dmaapadapter.repository.Jobs; +import org.oran.dmaapadapter.repository.Jobs.JobGroup; import org.oran.dmaapadapter.repository.MultiMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,52 +69,52 @@ public class TopicListeners { jobs.addObserver(new Jobs.Observer() { @Override - public void onJobbAdded(Job job) { - addJob(job); + public void onJobbGroupAdded(JobGroup jobGroup) { + addJob(jobGroup); } @Override - public void onJobRemoved(Job job) { - removeJob(job); + public void onJobGroupRemoved(JobGroup jobGroup) { + removeDistributor(jobGroup); } }); } - public synchronized void addJob(Job job) { - removeJob(job); - logger.debug("Job added {}", job.getId()); - if (job.getType().isKafkaTopicDefined()) { - addConsumer(job, dataDistributors, kafkaTopicListeners); + public synchronized void addJob(JobGroup jobGroup) { + removeDistributor(jobGroup); + logger.debug("Job added {}", jobGroup.getId()); + if (jobGroup.getType().isKafkaTopicDefined()) { + addDistributor(jobGroup, dataDistributors, kafkaTopicListeners); } - if (job.getType().isDmaapTopicDefined()) { - addConsumer(job, dataDistributors, dmaapTopicListeners); + if (jobGroup.getType().isDmaapTopicDefined()) { + addDistributor(jobGroup, dataDistributors, dmaapTopicListeners); } } - private JobDataDistributor createConsumer(Job job) { - return !Strings.isEmpty(job.getParameters().getKafkaOutputTopic()) ? new KafkaJobDataDistributor(job, appConfig) - : new HttpJobDataDistributor(job, appConfig); + private JobDataDistributor createDistributor(JobGroup jobGroup) { + return jobGroup.getDeliveryInfo() != null ? new KafkaJobDataDistributor(jobGroup, appConfig) + : new HttpJobDataDistributor(jobGroup, appConfig); } - private void addConsumer(Job job, MultiMap distributors, + private void addDistributor(JobGroup jobGroup, MultiMap distributors, Map topicListeners) { - TopicListener topicListener = topicListeners.get(job.getType().getId()); - JobDataDistributor distributor = createConsumer(job); + TopicListener topicListener = topicListeners.get(jobGroup.getType().getId()); + JobDataDistributor distributor = createDistributor(jobGroup); distributor.start(topicListener.getFlux()); - distributors.put(job.getType().getId(), job.getId(), distributor); + distributors.put(jobGroup.getType().getId(), jobGroup.getId(), distributor); } - public synchronized void removeJob(Job job) { - removeJob(job, dataDistributors); + public synchronized void removeDistributor(JobGroup jobGroup) { + removeDistributor(jobGroup, dataDistributors); } - private static void removeJob(Job job, MultiMap distributors) { - JobDataDistributor distributor = distributors.remove(job.getType().getId(), job.getId()); + private static void removeDistributor(JobGroup jobGroup, MultiMap distributors) { + JobDataDistributor distributor = distributors.remove(jobGroup.getType().getId(), jobGroup.getId()); if (distributor != null) { - logger.debug("Job removed {}", job.getId()); + logger.debug("Job removed {}", jobGroup.getId()); distributor.stop(); } } diff --git a/src/main/resources/typeSchema.json b/src/main/resources/typeSchema.json index be2829f..d5b20fc 100644 --- a/src/main/resources/typeSchema.json +++ b/src/main/resources/typeSchema.json @@ -17,8 +17,19 @@ "type": "integer", "minimum": 1 }, - "kafkaOutputTopic" : { - "type": "string" + "deliveryInfo": { + "type": "object", + "properties": { + "topic": { + "type": "string" + }, + "bootStrapServers": { + "type": "string" + } + }, + "required": [ + "topic" + ] }, "bufferTimeout": { "type": "object", @@ -27,7 +38,7 @@ "type": "integer", "minimum": 1 }, - "maxTimeMiliseconds": { + "maxTimeMilliseconds": { "type": "integer", "minimum": 0, "maximum": 160000 @@ -36,9 +47,9 @@ "additionalProperties": false, "required": [ "maxSize", - "maxTimeMiliseconds" + "maxTimeMilliseconds" ] } }, "additionalProperties": false -} +} \ No newline at end of file diff --git a/src/main/resources/typeSchemaPmData.json b/src/main/resources/typeSchemaPmData.json index df3f723..2e14ede 100644 --- a/src/main/resources/typeSchemaPmData.json +++ b/src/main/resources/typeSchemaPmData.json @@ -23,19 +23,27 @@ } ] }, - "measObjClass": { + "measTypeSpecs": { "type": "array", "items": [ { - "type": "string" - } - ] - }, - "measTypes": { - "type": "array", - "items": [ - { - "type": "string" + "type": "object", + "properties": { + "measuredObjClass": { + "type": "string" + }, + "measTypes": { + "type": "array", + "items": [ + { + "type": "string" + } + ] + } + }, + "required": [ + "measuredObjClass" + ] } ] }, @@ -61,8 +69,24 @@ "pmdata" ] }, - "kafkaOutputTopic": { - "type": "string" + "deliveryInfo": { + "type": "object", + "additionalProperties": false, + "properties": { + "topic": { + "type": "string" + }, + "bootStrapServers": { + "type": "string" + } + }, + "required": [ + "topic" + ] } - } + }, + "required": [ + "filter", + "filterType" + ] } \ No newline at end of file diff --git a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java index 10779d0..da64e0c 100644 --- a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java +++ b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java @@ -40,7 +40,9 @@ import java.util.Map; import org.json.JSONObject; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; import org.oran.dmaapadapter.clients.AsyncRestClient; import org.oran.dmaapadapter.clients.AsyncRestClientFactory; import org.oran.dmaapadapter.clients.SecurityContext; @@ -75,11 +77,13 @@ import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.test.context.TestPropertySource; +import org.springframework.web.reactive.function.client.WebClientRequestException; import org.springframework.web.reactive.function.client.WebClientResponseException; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +@TestMethodOrder(MethodOrderer.MethodName.class) @SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT) @TestPropertySource(properties = { // "server.ssl.key-store=./config/keystore.jks", // @@ -115,7 +119,7 @@ class ApplicationTest { @Autowired private SecurityContext securityContext; - private com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create(); + private com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create(); @LocalServerPort int localServerHttpPort; @@ -383,7 +387,7 @@ class ApplicationTest { // Create a job with a PM filter PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); - filterData.getMeasTypes().add("succImmediateAssignProcs"); + filterData.addMeasTypes("UtranCell", "succImmediateAssignProcs"); filterData.getMeasObjInstIds().add("UtranCell=Gbg-997"); filterData.getSourceNames().add("O-DU-1122"); filterData.getMeasuredEntityDns().add("ManagedElement=RNC-Gbg-1"); @@ -559,7 +563,7 @@ class ApplicationTest { // Create a job with a PM filter PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); - filterData.getMeasTypes().add("succImmediateAssignProcs"); + filterData.addMeasTypes("ManagedElement", "succImmediateAssignProcs"); Job.Parameters param = Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE).build(); @@ -601,7 +605,27 @@ class ApplicationTest { String stats = restClient().get(targetUri).block(); assertThat(stats).contains(JOB_ID, "DmaapInformationType"); + } + + @Test + @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests. + void testZZActuator() throws Exception { + // The test must be run last, hence the "ZZ" in the name. All succeeding tests + // will fail. + AsyncRestClient client = restClient(); + client.post("/actuator/loggers/org.oran.dmaapadapter", "{\"configuredLevel\":\"trace\"}").block(); + String resp = client.get("/actuator/loggers/org.oran.dmaapadapter").block(); + assertThat(resp).contains("TRACE"); + client.post("/actuator/loggers/org.springframework.boot.actuate", "{\"configuredLevel\":\"trace\"}").block(); + // This will stop the web server and all coming tests will fail. + client.post("/actuator/shutdown", "").block(); + Thread.sleep(1000); + String url = "https://localhost:" + applicationConfig.getLocalServerHttpPort() + "/v3/api-docs"; + StepVerifier.create(restClient().get(url)) // Any call + .expectSubscription() // + .expectErrorMatches(t -> t instanceof WebClientRequestException) // + .verify(); } public static void testErrorCode(Mono request, HttpStatus expStatus, String responseContains) { diff --git a/src/test/java/org/oran/dmaapadapter/ConsumerController.java b/src/test/java/org/oran/dmaapadapter/ConsumerController.java index a62ae7d..f1b0f21 100644 --- a/src/test/java/org/oran/dmaapadapter/ConsumerController.java +++ b/src/test/java/org/oran/dmaapadapter/ConsumerController.java @@ -54,9 +54,9 @@ public class ConsumerController { public static class TestResults { - public List receivedBodies = Collections.synchronizedList(new ArrayList()); + public final List receivedBodies = Collections.synchronizedList(new ArrayList()); - public List> receivedHeaders = + public final List> receivedHeaders = Collections.synchronizedList(new ArrayList>()); public TestResults() {} diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java index 5f7a886..4146212 100644 --- a/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java +++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java @@ -27,10 +27,6 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.JsonParser; -import java.nio.charset.Charset; -import java.nio.file.Files; -import java.nio.file.Path; - import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.oran.dmaapadapter.clients.AsyncRestClient; @@ -39,6 +35,7 @@ import org.oran.dmaapadapter.clients.SecurityContext; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.configuration.WebClientConfig; import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig; +import org.oran.dmaapadapter.filter.PmReportFilter; import org.oran.dmaapadapter.r1.ConsumerJobInfo; import org.oran.dmaapadapter.repository.Job; import org.oran.dmaapadapter.repository.Jobs; @@ -275,38 +272,37 @@ class IntegrationWithIcs { ConsumerController.TestResults results = this.consumerController.testResults; await().untilAsserted(() -> assertThat(results.receivedBodies).hasSize(2)); assertThat(results.receivedBodies.get(0)).isEqualTo("DmaapResponse1"); - assertThat(results.receivedHeaders.get(0)).containsEntry("content-type", "text/plain;charset=UTF-8"); deleteInformationJobInIcs(DMAAP_JOB_ID); await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); } + private String pmJobParameters() { + PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); + + filterData.addMeasTypes("UtranCell", "succImmediateAssignProcs"); + filterData.getMeasObjInstIds().add("UtranCell=Gbg-997"); + filterData.getSourceNames().add("O-DU-1122"); + filterData.getMeasuredEntityDns().add("ManagedElement=RNC-Gbg-1"); + Job.Parameters param = + Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE).build(); + + return gson.toJson(param); + } + @Test void testPmFilter() throws Exception { await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue()); final String TYPE_ID = "PmDataOverRest"; - String jsonStr = - reQuote("{ 'filterType' : 'pmdata', 'filter': { 'measTypes': [ 'succImmediateAssignProcs' ] } }"); + String jsonStr = pmJobParameters(); ConsumerJobInfo jobInfo = new ConsumerJobInfo(TYPE_ID, jsonObject(jsonStr), "owner", consumerUri(), ""); createInformationJobInIcs(DMAAP_JOB_ID, jobInfo); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); - String path = "./src/test/resources/pm_report.json"; - String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset()); - DmaapSimulatorController.addPmResponse(pmReportJson); - - ConsumerController.TestResults results = this.consumerController.testResults; - await().untilAsserted(() -> assertThat(results.receivedBodies).hasSize(1)); - - String filtered = results.receivedBodies.get(0); - assertThat(filtered).contains("succImmediateAssignProcs").doesNotContain("attTCHSeizures"); - - logger.info(filtered); - deleteInformationJobInIcs(DMAAP_JOB_ID); await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); } diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java index 7e806e2..12bec80 100644 --- a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java +++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java @@ -203,16 +203,17 @@ class IntegrationWithKafka { } private TopicListener.DataFromTopic unzip(TopicListener.DataFromTopic receivedKafkaOutput) { - if (this.applicationConfig.isZipOutput() != receivedKafkaOutput.isZipped) { - logger.error("********* ERROR received zipped: {}, exp zipped: {}", receivedKafkaOutput.isZipped, + if (this.applicationConfig.isZipOutput() != receivedKafkaOutput.isZipped()) { + logger.error("********* ERROR received zipped: {}, exp zipped: {}", receivedKafkaOutput.isZipped(), this.applicationConfig.isZipOutput()); } - if (!receivedKafkaOutput.isZipped) { + + if (!receivedKafkaOutput.isZipped()) { return receivedKafkaOutput; } try { byte[] unzipped = KafkaTopicListener.unzip(receivedKafkaOutput.value); - return new TopicListener.DataFromTopic(unzipped, receivedKafkaOutput.key, false); + return new TopicListener.DataFromTopic("typeId", null, unzipped, receivedKafkaOutput.key); } catch (IOException e) { logger.error("********* ERROR ", e.getMessage()); return null; @@ -222,7 +223,10 @@ class IntegrationWithKafka { private void set(TopicListener.DataFromTopic receivedKafkaOutput) { this.receivedKafkaOutput = receivedKafkaOutput; this.count++; - logger.debug("*** received data on topic: {}", OUTPUT_TOPIC); + if (logger.isDebugEnabled()) { + logger.debug("*** received data on topic: {}", OUTPUT_TOPIC); + logger.debug("*** received typeId: {}", receivedKafkaOutput.getTypeIdFromHeaders()); + } } synchronized String lastKey() { @@ -234,7 +238,7 @@ class IntegrationWithKafka { } void reset() { - this.receivedKafkaOutput = new TopicListener.DataFromTopic(null, null, false); + this.receivedKafkaOutput = new TopicListener.DataFromTopic("", null, null, null); this.count = 0; } } @@ -303,8 +307,8 @@ class IntegrationWithKafka { return "https://localhost:" + this.applicationConfig.getLocalServerHttpPort(); } - private static Object jobParametersAsJsonObject(String filter, long maxTimeMiliseconds, int maxSize) { - Job.BufferTimeout buffer = maxSize > 0 ? new Job.BufferTimeout(maxSize, maxTimeMiliseconds) : null; + private static Object jobParametersAsJsonObject(String filter, long maxTimeMilliseconds, int maxSize) { + Job.BufferTimeout buffer = maxSize > 0 ? new Job.BufferTimeout(maxSize, maxTimeMilliseconds) : null; Job.Parameters param = Job.Parameters.builder().filter(filter).filterType(Job.Parameters.REGEXP_TYPE) .bufferTimeout(buffer).build(); @@ -332,8 +336,14 @@ class IntegrationWithKafka { ConsumerJobInfo consumerJobInfoKafka(String topic, PmReportFilter.FilterData filterData) { try { - Job.Parameters param = Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE) - .kafkaOutputTopic(topic).build(); + Job.Parameters.KafkaDeliveryInfo deliveryInfo = Job.Parameters.KafkaDeliveryInfo.builder() // + .topic(topic) // + .bootStrapServers(this.applicationConfig.getKafkaBootStrapServers()) // + .build(); + Job.Parameters param = Job.Parameters.builder() // + .filter(filterData) // + .filterType(Job.Parameters.PM_FILTER_TYPE).deliveryInfo(deliveryInfo) // + .build(); String str = gson.toJson(param); Object parametersObj = jsonObject(str); @@ -344,9 +354,13 @@ class IntegrationWithKafka { } } - ConsumerJobInfo consumerJobInfoKafka(String topic) { + private ConsumerJobInfo consumerJobInfoKafka(String topic) { try { - Job.Parameters param = Job.Parameters.builder().kafkaOutputTopic(topic).build(); + Job.Parameters.KafkaDeliveryInfo deliveryInfo = Job.Parameters.KafkaDeliveryInfo.builder() // + .topic(topic) // + .bootStrapServers(this.applicationConfig.getKafkaBootStrapServers()) // + .build(); + Job.Parameters param = Job.Parameters.builder().deliveryInfo(deliveryInfo).build(); String str = gson.toJson(param); Object parametersObj = jsonObject(str); @@ -494,9 +508,11 @@ class IntegrationWithKafka { assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); // Create two jobs. One buffering and one with a filter - this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 10), JOB_ID1, restClient()); this.icsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0), JOB_ID2, restClient()); + this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 10), JOB_ID1, restClient()); + await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2)); + waitForKafkaListener(); var dataToSend = Flux.range(1, 3).map(i -> kafkaSenderRecord("Message_" + i, "", KAFKA_TYPE_ID)); // Message_1, @@ -573,8 +589,8 @@ class IntegrationWithKafka { assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); - filterData.getMeasTypes().add("pmCounterNumber0"); - filterData.getMeasObjClass().add("NRCellCU"); + + filterData.addMeasTypes("NRCellCU", "pmCounterNumber0"); this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver.OUTPUT_TOPIC, filterData), JOB_ID, restClient()); @@ -622,11 +638,10 @@ class IntegrationWithKafka { assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); - final int NO_OF_COUNTERS = 5; + final int NO_OF_COUNTERS = 2; for (int i = 0; i < NO_OF_COUNTERS; ++i) { - filterData.getMeasTypes().add("pmCounterNumber" + i); + filterData.addMeasTypes("NRCellCU", "pmCounterNumber" + i); } - filterData.getMeasObjClass().add("NRCellCU"); final int NO_OF_JOBS = 150; @@ -678,34 +693,31 @@ class IntegrationWithKafka { @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests. @Test - void kafkaCharacteristics_onePmJobs_sharedTopic() throws Exception { + void kafkaCharacteristics_manyPmJobs_sharedTopic() throws Exception { // Filter PM reports and sent to many jobs over Kafka - this.applicationConfig.setZipOutput(true); + this.applicationConfig.setZipOutput(false); // Register producer, Register types await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); - PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); - final int NO_OF_COUNTERS = 0; - for (int i = 0; i < NO_OF_COUNTERS; ++i) { - filterData.getMeasTypes().add("pmCounterNumber" + i); - } - filterData.getMeasObjClass().add("NRCellCU"); + final int NO_OF_JOBS = 150; + ArrayList receivers = new ArrayList<>(); + for (int i = 0; i < NO_OF_JOBS; ++i) { + final String outputTopic = "kafkaCharacteristics_onePmJobs_manyReceivers"; + final String jobId = "manyJobs_" + i; + PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); + filterData.addMeasTypes("NRCellCU", "pmCounterNumber" + i); // all counters will be added - final String outputTopic = "kafkaCharacteristics_onePmJobs_manyReceivers"; - this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData), outputTopic, restClient()); + this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData), jobId, restClient()); - final int NO_OF_RECEIVERS = 150; - ArrayList receivers = new ArrayList<>(); - for (int i = 0; i < NO_OF_RECEIVERS; ++i) { KafkaReceiver receiver = new KafkaReceiver(this.applicationConfig, outputTopic, this.securityContext, "group_" + i); receivers.add(receiver); } - await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); + await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(NO_OF_JOBS)); waitForKafkaListener(); final int NO_OF_OBJECTS = 1000; diff --git a/src/test/java/org/oran/dmaapadapter/filter/JsltFilterTest.java b/src/test/java/org/oran/dmaapadapter/filter/JsltFilterTest.java index 6fa7ce8..7ad5a6c 100644 --- a/src/test/java/org/oran/dmaapadapter/filter/JsltFilterTest.java +++ b/src/test/java/org/oran/dmaapadapter/filter/JsltFilterTest.java @@ -33,7 +33,7 @@ import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic; class JsltFilterTest { private String filterReport(JsltFilter filter) throws Exception { - DataFromTopic data = new DataFromTopic(null, loadReport().getBytes(), false); + DataFromTopic data = new DataFromTopic("type", null, null, loadReport().getBytes()); FilteredData filtered = filter.filter(data); return filtered.getValueAString(); } diff --git a/src/test/java/org/oran/dmaapadapter/filter/JsonPathFilterTest.java b/src/test/java/org/oran/dmaapadapter/filter/JsonPathFilterTest.java index 6e75757..2f5b4b1 100644 --- a/src/test/java/org/oran/dmaapadapter/filter/JsonPathFilterTest.java +++ b/src/test/java/org/oran/dmaapadapter/filter/JsonPathFilterTest.java @@ -36,7 +36,7 @@ class JsonPathFilterTest { void testJsonPath() throws Exception { String exp = ("$.event.perf3gppFields.measDataCollection.measInfoList[0].measTypes.sMeasTypesList[0]"); JsonPathFilter filter = new JsonPathFilter(exp); - DataFromTopic data = new DataFromTopic(null, loadReport().getBytes(), false); + DataFromTopic data = new DataFromTopic("typeId", null, null, loadReport().getBytes()); FilteredData filtered = filter.filter(data); String res = filtered.getValueAString(); assertThat(res).isEqualTo("\"attTCHSeizures\""); diff --git a/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java b/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java index 2f1f3a6..5f3f69c 100644 --- a/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java +++ b/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java @@ -99,7 +99,8 @@ class PmReportFilterTest { private String filterReport(PmReportFilter filter) throws Exception { - TopicListener.DataFromTopic data = new TopicListener.DataFromTopic(null, loadReport().getBytes(), false); + TopicListener.DataFromTopic data = + new TopicListener.DataFromTopic("typeId", null, null, loadReport().getBytes()); FilteredData filtered = filter.filter(data); String reportAfterFilter = gson.toJson(data.getCachedPmReport()); @@ -114,7 +115,7 @@ class PmReportFilterTest { void testPmFilterMeasTypes() throws Exception { PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); - filterData.measTypes.add("succImmediateAssignProcs"); + filterData.addMeasTypes("UtranCell", "succImmediateAssignProcs"); PmReportFilter filter = new PmReportFilter(filterData); String filtered = filterReport(filter); @@ -124,7 +125,8 @@ class PmReportFilterTest { // Test that no report is returned if not meas types were found filterData = new PmReportFilter.FilterData(); - filterData.measTypes.add("junk"); + filterData.addMeasTypes("junk", "succImmediateAssignProcs"); + filter = new PmReportFilter(filterData); filtered = filterReport(filter); assertThat(filtered).isEmpty(); @@ -149,22 +151,23 @@ class PmReportFilterTest { void testMeasObjClass() throws Exception { { PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); - filterData.measObjClass.add("junk"); + filterData.addMeasTypes("junk"); PmReportFilter filter = new PmReportFilter(filterData); String filtered = filterReport(filter); assertThat(filtered).isEmpty(); } { - TopicListener.DataFromTopic data = new TopicListener.DataFromTopic(null, loadReport().getBytes(), false); + TopicListener.DataFromTopic data = + new TopicListener.DataFromTopic("typeId", null, null, loadReport().getBytes()); PmReportFilter.FilterData utranCellFilter = new PmReportFilter.FilterData(); - utranCellFilter.measObjClass.add("UtranCell"); + utranCellFilter.addMeasTypes("UtranCell"); FilteredData filtered = new PmReportFilter(utranCellFilter).filter(data); assertThat(filtered.getValueAString()).contains("UtranCell").doesNotContain("ENodeBFunction"); PmReportFilter.FilterData eNodeBFilter = new PmReportFilter.FilterData(); - eNodeBFilter.measObjClass.add("ENodeBFunction"); + eNodeBFilter.addMeasTypes("ENodeBFunction"); filtered = new PmReportFilter(eNodeBFilter).filter(data); assertThat(filtered.getValueAString()).contains("ENodeBFunction").doesNotContain("UtranCell"); } @@ -210,10 +213,9 @@ class PmReportFilterTest { { PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); - filterData.getMeasTypes().add("pmCounterNumber0"); - filterData.getMeasObjClass().add("NRCellCU"); + filterData.addMeasTypes("NRCellCU", "pmCounterNumber0"); PmReportFilter filter = new PmReportFilter(filterData); - DataFromTopic topicData = new DataFromTopic(null, pmReportJson.getBytes(), false); + DataFromTopic topicData = new DataFromTopic("typeId", null, null, pmReportJson.getBytes()); Instant startTime = Instant.now(); for (int i = 0; i < TIMES; ++i) { @@ -266,10 +268,11 @@ class PmReportFilterTest { PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); PmReportFilter filter = new PmReportFilter(filterData); - FilteredData filtered = filter.filter(new TopicListener.DataFromTopic(null, "junk".getBytes(), false)); + FilteredData filtered = filter.filter(new TopicListener.DataFromTopic("typeId", null, null, "junk".getBytes())); assertThat(filtered.isEmpty()).isTrue(); - filtered = filter.filter(new TopicListener.DataFromTopic(null, reQuote("{'msg': 'test'}").getBytes(), false)); + filtered = filter + .filter(new TopicListener.DataFromTopic("typeId", null, null, reQuote("{'msg': 'test'}").getBytes())); assertThat(filtered.isEmpty()).isTrue(); } diff --git a/src/test/resources/pm_report.json b/src/test/resources/pm_report.json index 1aa97c1..402ffea 100644 --- a/src/test/resources/pm_report.json +++ b/src/test/resources/pm_report.json @@ -2,16 +2,11 @@ "event": { "commonEventHeader": { "domain": "perf3gpp", - "eventId": "9efa1210-f285-455f-9c6a-3a659b1f1882", - "sequence": 0, "eventName": "perf3gpp_gnb-Ericsson_pmMeasResult", "sourceName": "O-DU-1122", "reportingEntityName": "", - "priority": "Normal", "startEpochMicrosec": 951912000000, "lastEpochMicrosec": 951912900000, - "version": "4.0", - "vesEventListenerVersion": "7.1", "timeZoneOffset": "+00:00" }, "perf3gppFields": {