From: PatrikBuhr Date: Thu, 5 Jan 2023 13:06:38 +0000 (+0100) Subject: DMaaP adapter, updated job definition syntax X-Git-Tag: 1.3.0~7 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=c32ab314a9a9b46df45e2c3d48b59b0b28a807a3;p=nonrtric%2Fplt%2Fdmaapadapter.git DMaaP adapter, updated job definition syntax The filter syntax is updated so that measTypes are structured below measuredObjClass The job definition is updated so that the output contains both topic and the kafka boostrap servers. Graceful shutdown of Dmaap Adapter Example of PM definition after the change: { "filter":{ "sourceNames":[ "NodeA", "NodeB" ], "measObjInstIds":[ ], "measTypeSpecs":[ { "measuredObjClass":"NRCellCU", "measTypes":[ "pmCounterNumber0" ] } ], "measuredEntityDns":[ ] }, "deliveryInfo":{ "topic":"ouputTopic", "bootStrapServers":"localhost:9092" } } Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-831 Change-Id: Ifbdeb676fce22eaff3c6e35eb50c76a661ea27fa --- diff --git a/api/api.json b/api/api.json index 3ec5a4d..6ba7582 100644 --- a/api/api.json +++ b/api/api.json @@ -296,6 +296,15 @@ }}, "tags": ["Actuator"] }}, + "/actuator/shutdown": {"post": { + "summary": "Actuator web endpoint 'shutdown'", + "operationId": "shutdown_2", + "responses": {"200": { + "description": "OK", + "content": {"*/*": {"schema": {"type": "object"}}} + }}, + "tags": ["Actuator"] + }}, "/data-producer/v1/info-producers/{infoProducerId}": { "get": { "operationId": "getInfoProducer", diff --git a/api/api.yaml b/api/api.yaml index b2eaf15..024b43f 100644 --- a/api/api.yaml +++ b/api/api.yaml @@ -174,6 +174,19 @@ paths: '*/*': schema: type: object + /actuator/shutdown: + post: + tags: + - Actuator + summary: Actuator web endpoint 'shutdown' + operationId: shutdown_2 + responses: + 200: + description: OK + content: + '*/*': + schema: + type: object /data-producer/v1/info-producers/{infoProducerId}: get: tags: diff --git a/config/application.yaml b/config/application.yaml index 8975849..57c6e31 100644 --- a/config/application.yaml +++ b/config/application.yaml @@ -27,7 +27,12 @@ management: web: exposure: # Enabling of springboot actuator features. See springboot documentation. - include: "loggers,logfile,health,info,metrics,threaddump,heapdump" + include: "loggers,logfile,health,info,metrics,threaddump,heapdump,shutdown" + endpoint: + shutdown: + enabled: true +lifecycle: + timeout-per-shutdown-phase: "20s" springdoc: show-actuator: true logging: @@ -48,14 +53,15 @@ logging: server: # Configuration of the HTTP/REST server. The parameters are defined and handeled by the springboot framework. # See springboot documentation. - port : 8435 - http-port: 8084 - ssl: - key-store-type: JKS - key-store-password: policy_agent - key-store: /opt/app/dmaap-adapter-service/etc/cert/keystore.jks - key-password: policy_agent - key-alias: policy_agent + port : 8435 + http-port: 8084 + ssl: + key-store-type: JKS + key-store-password: policy_agent + key-store: /opt/app/dmaap-adapter-service/etc/cert/keystore.jks + key-password: policy_agent + key-alias: policy_agent + shutdown: "graceful" app: webclient: # Configuration of the trust store used for the HTTP client (outgoing requests) diff --git a/docs/overview.rst b/docs/overview.rst index cd80ce0..aebb5cb 100644 --- a/docs/overview.rst +++ b/docs/overview.rst @@ -116,7 +116,10 @@ typeSchema.json =============== This schema will by default be registerred for the type. The following properties are defined: -* kafkaOutputTopic, optional parameter which enables the information job to output data to a Kafka topic instead of a direct call to a data consumer. The output of a job can be directed to HTTP or to Kafka regardless if the input is retrieved from DMaaP or from Kafka. +* outputTopic, optional parameter which enables the information job to output data to a Kafka topic instead of a direct call to a data consumer. The output of a job can be directed to HTTP or to Kafka regardless if the input is retrieved from DMaaP or from Kafka. This consists of the following properties: + + * topic, the name of the kafka topic + * bootStrapServers, reference to the kafka bus to used. This is optional, if this is omitted the default configured kafka bus is used (which is configured in the application.yaml file). * filterType, selects the type of filtering that will be done. This can be one of: "regexp", "json-path", "jslt". @@ -186,9 +189,11 @@ The filterType parameter is extended to allow value "pmdata" which can be used f * sourceNames an array of source names for wanted PM reports. * measObjInstIds an array of meas object instances for wanted PM reports. If a given filter value is contained in the filter definition, it will match (partial matching). For instance a value like "NRCellCU" will match "ManagedElement=seliitdus00487,GNBCUCPFunction=1,NRCellCU=32". -* measTypes selects the meas types to get +* measTypeSpecs selects the meas types to get. This consist of: + + * measObjClass matching of the class of the measObjInstId. The measObjInstId must follow the 3GPP naming conventions for Managed Objects (3GPP TS 32.106-8). + * measTypes the name of the measurement types (connected to the measObjClass). * measuredEntityDns partial match of meas entity DNs. -* measObjClass matching of the class of the measObjInstId. The measObjInstId must follow the 3GPP naming conventions for Managed Objects (3GPP TS 32.106-8). Example, for a distinguished name "ManagedElement=RNC-Gbg-1,ENodeBFunction=1", the MO class will be "ENodeBFunction". * pmRopStartTime gives a possibility to query from already collected PM files. The start file is the time from when the information shall be returned. In this case, the query is only done for files from the given "sourceNames". @@ -209,15 +214,17 @@ Below follows an example of a PM filter. "measObjInstIds":[ "UtranCell=dGbg-997" ], - "measTypes":[ - "succImmediateAssignProcs" - ], + "measTypeSpecs":[ + { + "measuredObjClass":"UtranCell", + "measTypes":[ + "succImmediateAssignProcs" + ] + } + ], "measuredEntityDns":[ "ManagedElement=RNC-Gbg-1" ], - "measObjClass":[ - "UtranCell" - ] "pmRopStartTime" : "2022-12-13T10:50:44.000-08:00" } } @@ -234,12 +241,15 @@ match two counters from all cells in two traffical nodes. "sourceNames":[ "O-DU-1122", "O-DU-1123" ], - "measTypes":[ - "succImmediateAssignProcs", "attTCHSeizures" - ], - "measObjClass":[ - "UtranCell" - ] + "measTypeSpecs":[ + { + "measuredObjClass":"NRCellCU", + "measTypes":[ + "pmCounterNumber0", "pmCounterNumber1" + ] + } + ], + } } diff --git a/src/main/java/org/oran/dmaapadapter/Application.java b/src/main/java/org/oran/dmaapadapter/Application.java index 2058202..7201d4f 100644 --- a/src/main/java/org/oran/dmaapadapter/Application.java +++ b/src/main/java/org/oran/dmaapadapter/Application.java @@ -48,6 +48,14 @@ public class Application { public static void main(String[] args) { applicationContext = SpringApplication.run(Application.class); + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + logger.warn("Shutting down, received signal SIGTERM"); + SpringApplication.exit(applicationContext); + applicationContext = null; + } + }); } @Scheduled(fixedRate = 10 * 1000) diff --git a/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java b/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java index a6e2444..3ba96ab 100644 --- a/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java +++ b/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java @@ -118,6 +118,22 @@ public class AsyncRestClient { .map(this::toBody); } + public Mono> postForEntity(String uri, @Nullable String body) { + Mono bodyProducer = body != null ? Mono.just(body) : Mono.empty(); + + RequestHeadersSpec request = getWebClient() // + .post() // + .uri(uri) // + .contentType(MediaType.APPLICATION_JSON) // + .body(bodyProducer, String.class); + return retrieve(request); + } + + public Mono post(String uri, @Nullable String body) { + return postForEntity(uri, body) // + .map(this::toBody); + } + private Mono> retrieve(RequestHeadersSpec request) { if (securityContext.isConfigured()) { request.headers(h -> h.setBearerAuth(securityContext.getBearerAuthToken())); diff --git a/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java b/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java index b53383a..d7561da 100644 --- a/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java +++ b/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java @@ -162,7 +162,7 @@ public class ApplicationConfig { } public Collection getTypes() { - com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create(); + com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create(); try { String configJson = Files.readString(Path.of(getLocalConfigurationFilePath()), Charset.defaultCharset()); ConfigFile configData = gson.fromJson(configJson, ConfigFile.class); diff --git a/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java b/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java index 04834c4..db1a018 100644 --- a/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java +++ b/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java @@ -62,11 +62,53 @@ public class PmReportFilter implements Filter { @Getter public static class FilterData { + + public static class MeasTypeSpec { + static MeasTypeSpec empty = new MeasTypeSpec(); + + static MeasTypeSpec empty() { + return empty; + } + + @Getter + String measuredObjClass; + + @Getter + final Set measTypes = new HashSet<>(); + + @Override + public boolean equals(Object obj) { + return measuredObjClass.equals(obj); + } + + @Override + public int hashCode() { + return measuredObjClass.hashCode(); + } + } + final Set sourceNames = new HashSet<>(); final Set measObjInstIds = new HashSet<>(); - final Set measTypes = new HashSet<>(); + final Collection measTypeSpecs = new ArrayList<>(); final Set measuredEntityDns = new HashSet<>(); - final Set measObjClass = new HashSet<>(); + + public void addMeasTypes(String measObjClass, String... measTypes) { + MeasTypeSpec spec = this.findMeasTypeSpec(measObjClass); + if (spec == null) { + spec = new MeasTypeSpec(); + spec.measuredObjClass = measObjClass; + this.measTypeSpecs.add(spec); + } + for (String measType : measTypes) { + spec.measTypes.add(measType); + } + } + + public void addMeasTypes(String measObjClass, Collection measTypes) { + for (String measType : measTypes) { + addMeasTypes(measObjClass, measType); + } + } @Setter String pmRopStartTime; @@ -77,9 +119,30 @@ public class PmReportFilter implements Filter { public void addAll(FilterData other) { addAll(other.sourceNames, sourceNames); addAll(other.measObjInstIds, measObjInstIds); - addAll(other.measTypes, measTypes); + addAll(other.measTypeSpecs); addAll(other.measuredEntityDns, measuredEntityDns); - addAll(other.measObjClass, measObjClass); + } + + public MeasTypeSpec getMeasTypeSpec(String measuredObjClass) { + if (measTypeSpecs.isEmpty()) { + return MeasTypeSpec.empty(); + } + return findMeasTypeSpec(measuredObjClass); + } + + private MeasTypeSpec findMeasTypeSpec(String measuredObjClass) { + for (MeasTypeSpec t : this.measTypeSpecs) { + if (t.measuredObjClass.equals(measuredObjClass)) { + return t; + } + } + return null; + } + + private void addAll(Collection measTypes) { + for (MeasTypeSpec s : measTypes) { + addMeasTypes(s.getMeasuredObjClass(), s.getMeasTypes()); + } } private void addAll(Set source, Set dst) { @@ -185,17 +248,18 @@ public class PmReportFilter implements Filter { return false; } - private boolean isMeasResultMatch(MeasResult measResult, MeasTypes measTypes, FilterData filter) { + private boolean isMeasResultMatch(MeasResult measResult, MeasTypes measTypes, + FilterData.MeasTypeSpec measTypesSpec) { String measType = measTypes.getMeasType(measResult.getP()); - return filter.measTypes.isEmpty() || filter.measTypes.contains(measType); + return measTypesSpec.measTypes.isEmpty() || measTypesSpec.measTypes.contains(measType); } private Collection createMeasResults(Collection oldMeasResults, MeasTypes measTypes, - FilterData filter) { + FilterData.MeasTypeSpec measTypesSpec) { Collection newMeasResults = new ArrayList<>(); for (MeasResult measResult : oldMeasResults) { - if (isMeasResultMatch(measResult, measTypes, filter)) { + if (isMeasResultMatch(measResult, measTypes, measTypesSpec)) { newMeasResults.add(measResult.toBuilder().build()); } } @@ -218,27 +282,25 @@ public class PmReportFilter implements Filter { return distinguishedName.substring(lastRdn + 1, lastEqualChar); } - private boolean isMeasInstClassMatch(String measObjInstId, FilterData filter) { - if (filter.measObjClass.isEmpty()) { - return true; - } - + private FilterData.MeasTypeSpec getMeasTypeSpec(String measObjInstId, FilterData filter) { String measObjClass = managedObjectClass(measObjInstId); - return filter.measObjClass.contains(measObjClass); + return filter.getMeasTypeSpec(measObjClass); } private MeasValuesList createMeasValuesList(MeasValuesList oldMeasValues, MeasTypes measTypes, FilterData filter) { + FilterData.MeasTypeSpec measTypesSpec = getMeasTypeSpec(oldMeasValues.getMeasObjInstId(), filter); + if (measTypesSpec == null) { + return MeasValuesList.empty(); + } - if (isMeasInstIdMatch(oldMeasValues.getMeasObjInstId(), filter) - && isMeasInstClassMatch(oldMeasValues.getMeasObjInstId(), filter)) { - - Collection newResults = createMeasResults(oldMeasValues.getMeasResults(), measTypes, filter); - return oldMeasValues.toBuilder() // - .measResults(newResults) // - .build(); - } else { + if (!isMeasInstIdMatch(oldMeasValues.getMeasObjInstId(), filter)) { return MeasValuesList.empty(); } + + Collection newResults = createMeasResults(oldMeasValues.getMeasResults(), measTypes, measTypesSpec); + return oldMeasValues.toBuilder() // + .measResults(newResults) // + .build(); } private MeasTypes createMeasTypes(Collection newMeasValues, MeasTypes oldMMeasTypes) { diff --git a/src/main/java/org/oran/dmaapadapter/repository/Job.java b/src/main/java/org/oran/dmaapadapter/repository/Job.java index 8695524..cda194e 100644 --- a/src/main/java/org/oran/dmaapadapter/repository/Job.java +++ b/src/main/java/org/oran/dmaapadapter/repository/Job.java @@ -28,6 +28,7 @@ import java.lang.invoke.MethodHandles; import java.time.Duration; import lombok.Builder; +import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Setter; import lombok.ToString; @@ -36,6 +37,7 @@ import org.oran.dmaapadapter.clients.AsyncRestClient; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.filter.Filter; import org.oran.dmaapadapter.filter.FilterFactory; +import org.oran.dmaapadapter.repository.Job.Parameters.KafkaDeliveryInfo; import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,8 +51,6 @@ public class Job { @Schema(name = "job_statistics", description = "Statistics information for one job") public static class Statistics { - // @Schema(name = "jobId", description = "jobId", required = true) - // @SerializedName("jobId") @JsonProperty(value = "jobId", required = true) String jobId; @@ -115,8 +115,18 @@ public class Job { @Getter private BufferTimeout bufferTimeout; + @Builder + @EqualsAndHashCode + public static class KafkaDeliveryInfo { + @Getter + private String topic; + + @Getter + private String bootStrapServers; + } + @Getter - private String kafkaOutputTopic; + private KafkaDeliveryInfo deliveryInfo; public Filter.Type getFilterType() { if (filter == null || filterType == null) { @@ -197,7 +207,7 @@ public class Job { .groupId(type.getKafkaGroupId()) // .inputTopic(type.getKafkaInputTopic()) // .jobId(id) // - .outputTopic(parameters.getKafkaOutputTopic()) // + .outputTopic(parameters.getDeliveryInfo() == null ? "" : parameters.getDeliveryInfo().topic) // .typeId(type.getId()) // .clientId(type.getKafkaClientId(appConfig)) // .build(); diff --git a/src/main/java/org/oran/dmaapadapter/repository/Jobs.java b/src/main/java/org/oran/dmaapadapter/repository/Jobs.java index f236bbe..9a0559c 100644 --- a/src/main/java/org/oran/dmaapadapter/repository/Jobs.java +++ b/src/main/java/org/oran/dmaapadapter/repository/Jobs.java @@ -40,6 +40,7 @@ import org.oran.dmaapadapter.filter.Filter; import org.oran.dmaapadapter.filter.FilterFactory; import org.oran.dmaapadapter.filter.PmReportFilter; import org.oran.dmaapadapter.repository.Job.Parameters; +import org.oran.dmaapadapter.repository.Job.Parameters.KafkaDeliveryInfo; import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,7 +69,7 @@ public class Jobs { public Iterable getJobs(); - public String getTopic(); + public KafkaDeliveryInfo getDeliveryInfo(); } public static class JobGroupSingle implements JobGroup { @@ -113,14 +114,14 @@ public class Jobs { } @Override - public String getTopic() { - return this.job.getParameters().getKafkaOutputTopic(); + public KafkaDeliveryInfo getDeliveryInfo() { + return this.job.getParameters().getDeliveryInfo(); } } public static class JobGroupPm implements JobGroup { @Getter - private final String topic; + private final KafkaDeliveryInfo deliveryInfo; private Map jobs = new HashMap<>(); @@ -130,8 +131,8 @@ public class Jobs { @Getter private final InfoType type; - public JobGroupPm(InfoType type, String topic) { - this.topic = topic; + public JobGroupPm(InfoType type, KafkaDeliveryInfo topic) { + this.deliveryInfo = topic; this.type = type; } @@ -171,7 +172,7 @@ public class Jobs { @Override public String getId() { - return topic; + return deliveryInfo.getTopic(); } @Override @@ -184,7 +185,7 @@ public class Jobs { private Map allJobs = new HashMap<>(); private MultiMap jobsByType = new MultiMap<>(); - private Map jobGroups = new HashMap<>(); + private Map jobGroups = new HashMap<>(); // Key is Topic or JobId private final AsyncRestClientFactory restclientFactory; private final List observers = new ArrayList<>(); private final ApplicationConfig appConfig; @@ -210,7 +211,7 @@ public class Jobs { public void addJob(String id, String callbackUrl, InfoType type, String owner, String lastUpdated, Parameters parameters) throws ServiceException { - if (!Strings.isNullOrEmpty(parameters.getKafkaOutputTopic()) && !Strings.isNullOrEmpty(callbackUrl)) { + if ((parameters.getDeliveryInfo() != null) && !Strings.isNullOrEmpty(callbackUrl)) { throw new ServiceException("Cannot deliver to both Kafka and HTTP in the same job", HttpStatus.BAD_REQUEST); } AsyncRestClient consumerRestClient = type.isUseHttpProxy() // @@ -227,10 +228,10 @@ public class Jobs { } private String jobGroupId(Job job) { - if (Strings.isNullOrEmpty(job.getParameters().getKafkaOutputTopic())) { + if (job.getParameters().getDeliveryInfo() == null) { return job.getId(); } else if (job.getParameters().getFilterType() == Filter.Type.PM_DATA) { - return job.getParameters().getKafkaOutputTopic(); + return job.getParameters().getDeliveryInfo().getTopic(); } else { return job.getId(); } @@ -244,20 +245,20 @@ public class Jobs { jobsByType.put(job.getType().getId(), job.getId(), job); if (job.getParameters().getFilterType() == Filter.Type.PM_DATA - && job.getParameters().getKafkaOutputTopic() != null) { - String topic = job.getParameters().getKafkaOutputTopic(); - if (!this.jobGroups.containsKey(topic)) { - final JobGroupPm group = new JobGroupPm(job.getType(), topic); - this.jobGroups.put(topic, group); + && job.getParameters().getDeliveryInfo() != null) { + String jobGroupId = jobGroupId(job); + if (!this.jobGroups.containsKey(jobGroupId)) { + final JobGroupPm group = new JobGroupPm(job.getType(), job.getParameters().getDeliveryInfo()); + this.jobGroups.put(jobGroupId, group); group.add(job); this.observers.forEach(obs -> obs.onJobbGroupAdded(group)); } else { - JobGroupPm group = (JobGroupPm) this.jobGroups.get(topic); + JobGroupPm group = (JobGroupPm) this.jobGroups.get(jobGroupId); group.add(job); } } else { JobGroupSingle group = new JobGroupSingle(job); - this.jobGroups.put(job.getId(), group); + this.jobGroups.put(jobGroupId(job), group); this.observers.forEach(obs -> obs.onJobbGroupAdded(group)); } } @@ -275,7 +276,7 @@ public class Jobs { } public void remove(Job job) { - String groupId = this.jobGroupId(job); + String groupId = jobGroupId(job); JobGroup group = this.jobGroups.get(groupId); synchronized (this) { this.allJobs.remove(job.getId()); diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java index c0b1d16..24730a1 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java @@ -29,6 +29,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.filter.Filter; +import org.oran.dmaapadapter.repository.Job.Parameters.KafkaDeliveryInfo; import org.oran.dmaapadapter.repository.Jobs.JobGroup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,20 +52,20 @@ public class KafkaJobDataDistributor extends JobDataDistributor { public KafkaJobDataDistributor(JobGroup jobGroup, ApplicationConfig appConfig) { super(jobGroup, appConfig); - SenderOptions senderOptions = senderOptions(appConfig); + SenderOptions senderOptions = senderOptions(appConfig, jobGroup.getDeliveryInfo()); this.sender = KafkaSender.create(senderOptions); } @Override protected Mono sendToClient(Filter.FilteredData data) { - SenderRecord senderRecord = senderRecord(data, this.getJobGroup().getTopic()); + SenderRecord senderRecord = senderRecord(data, this.getJobGroup().getDeliveryInfo()); logger.trace("Sending data '{}' to Kafka topic: {}", StringUtils.truncate(data.getValueAString(), 10), - this.getJobGroup().getTopic()); + this.getJobGroup().getDeliveryInfo()); return this.sender.send(Mono.just(senderRecord)) // - .doOnNext(n -> logger.debug("Sent data to Kafka topic: {}", this.getJobGroup().getTopic())) // + .doOnNext(n -> logger.debug("Sent data to Kafka topic: {}", this.getJobGroup().getDeliveryInfo())) // .doOnError(t -> logger.warn("Failed to send to Kafka, job: {}, reason: {}", this.getJobGroup().getId(), t.getMessage())) // .onErrorResume(t -> Mono.empty()) // @@ -82,8 +83,13 @@ public class KafkaJobDataDistributor extends JobDataDistributor { } } - private static SenderOptions senderOptions(ApplicationConfig config) { - String bootstrapServers = config.getKafkaBootStrapServers(); + private static SenderOptions senderOptions(ApplicationConfig config, + KafkaDeliveryInfo deliveryInfo) { + + String bootstrapServers = deliveryInfo.getBootStrapServers(); + if (bootstrapServers == null || bootstrapServers.isEmpty()) { + bootstrapServers = config.getKafkaBootStrapServers(); + } Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); @@ -93,9 +99,11 @@ public class KafkaJobDataDistributor extends JobDataDistributor { return SenderOptions.create(props); } - private SenderRecord senderRecord(Filter.FilteredData output, String topic) { + private SenderRecord senderRecord(Filter.FilteredData output, + KafkaDeliveryInfo deliveryInfo) { int correlationMetadata = 2; - var producerRecord = new ProducerRecord<>(topic, null, null, output.key, output.value, output.headers()); + var producerRecord = + new ProducerRecord<>(deliveryInfo.getTopic(), null, null, output.key, output.value, output.headers()); return SenderRecord.create(producerRecord, correlationMetadata); } diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java index 56180d7..38035c6 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java @@ -25,7 +25,6 @@ import java.util.Map; import lombok.Getter; -import org.apache.logging.log4j.util.Strings; import org.oran.dmaapadapter.clients.SecurityContext; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.repository.InfoType; @@ -94,7 +93,7 @@ public class TopicListeners { } private JobDataDistributor createDistributor(JobGroup jobGroup) { - return !Strings.isEmpty(jobGroup.getTopic()) ? new KafkaJobDataDistributor(jobGroup, appConfig) + return jobGroup.getDeliveryInfo() != null ? new KafkaJobDataDistributor(jobGroup, appConfig) : new HttpJobDataDistributor(jobGroup, appConfig); } diff --git a/src/main/resources/typeSchema.json b/src/main/resources/typeSchema.json index be2829f..c57d066 100644 --- a/src/main/resources/typeSchema.json +++ b/src/main/resources/typeSchema.json @@ -17,8 +17,19 @@ "type": "integer", "minimum": 1 }, - "kafkaOutputTopic" : { - "type": "string" + "deliveryInfo": { + "type": "object", + "properties": { + "topic": { + "type": "string" + }, + "bootStrapServers": { + "type": "string" + } + }, + "required": [ + "topic" + ] }, "bufferTimeout": { "type": "object", @@ -41,4 +52,4 @@ } }, "additionalProperties": false -} +} \ No newline at end of file diff --git a/src/main/resources/typeSchemaPmData.json b/src/main/resources/typeSchemaPmData.json index df3f723..2e14ede 100644 --- a/src/main/resources/typeSchemaPmData.json +++ b/src/main/resources/typeSchemaPmData.json @@ -23,19 +23,27 @@ } ] }, - "measObjClass": { + "measTypeSpecs": { "type": "array", "items": [ { - "type": "string" - } - ] - }, - "measTypes": { - "type": "array", - "items": [ - { - "type": "string" + "type": "object", + "properties": { + "measuredObjClass": { + "type": "string" + }, + "measTypes": { + "type": "array", + "items": [ + { + "type": "string" + } + ] + } + }, + "required": [ + "measuredObjClass" + ] } ] }, @@ -61,8 +69,24 @@ "pmdata" ] }, - "kafkaOutputTopic": { - "type": "string" + "deliveryInfo": { + "type": "object", + "additionalProperties": false, + "properties": { + "topic": { + "type": "string" + }, + "bootStrapServers": { + "type": "string" + } + }, + "required": [ + "topic" + ] } - } + }, + "required": [ + "filter", + "filterType" + ] } \ No newline at end of file diff --git a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java index 10779d0..da64e0c 100644 --- a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java +++ b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java @@ -40,7 +40,9 @@ import java.util.Map; import org.json.JSONObject; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; import org.oran.dmaapadapter.clients.AsyncRestClient; import org.oran.dmaapadapter.clients.AsyncRestClientFactory; import org.oran.dmaapadapter.clients.SecurityContext; @@ -75,11 +77,13 @@ import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.test.context.TestPropertySource; +import org.springframework.web.reactive.function.client.WebClientRequestException; import org.springframework.web.reactive.function.client.WebClientResponseException; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +@TestMethodOrder(MethodOrderer.MethodName.class) @SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT) @TestPropertySource(properties = { // "server.ssl.key-store=./config/keystore.jks", // @@ -115,7 +119,7 @@ class ApplicationTest { @Autowired private SecurityContext securityContext; - private com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create(); + private com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create(); @LocalServerPort int localServerHttpPort; @@ -383,7 +387,7 @@ class ApplicationTest { // Create a job with a PM filter PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); - filterData.getMeasTypes().add("succImmediateAssignProcs"); + filterData.addMeasTypes("UtranCell", "succImmediateAssignProcs"); filterData.getMeasObjInstIds().add("UtranCell=Gbg-997"); filterData.getSourceNames().add("O-DU-1122"); filterData.getMeasuredEntityDns().add("ManagedElement=RNC-Gbg-1"); @@ -559,7 +563,7 @@ class ApplicationTest { // Create a job with a PM filter PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); - filterData.getMeasTypes().add("succImmediateAssignProcs"); + filterData.addMeasTypes("ManagedElement", "succImmediateAssignProcs"); Job.Parameters param = Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE).build(); @@ -601,7 +605,27 @@ class ApplicationTest { String stats = restClient().get(targetUri).block(); assertThat(stats).contains(JOB_ID, "DmaapInformationType"); + } + + @Test + @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests. + void testZZActuator() throws Exception { + // The test must be run last, hence the "ZZ" in the name. All succeeding tests + // will fail. + AsyncRestClient client = restClient(); + client.post("/actuator/loggers/org.oran.dmaapadapter", "{\"configuredLevel\":\"trace\"}").block(); + String resp = client.get("/actuator/loggers/org.oran.dmaapadapter").block(); + assertThat(resp).contains("TRACE"); + client.post("/actuator/loggers/org.springframework.boot.actuate", "{\"configuredLevel\":\"trace\"}").block(); + // This will stop the web server and all coming tests will fail. + client.post("/actuator/shutdown", "").block(); + Thread.sleep(1000); + String url = "https://localhost:" + applicationConfig.getLocalServerHttpPort() + "/v3/api-docs"; + StepVerifier.create(restClient().get(url)) // Any call + .expectSubscription() // + .expectErrorMatches(t -> t instanceof WebClientRequestException) // + .verify(); } public static void testErrorCode(Mono request, HttpStatus expStatus, String responseContains) { diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java index 7317919..01e8be6 100644 --- a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java +++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java @@ -336,8 +336,14 @@ class IntegrationWithKafka { ConsumerJobInfo consumerJobInfoKafka(String topic, PmReportFilter.FilterData filterData) { try { - Job.Parameters param = Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE) - .kafkaOutputTopic(topic).build(); + Job.Parameters.KafkaDeliveryInfo deliveryInfo = Job.Parameters.KafkaDeliveryInfo.builder() // + .topic(topic) // + .bootStrapServers(this.applicationConfig.getKafkaBootStrapServers()) // + .build(); + Job.Parameters param = Job.Parameters.builder() // + .filter(filterData) // + .filterType(Job.Parameters.PM_FILTER_TYPE).deliveryInfo(deliveryInfo) // + .build(); String str = gson.toJson(param); Object parametersObj = jsonObject(str); @@ -348,9 +354,13 @@ class IntegrationWithKafka { } } - ConsumerJobInfo consumerJobInfoKafka(String topic) { + private ConsumerJobInfo consumerJobInfoKafka(String topic) { try { - Job.Parameters param = Job.Parameters.builder().kafkaOutputTopic(topic).build(); + Job.Parameters.KafkaDeliveryInfo deliveryInfo = Job.Parameters.KafkaDeliveryInfo.builder() // + .topic(topic) // + .bootStrapServers(this.applicationConfig.getKafkaBootStrapServers()) // + .build(); + Job.Parameters param = Job.Parameters.builder().deliveryInfo(deliveryInfo).build(); String str = gson.toJson(param); Object parametersObj = jsonObject(str); @@ -579,8 +589,8 @@ class IntegrationWithKafka { assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); - filterData.getMeasTypes().add("pmCounterNumber0"); - filterData.getMeasObjClass().add("NRCellCU"); + + filterData.addMeasTypes("NRCellCU", "pmCounterNumber0"); this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver.OUTPUT_TOPIC, filterData), JOB_ID, restClient()); @@ -630,9 +640,8 @@ class IntegrationWithKafka { PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); final int NO_OF_COUNTERS = 2; for (int i = 0; i < NO_OF_COUNTERS; ++i) { - filterData.getMeasTypes().add("pmCounterNumber" + i); + filterData.addMeasTypes("NRCellCU", "pmCounterNumber" + i); } - filterData.getMeasObjClass().add("NRCellCU"); final int NO_OF_JOBS = 150; @@ -699,8 +708,7 @@ class IntegrationWithKafka { final String outputTopic = "kafkaCharacteristics_onePmJobs_manyReceivers"; final String jobId = "manyJobs_" + i; PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); - filterData.getMeasTypes().add("pmCounterNumber" + i); // all counters will be added - filterData.getMeasObjClass().add("NRCellCU"); + filterData.addMeasTypes("NRCellCU", "pmCounterNumber" + i); // all counters will be added this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData), jobId, restClient()); diff --git a/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java b/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java index bb6f145..5f3f69c 100644 --- a/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java +++ b/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java @@ -115,7 +115,7 @@ class PmReportFilterTest { void testPmFilterMeasTypes() throws Exception { PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); - filterData.measTypes.add("succImmediateAssignProcs"); + filterData.addMeasTypes("UtranCell", "succImmediateAssignProcs"); PmReportFilter filter = new PmReportFilter(filterData); String filtered = filterReport(filter); @@ -125,7 +125,8 @@ class PmReportFilterTest { // Test that no report is returned if not meas types were found filterData = new PmReportFilter.FilterData(); - filterData.measTypes.add("junk"); + filterData.addMeasTypes("junk", "succImmediateAssignProcs"); + filter = new PmReportFilter(filterData); filtered = filterReport(filter); assertThat(filtered).isEmpty(); @@ -150,7 +151,7 @@ class PmReportFilterTest { void testMeasObjClass() throws Exception { { PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); - filterData.measObjClass.add("junk"); + filterData.addMeasTypes("junk"); PmReportFilter filter = new PmReportFilter(filterData); String filtered = filterReport(filter); assertThat(filtered).isEmpty(); @@ -161,12 +162,12 @@ class PmReportFilterTest { new TopicListener.DataFromTopic("typeId", null, null, loadReport().getBytes()); PmReportFilter.FilterData utranCellFilter = new PmReportFilter.FilterData(); - utranCellFilter.measObjClass.add("UtranCell"); + utranCellFilter.addMeasTypes("UtranCell"); FilteredData filtered = new PmReportFilter(utranCellFilter).filter(data); assertThat(filtered.getValueAString()).contains("UtranCell").doesNotContain("ENodeBFunction"); PmReportFilter.FilterData eNodeBFilter = new PmReportFilter.FilterData(); - eNodeBFilter.measObjClass.add("ENodeBFunction"); + eNodeBFilter.addMeasTypes("ENodeBFunction"); filtered = new PmReportFilter(eNodeBFilter).filter(data); assertThat(filtered.getValueAString()).contains("ENodeBFunction").doesNotContain("UtranCell"); } @@ -212,8 +213,7 @@ class PmReportFilterTest { { PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); - filterData.getMeasTypes().add("pmCounterNumber0"); - filterData.getMeasObjClass().add("NRCellCU"); + filterData.addMeasTypes("NRCellCU", "pmCounterNumber0"); PmReportFilter filter = new PmReportFilter(filterData); DataFromTopic topicData = new DataFromTopic("typeId", null, null, pmReportJson.getBytes()); diff --git a/src/test/resources/pm_report.json b/src/test/resources/pm_report.json index 1aa97c1..402ffea 100644 --- a/src/test/resources/pm_report.json +++ b/src/test/resources/pm_report.json @@ -2,16 +2,11 @@ "event": { "commonEventHeader": { "domain": "perf3gpp", - "eventId": "9efa1210-f285-455f-9c6a-3a659b1f1882", - "sequence": 0, "eventName": "perf3gpp_gnb-Ericsson_pmMeasResult", "sourceName": "O-DU-1122", "reportingEntityName": "", - "priority": "Normal", "startEpochMicrosec": 951912000000, "lastEpochMicrosec": 951912900000, - "version": "4.0", - "vesEventListenerVersion": "7.1", "timeZoneOffset": "+00:00" }, "perf3gppFields": {