The filter syntax is updated so that measTypes are structured below measuredObjClass.
The job definition is updated so that the Kafka output (deliveryInfo) contains both the topic and the Kafka bootstrap servers.
Graceful shutdown of the DMaaP Adapter is added.
Example of PM definition after the change:
{
   "filter":{
      "sourceNames":[
         "NodeA", "NodeB"
      ],
      "measObjInstIds":[
      ],
      "measTypeSpecs":[
         {
            "measuredObjClass":"NRCellCU",
            "measTypes":[
               "pmCounterNumber0"
            ]
         }
      ],
      "measuredEntityDns":[
      ]
   },
   "deliveryInfo":{
      "topic":"outputTopic",
      "bootStrapServers":"localhost:9092"
   }
}
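For reference, a minimal sketch of building the same job parameters in Java, using the
builders introduced by this change (names taken from the code changes below; surrounding
setup is assumed):

    PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
    filterData.getSourceNames().add("NodeA");
    filterData.getSourceNames().add("NodeB");
    filterData.addMeasTypes("NRCellCU", "pmCounterNumber0");

    Job.Parameters.KafkaDeliveryInfo deliveryInfo = Job.Parameters.KafkaDeliveryInfo.builder() //
            .topic("outputTopic") //
            .bootStrapServers("localhost:9092") //
            .build();

    Job.Parameters param = Job.Parameters.builder() //
            .filter(filterData) //
            .filterType(Job.Parameters.PM_FILTER_TYPE) //
            .deliveryInfo(deliveryInfo) //
            .build();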
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-831
Change-Id: Ifbdeb676fce22eaff3c6e35eb50c76a661ea27fa
}},
"tags": ["Actuator"]
}},
+ "/actuator/shutdown": {"post": {
+ "summary": "Actuator web endpoint 'shutdown'",
+ "operationId": "shutdown_2",
+ "responses": {"200": {
+ "description": "OK",
+ "content": {"*/*": {"schema": {"type": "object"}}}
+ }},
+ "tags": ["Actuator"]
+ }},
"/data-producer/v1/info-producers/{infoProducerId}": {
"get": {
"operationId": "getInfoProducer",
'*/*':
schema:
type: object
+ /actuator/shutdown:
+ post:
+ tags:
+ - Actuator
+ summary: Actuator web endpoint 'shutdown'
+ operationId: shutdown_2
+ responses:
+ 200:
+ description: OK
+ content:
+ '*/*':
+ schema:
+ type: object
/data-producer/v1/info-producers/{infoProducerId}:
get:
tags:
web:
exposure:
# Enabling of springboot actuator features. See springboot documentation.
- include: "loggers,logfile,health,info,metrics,threaddump,heapdump"
+ include: "loggers,logfile,health,info,metrics,threaddump,heapdump,shutdown"
+ endpoint:
+ shutdown:
+ enabled: true
+lifecycle:
+ timeout-per-shutdown-phase: "20s"
springdoc:
show-actuator: true
logging:
server:
# Configuration of the HTTP/REST server. The parameters are defined and handled by the springboot framework.
# See springboot documentation.
- port : 8435
- http-port: 8084
- ssl:
- key-store-type: JKS
- key-store-password: policy_agent
- key-store: /opt/app/dmaap-adapter-service/etc/cert/keystore.jks
- key-password: policy_agent
- key-alias: policy_agent
+ port : 8435
+ http-port: 8084
+ ssl:
+ key-store-type: JKS
+ key-store-password: policy_agent
+ key-store: /opt/app/dmaap-adapter-service/etc/cert/keystore.jks
+ key-password: policy_agent
+ key-alias: policy_agent
+ shutdown: "graceful"
app:
webclient:
# Configuration of the trust store used for the HTTP client (outgoing requests)
===============
This schema will by default be registered for the type. The following properties are defined:
-* kafkaOutputTopic, optional parameter which enables the information job to output data to a Kafka topic instead of a direct call to a data consumer. The output of a job can be directed to HTTP or to Kafka regardless if the input is retrieved from DMaaP or from Kafka.
+* deliveryInfo, optional parameter which enables the information job to output data to a Kafka topic instead of a direct call to a data consumer. The output of a job can be directed to HTTP or to Kafka, regardless of whether the input is retrieved from DMaaP or from Kafka. It consists of the following properties:
+
+ * topic, the name of the Kafka topic
+ * bootStrapServers, optional reference to the Kafka bus to use. If omitted, the default configured Kafka bus is used (configured in the application.yaml file).
* filterType, selects the type of filtering that will be done. This can be one of: "regexp", "json-path", "jslt".
* sourceNames an array of source names for wanted PM reports.
* measObjInstIds an array of meas object instances for wanted PM reports. A measObjInstId matches if it contains any of the given filter values (partial matching).
  For instance, a value like "NRCellCU" will match "ManagedElement=seliitdus00487,GNBCUCPFunction=1,NRCellCU=32".
-* measTypes selects the meas types to get
+* measTypeSpecs selects the meas types to get. This consists of:
+
+ * measuredObjClass, the class of the measObjInstId to match. The measObjInstId must follow the 3GPP naming conventions for Managed Objects (3GPP TS 32.106-8).
+   For example, for the distinguished name "ManagedElement=RNC-Gbg-1,ENodeBFunction=1", the MO class is "ENodeBFunction" (a sketch of this extraction follows after this list).
+ * measTypes, the names of the measurement types (counters) belonging to the measuredObjClass.
* measuredEntityDns partial match of meas entity DNs.
-* measObjClass matching of the class of the measObjInstId. The measObjInstId must follow the 3GPP naming conventions for Managed Objects (3GPP TS 32.106-8).
-  Example, for a distinguished name "ManagedElement=RNC-Gbg-1,ENodeBFunction=1", the MO class will be "ENodeBFunction".
* pmRopStartTime gives a possibility to query already collected PM files. The start time is the time from which the information shall be returned.
  In this case, the query is only done for files from the given "sourceNames".
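A minimal sketch of the MO class extraction referred to above (assumed; only the final
substring statement of this helper appears in the code changes further down):

    static String managedObjectClass(String distinguishedName) {
        int lastRdn = distinguishedName.lastIndexOf(',');        // start of the last RDN
        int lastEqualChar = distinguishedName.lastIndexOf('=');  // separator before the value
        return distinguishedName.substring(lastRdn + 1, lastEqualChar);
    }
    // managedObjectClass("ManagedElement=RNC-Gbg-1,ENodeBFunction=1") returns "ENodeBFunction"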
"measObjInstIds":[
"UtranCell=dGbg-997"
],
- "measTypes":[
- "succImmediateAssignProcs"
- ],
+ "measTypeSpecs":[
+ {
+ "measuredObjClass":"UtranCell",
+ "measTypes":[
+ "succImmediateAssignProcs"
+ ]
+ }
+ ],
"measuredEntityDns":[
"ManagedElement=RNC-Gbg-1"
],
- "measObjClass":[
- "UtranCell"
- ]
"pmRopStartTime" : "2022-12-13T10:50:44.000-08:00"
}
}
"sourceNames":[
"O-DU-1122", "O-DU-1123"
],
- "measTypes":[
- "succImmediateAssignProcs", "attTCHSeizures"
- ],
- "measObjClass":[
- "UtranCell"
- ]
+ "measTypeSpecs":[
+ {
+ "measuredObjClass":"NRCellCU",
+ "measTypes":[
+ "pmCounterNumber0", "pmCounterNumber1"
+ ]
+ }
+ ],
+
}
}
public static void main(String[] args) {
applicationContext = SpringApplication.run(Application.class);
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ logger.warn("Shutting down, received signal SIGTERM");
+ SpringApplication.exit(applicationContext);
+ applicationContext = null;
+ }
+ });
}
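Illustrative only (not part of this change): together with shutdown: "graceful" and the 20s
timeout-per-shutdown-phase in application.yaml, the exit above lets in-flight requests complete
before the application context closes. A hypothetical bean could observe this phase:

    import javax.annotation.PreDestroy;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Component;

    @Component
    class ShutdownLogger {
        private static final Logger logger = LoggerFactory.getLogger(ShutdownLogger.class);

        @PreDestroy // runs while the application context is being closed
        void onShutdown() {
            logger.info("Graceful shutdown in progress");
        }
    }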
@Scheduled(fixedRate = 10 * 1000)
.map(this::toBody);
}
+ public Mono<ResponseEntity<String>> postForEntity(String uri, @Nullable String body) {
+ Mono<String> bodyProducer = body != null ? Mono.just(body) : Mono.empty();
+
+ RequestHeadersSpec<?> request = getWebClient() //
+ .post() //
+ .uri(uri) //
+ .contentType(MediaType.APPLICATION_JSON) //
+ .body(bodyProducer, String.class);
+ return retrieve(request);
+ }
+
+ public Mono<String> post(String uri, @Nullable String body) {
+ return postForEntity(uri, body) //
+ .map(this::toBody);
+ }
+
private Mono<ResponseEntity<String>> retrieve(RequestHeadersSpec<?> request) {
if (securityContext.isConfigured()) {
request.headers(h -> h.setBearerAuth(securityContext.getBearerAuthToken()));
}
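A usage sketch of the new post helpers (the restClient() helper is assumed, as in the tests
further down):

    AsyncRestClient client = restClient();
    client.post("/actuator/loggers/org.oran.dmaapadapter", "{\"configuredLevel\":\"trace\"}").block();
    client.post("/actuator/shutdown", "").block(); // an empty (or null) body is accepted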
public Collection<InfoType> getTypes() {
- com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
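+ // disableHtmlEscaping prevents Gson from escaping characters such as '=' (common in
+ // 3GPP distinguished names) as \u003d.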
+ com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
try {
String configJson = Files.readString(Path.of(getLocalConfigurationFilePath()), Charset.defaultCharset());
ConfigFile configData = gson.fromJson(configJson, ConfigFile.class);
@Getter
public static class FilterData {
+
+ public static class MeasTypeSpec {
+ static MeasTypeSpec empty = new MeasTypeSpec();
+
+ static MeasTypeSpec empty() {
+ return empty;
+ }
+
+ @Getter
+ String measuredObjClass;
+
+ @Getter
+ final Set<String> measTypes = new HashSet<>();
+
+ @Override
+ public boolean equals(Object obj) {
+ return (obj instanceof MeasTypeSpec) && measuredObjClass.equals(((MeasTypeSpec) obj).measuredObjClass);
+ }
+
+ @Override
+ public int hashCode() {
+ return measuredObjClass.hashCode();
+ }
+ }
+
final Set<String> sourceNames = new HashSet<>();
final Set<String> measObjInstIds = new HashSet<>();
- final Set<String> measTypes = new HashSet<>();
+ final Collection<MeasTypeSpec> measTypeSpecs = new ArrayList<>();
final Set<String> measuredEntityDns = new HashSet<>();
- final Set<String> measObjClass = new HashSet<>();
+
+ public void addMeasTypes(String measObjClass, String... measTypes) {
+ MeasTypeSpec spec = this.findMeasTypeSpec(measObjClass);
+ if (spec == null) {
+ spec = new MeasTypeSpec();
+ spec.measuredObjClass = measObjClass;
+ this.measTypeSpecs.add(spec);
+ }
+ for (String measType : measTypes) {
+ spec.measTypes.add(measType);
+ }
+ }
+
+ public void addMeasTypes(String measObjClass, Collection<String> measTypes) {
+ for (String measType : measTypes) {
+ addMeasTypes(measObjClass, measType);
+ }
+ }
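+
+ // For illustration: addMeasTypes("NRCellCU", "pmCounterNumber0", "pmCounterNumber1") creates
+ // one MeasTypeSpec for class "NRCellCU" holding both counters; calling it again with the
+ // same class merges the new measTypes into the existing spec.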
@Setter
String pmRopStartTime;
public void addAll(FilterData other) {
addAll(other.sourceNames, sourceNames);
addAll(other.measObjInstIds, measObjInstIds);
- addAll(other.measTypes, measTypes);
+ addAll(other.measTypeSpecs);
addAll(other.measuredEntityDns, measuredEntityDns);
- addAll(other.measObjClass, measObjClass);
+ }
+
+ public MeasTypeSpec getMeasTypeSpec(String measuredObjClass) {
+ if (measTypeSpecs.isEmpty()) {
+ return MeasTypeSpec.empty();
+ }
+ return findMeasTypeSpec(measuredObjClass);
+ }
+
+ private MeasTypeSpec findMeasTypeSpec(String measuredObjClass) {
+ for (MeasTypeSpec t : this.measTypeSpecs) {
+ if (t.measuredObjClass.equals(measuredObjClass)) {
+ return t;
+ }
+ }
+ return null;
+ }
+
+ private void addAll(Collection<MeasTypeSpec> measTypes) {
+ for (MeasTypeSpec s : measTypes) {
+ addMeasTypes(s.getMeasuredObjClass(), s.getMeasTypes());
+ }
}
private void addAll(Set<String> source, Set<String> dst) {
return false;
}
- private boolean isMeasResultMatch(MeasResult measResult, MeasTypes measTypes, FilterData filter) {
+ private boolean isMeasResultMatch(MeasResult measResult, MeasTypes measTypes,
+ FilterData.MeasTypeSpec measTypesSpec) {
String measType = measTypes.getMeasType(measResult.getP());
- return filter.measTypes.isEmpty() || filter.measTypes.contains(measType);
+ return measTypesSpec.measTypes.isEmpty() || measTypesSpec.measTypes.contains(measType);
}
private Collection<MeasResult> createMeasResults(Collection<MeasResult> oldMeasResults, MeasTypes measTypes,
- FilterData filter) {
+ FilterData.MeasTypeSpec measTypesSpec) {
Collection<MeasResult> newMeasResults = new ArrayList<>();
for (MeasResult measResult : oldMeasResults) {
- if (isMeasResultMatch(measResult, measTypes, filter)) {
+ if (isMeasResultMatch(measResult, measTypes, measTypesSpec)) {
newMeasResults.add(measResult.toBuilder().build());
}
}
return distinguishedName.substring(lastRdn + 1, lastEqualChar);
}
- private boolean isMeasInstClassMatch(String measObjInstId, FilterData filter) {
- if (filter.measObjClass.isEmpty()) {
- return true;
- }
-
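+ // Returns null when the MO class of the instance id is not selected by the filter; the
+ // shared "empty" spec (used when no measTypeSpecs are configured) matches all meas types.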
+ private FilterData.MeasTypeSpec getMeasTypeSpec(String measObjInstId, FilterData filter) {
String measObjClass = managedObjectClass(measObjInstId);
- return filter.measObjClass.contains(measObjClass);
+ return filter.getMeasTypeSpec(measObjClass);
}
private MeasValuesList createMeasValuesList(MeasValuesList oldMeasValues, MeasTypes measTypes, FilterData filter) {
+ FilterData.MeasTypeSpec measTypesSpec = getMeasTypeSpec(oldMeasValues.getMeasObjInstId(), filter);
+ if (measTypesSpec == null) {
+ return MeasValuesList.empty();
+ }
- if (isMeasInstIdMatch(oldMeasValues.getMeasObjInstId(), filter)
- && isMeasInstClassMatch(oldMeasValues.getMeasObjInstId(), filter)) {
-
- Collection<MeasResult> newResults = createMeasResults(oldMeasValues.getMeasResults(), measTypes, filter);
- return oldMeasValues.toBuilder() //
- .measResults(newResults) //
- .build();
- } else {
+ if (!isMeasInstIdMatch(oldMeasValues.getMeasObjInstId(), filter)) {
return MeasValuesList.empty();
}
+
+ Collection<MeasResult> newResults = createMeasResults(oldMeasValues.getMeasResults(), measTypes, measTypesSpec);
+ return oldMeasValues.toBuilder() //
+ .measResults(newResults) //
+ .build();
}
private MeasTypes createMeasTypes(Collection<MeasValuesList> newMeasValues, MeasTypes oldMMeasTypes) {
import java.time.Duration;
import lombok.Builder;
+import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.filter.Filter;
import org.oran.dmaapadapter.filter.FilterFactory;
+import org.oran.dmaapadapter.repository.Job.Parameters.KafkaDeliveryInfo;
import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Schema(name = "job_statistics", description = "Statistics information for one job")
public static class Statistics {
- // @Schema(name = "jobId", description = "jobId", required = true)
- // @SerializedName("jobId")
@JsonProperty(value = "jobId", required = true)
String jobId;
@Getter
private BufferTimeout bufferTimeout;
+ @Builder
+ @EqualsAndHashCode
+ public static class KafkaDeliveryInfo {
+ @Getter
+ private String topic;
+
+ @Getter
+ private String bootStrapServers;
+ }
+
@Getter
- private String kafkaOutputTopic;
+ private KafkaDeliveryInfo deliveryInfo;
public Filter.Type getFilterType() {
if (filter == null || filterType == null) {
.groupId(type.getKafkaGroupId()) //
.inputTopic(type.getKafkaInputTopic()) //
.jobId(id) //
- .outputTopic(parameters.getKafkaOutputTopic()) //
+ .outputTopic(parameters.getDeliveryInfo() == null ? "" : parameters.getDeliveryInfo().topic) //
.typeId(type.getId()) //
.clientId(type.getKafkaClientId(appConfig)) //
.build();
import org.oran.dmaapadapter.filter.FilterFactory;
import org.oran.dmaapadapter.filter.PmReportFilter;
import org.oran.dmaapadapter.repository.Job.Parameters;
+import org.oran.dmaapadapter.repository.Job.Parameters.KafkaDeliveryInfo;
import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public Iterable<Job> getJobs();
- public String getTopic();
+ public KafkaDeliveryInfo getDeliveryInfo();
}
public static class JobGroupSingle implements JobGroup {
}
@Override
- public String getTopic() {
- return this.job.getParameters().getKafkaOutputTopic();
+ public KafkaDeliveryInfo getDeliveryInfo() {
+ return this.job.getParameters().getDeliveryInfo();
}
}
public static class JobGroupPm implements JobGroup {
@Getter
- private final String topic;
+ private final KafkaDeliveryInfo deliveryInfo;
private Map<String, Job> jobs = new HashMap<>();
@Getter
private final InfoType type;
- public JobGroupPm(InfoType type, String topic) {
- this.topic = topic;
+ public JobGroupPm(InfoType type, KafkaDeliveryInfo deliveryInfo) {
+ this.deliveryInfo = deliveryInfo;
this.type = type;
}
@Override
public String getId() {
- return topic;
+ return deliveryInfo.getTopic();
}
@Override
private Map<String, Job> allJobs = new HashMap<>();
private MultiMap<Job> jobsByType = new MultiMap<>();
- private Map<String, JobGroup> jobGroups = new HashMap<>();
+ private Map<String, JobGroup> jobGroups = new HashMap<>(); // Key is Topic or JobId
private final AsyncRestClientFactory restclientFactory;
private final List<Observer> observers = new ArrayList<>();
private final ApplicationConfig appConfig;
public void addJob(String id, String callbackUrl, InfoType type, String owner, String lastUpdated,
Parameters parameters) throws ServiceException {
- if (!Strings.isNullOrEmpty(parameters.getKafkaOutputTopic()) && !Strings.isNullOrEmpty(callbackUrl)) {
+ if ((parameters.getDeliveryInfo() != null) && !Strings.isNullOrEmpty(callbackUrl)) {
throw new ServiceException("Cannot deliver to both Kafka and HTTP in the same job", HttpStatus.BAD_REQUEST);
}
AsyncRestClient consumerRestClient = type.isUseHttpProxy() //
}
private String jobGroupId(Job job) {
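+ // PM jobs that deliver to Kafka are grouped per output topic, so that filtering and
+ // delivery can be shared; all other jobs form their own group, keyed by jobId.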
- if (Strings.isNullOrEmpty(job.getParameters().getKafkaOutputTopic())) {
+ if (job.getParameters().getDeliveryInfo() == null) {
return job.getId();
} else if (job.getParameters().getFilterType() == Filter.Type.PM_DATA) {
- return job.getParameters().getKafkaOutputTopic();
+ return job.getParameters().getDeliveryInfo().getTopic();
} else {
return job.getId();
}
jobsByType.put(job.getType().getId(), job.getId(), job);
if (job.getParameters().getFilterType() == Filter.Type.PM_DATA
- && job.getParameters().getKafkaOutputTopic() != null) {
- String topic = job.getParameters().getKafkaOutputTopic();
- if (!this.jobGroups.containsKey(topic)) {
- final JobGroupPm group = new JobGroupPm(job.getType(), topic);
- this.jobGroups.put(topic, group);
+ && job.getParameters().getDeliveryInfo() != null) {
+ String jobGroupId = jobGroupId(job);
+ if (!this.jobGroups.containsKey(jobGroupId)) {
+ final JobGroupPm group = new JobGroupPm(job.getType(), job.getParameters().getDeliveryInfo());
+ this.jobGroups.put(jobGroupId, group);
group.add(job);
this.observers.forEach(obs -> obs.onJobbGroupAdded(group));
} else {
- JobGroupPm group = (JobGroupPm) this.jobGroups.get(topic);
+ JobGroupPm group = (JobGroupPm) this.jobGroups.get(jobGroupId);
group.add(job);
}
} else {
JobGroupSingle group = new JobGroupSingle(job);
- this.jobGroups.put(job.getId(), group);
+ this.jobGroups.put(jobGroupId(job), group);
this.observers.forEach(obs -> obs.onJobbGroupAdded(group));
}
}
}
public void remove(Job job) {
- String groupId = this.jobGroupId(job);
+ String groupId = jobGroupId(job);
JobGroup group = this.jobGroups.get(groupId);
synchronized (this) {
this.allJobs.remove(job.getId());
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.filter.Filter;
+import org.oran.dmaapadapter.repository.Job.Parameters.KafkaDeliveryInfo;
import org.oran.dmaapadapter.repository.Jobs.JobGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public KafkaJobDataDistributor(JobGroup jobGroup, ApplicationConfig appConfig) {
super(jobGroup, appConfig);
- SenderOptions<byte[], byte[]> senderOptions = senderOptions(appConfig);
+ SenderOptions<byte[], byte[]> senderOptions = senderOptions(appConfig, jobGroup.getDeliveryInfo());
this.sender = KafkaSender.create(senderOptions);
}
@Override
protected Mono<String> sendToClient(Filter.FilteredData data) {
- SenderRecord<byte[], byte[], Integer> senderRecord = senderRecord(data, this.getJobGroup().getTopic());
+ SenderRecord<byte[], byte[], Integer> senderRecord = senderRecord(data, this.getJobGroup().getDeliveryInfo());
logger.trace("Sending data '{}' to Kafka topic: {}", StringUtils.truncate(data.getValueAString(), 10),
- this.getJobGroup().getTopic());
+ this.getJobGroup().getDeliveryInfo().getTopic());
return this.sender.send(Mono.just(senderRecord)) //
- .doOnNext(n -> logger.debug("Sent data to Kafka topic: {}", this.getJobGroup().getTopic())) //
+ .doOnNext(n -> logger.debug("Sent data to Kafka topic: {}", this.getJobGroup().getDeliveryInfo().getTopic())) //
.doOnError(t -> logger.warn("Failed to send to Kafka, job: {}, reason: {}", this.getJobGroup().getId(),
t.getMessage())) //
.onErrorResume(t -> Mono.empty()) //
}
}
- private static SenderOptions<byte[], byte[]> senderOptions(ApplicationConfig config) {
- String bootstrapServers = config.getKafkaBootStrapServers();
+ private static SenderOptions<byte[], byte[]> senderOptions(ApplicationConfig config,
+ KafkaDeliveryInfo deliveryInfo) {
+
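+ // The per-job bootstrap servers (from deliveryInfo) override the application-wide default.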
+ String bootstrapServers = deliveryInfo.getBootStrapServers();
+ if (bootstrapServers == null || bootstrapServers.isEmpty()) {
+ bootstrapServers = config.getKafkaBootStrapServers();
+ }
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return SenderOptions.create(props);
}
- private SenderRecord<byte[], byte[], Integer> senderRecord(Filter.FilteredData output, String topic) {
+ private SenderRecord<byte[], byte[], Integer> senderRecord(Filter.FilteredData output,
+ KafkaDeliveryInfo deliveryInfo) {
int correlationMetadata = 2;
- var producerRecord = new ProducerRecord<>(topic, null, null, output.key, output.value, output.headers());
+ var producerRecord =
+ new ProducerRecord<>(deliveryInfo.getTopic(), null, null, output.key, output.value, output.headers());
return SenderRecord.create(producerRecord, correlationMetadata);
}
import lombok.Getter;
-import org.apache.logging.log4j.util.Strings;
import org.oran.dmaapadapter.clients.SecurityContext;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.repository.InfoType;
}
private JobDataDistributor createDistributor(JobGroup jobGroup) {
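+ // Jobs with Kafka deliveryInfo get a Kafka distributor; all others deliver over HTTP.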
- return !Strings.isEmpty(jobGroup.getTopic()) ? new KafkaJobDataDistributor(jobGroup, appConfig)
+ return jobGroup.getDeliveryInfo() != null ? new KafkaJobDataDistributor(jobGroup, appConfig)
: new HttpJobDataDistributor(jobGroup, appConfig);
}
"type": "integer",
"minimum": 1
},
- "kafkaOutputTopic" : {
- "type": "string"
+ "deliveryInfo": {
+ "type": "object",
+ "properties": {
+ "topic": {
+ "type": "string"
+ },
+ "bootStrapServers": {
+ "type": "string"
+ }
+ },
+ "required": [
+ "topic"
+ ]
},
"bufferTimeout": {
"type": "object",
}
},
"additionalProperties": false
-}
+}
\ No newline at end of file
}
]
},
- "measObjClass": {
+ "measTypeSpecs": {
"type": "array",
"items": [
{
- "type": "string"
- }
- ]
- },
- "measTypes": {
- "type": "array",
- "items": [
- {
- "type": "string"
+ "type": "object",
+ "properties": {
+ "measuredObjClass": {
+ "type": "string"
+ },
+ "measTypes": {
+ "type": "array",
+ "items": [
+ {
+ "type": "string"
+ }
+ ]
+ }
+ },
+ "required": [
+ "measuredObjClass"
+ ]
}
]
},
"pmdata"
]
},
- "kafkaOutputTopic": {
- "type": "string"
+ "deliveryInfo": {
+ "type": "object",
+ "additionalProperties": false,
+ "properties": {
+ "topic": {
+ "type": "string"
+ },
+ "bootStrapServers": {
+ "type": "string"
+ }
+ },
+ "required": [
+ "topic"
+ ]
}
- }
+ },
+ "required": [
+ "filter",
+ "filterType"
+ ]
}
\ No newline at end of file
import org.json.JSONObject;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
import org.oran.dmaapadapter.clients.AsyncRestClient;
import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
import org.oran.dmaapadapter.clients.SecurityContext;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.test.context.TestPropertySource;
+import org.springframework.web.reactive.function.client.WebClientRequestException;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
+@TestMethodOrder(MethodOrderer.MethodName.class)
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
@TestPropertySource(properties = { //
"server.ssl.key-store=./config/keystore.jks", //
@Autowired
private SecurityContext securityContext;
- private com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
+ private com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
@LocalServerPort
int localServerHttpPort;
// Create a job with a PM filter
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
- filterData.getMeasTypes().add("succImmediateAssignProcs");
+ filterData.addMeasTypes("UtranCell", "succImmediateAssignProcs");
filterData.getMeasObjInstIds().add("UtranCell=Gbg-997");
filterData.getSourceNames().add("O-DU-1122");
filterData.getMeasuredEntityDns().add("ManagedElement=RNC-Gbg-1");
// Create a job with a PM filter
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
- filterData.getMeasTypes().add("succImmediateAssignProcs");
+ filterData.addMeasTypes("ManagedElement", "succImmediateAssignProcs");
Job.Parameters param =
Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE).build();
String stats = restClient().get(targetUri).block();
assertThat(stats).contains(JOB_ID, "DmaapInformationType");
+ }
+
+ @Test
+ @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
+ void testZZActuator() throws Exception {
+ // Tests run in method-name order; the "ZZ" prefix makes this test run last,
+ // since it shuts down the web server and any test running after it would fail.
+ AsyncRestClient client = restClient();
+ client.post("/actuator/loggers/org.oran.dmaapadapter", "{\"configuredLevel\":\"trace\"}").block();
+ String resp = client.get("/actuator/loggers/org.oran.dmaapadapter").block();
+ assertThat(resp).contains("TRACE");
+ client.post("/actuator/loggers/org.springframework.boot.actuate", "{\"configuredLevel\":\"trace\"}").block();
+ // This will stop the web server and all coming tests will fail.
+ client.post("/actuator/shutdown", "").block();
+ Thread.sleep(1000);
+ String url = "https://localhost:" + applicationConfig.getLocalServerHttpPort() + "/v3/api-docs";
+ StepVerifier.create(restClient().get(url)) // Any call
+ .expectSubscription() //
+ .expectErrorMatches(t -> t instanceof WebClientRequestException) //
+ .verify();
}
public static void testErrorCode(Mono<?> request, HttpStatus expStatus, String responseContains) {
ConsumerJobInfo consumerJobInfoKafka(String topic, PmReportFilter.FilterData filterData) {
try {
- Job.Parameters param = Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE)
- .kafkaOutputTopic(topic).build();
+ Job.Parameters.KafkaDeliveryInfo deliveryInfo = Job.Parameters.KafkaDeliveryInfo.builder() //
+ .topic(topic) //
+ .bootStrapServers(this.applicationConfig.getKafkaBootStrapServers()) //
+ .build();
+ Job.Parameters param = Job.Parameters.builder() //
+ .filter(filterData) //
+ .filterType(Job.Parameters.PM_FILTER_TYPE).deliveryInfo(deliveryInfo) //
+ .build();
String str = gson.toJson(param);
Object parametersObj = jsonObject(str);
}
}
- ConsumerJobInfo consumerJobInfoKafka(String topic) {
+ private ConsumerJobInfo consumerJobInfoKafka(String topic) {
try {
- Job.Parameters param = Job.Parameters.builder().kafkaOutputTopic(topic).build();
+ Job.Parameters.KafkaDeliveryInfo deliveryInfo = Job.Parameters.KafkaDeliveryInfo.builder() //
+ .topic(topic) //
+ .bootStrapServers(this.applicationConfig.getKafkaBootStrapServers()) //
+ .build();
+ Job.Parameters param = Job.Parameters.builder().deliveryInfo(deliveryInfo).build();
String str = gson.toJson(param);
Object parametersObj = jsonObject(str);
assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
- filterData.getMeasTypes().add("pmCounterNumber0");
- filterData.getMeasObjClass().add("NRCellCU");
+
+ filterData.addMeasTypes("NRCellCU", "pmCounterNumber0");
this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver.OUTPUT_TOPIC, filterData), JOB_ID,
restClient());
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
final int NO_OF_COUNTERS = 2;
for (int i = 0; i < NO_OF_COUNTERS; ++i) {
- filterData.getMeasTypes().add("pmCounterNumber" + i);
+ filterData.addMeasTypes("NRCellCU", "pmCounterNumber" + i);
}
- filterData.getMeasObjClass().add("NRCellCU");
final int NO_OF_JOBS = 150;
final String outputTopic = "kafkaCharacteristics_onePmJobs_manyReceivers";
final String jobId = "manyJobs_" + i;
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
- filterData.getMeasTypes().add("pmCounterNumber" + i); // all counters will be added
- filterData.getMeasObjClass().add("NRCellCU");
+ filterData.addMeasTypes("NRCellCU", "pmCounterNumber" + i); // all counters will be added
this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData), jobId, restClient());
void testPmFilterMeasTypes() throws Exception {
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
- filterData.measTypes.add("succImmediateAssignProcs");
+ filterData.addMeasTypes("UtranCell", "succImmediateAssignProcs");
PmReportFilter filter = new PmReportFilter(filterData);
String filtered = filterReport(filter);
// Test that no report is returned if no meas types were found
filterData = new PmReportFilter.FilterData();
- filterData.measTypes.add("junk");
+ filterData.addMeasTypes("junk", "succImmediateAssignProcs");
+
filter = new PmReportFilter(filterData);
filtered = filterReport(filter);
assertThat(filtered).isEmpty();
void testMeasObjClass() throws Exception {
{
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
- filterData.measObjClass.add("junk");
+ filterData.addMeasTypes("junk");
PmReportFilter filter = new PmReportFilter(filterData);
String filtered = filterReport(filter);
assertThat(filtered).isEmpty();
new TopicListener.DataFromTopic("typeId", null, null, loadReport().getBytes());
PmReportFilter.FilterData utranCellFilter = new PmReportFilter.FilterData();
- utranCellFilter.measObjClass.add("UtranCell");
+ utranCellFilter.addMeasTypes("UtranCell");
FilteredData filtered = new PmReportFilter(utranCellFilter).filter(data);
assertThat(filtered.getValueAString()).contains("UtranCell").doesNotContain("ENodeBFunction");
PmReportFilter.FilterData eNodeBFilter = new PmReportFilter.FilterData();
- eNodeBFilter.measObjClass.add("ENodeBFunction");
+ eNodeBFilter.addMeasTypes("ENodeBFunction");
filtered = new PmReportFilter(eNodeBFilter).filter(data);
assertThat(filtered.getValueAString()).contains("ENodeBFunction").doesNotContain("UtranCell");
}
{
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
- filterData.getMeasTypes().add("pmCounterNumber0");
- filterData.getMeasObjClass().add("NRCellCU");
+ filterData.addMeasTypes("NRCellCU", "pmCounterNumber0");
PmReportFilter filter = new PmReportFilter(filterData);
DataFromTopic topicData = new DataFromTopic("typeId", null, null, pmReportJson.getBytes());
"event": {
"commonEventHeader": {
"domain": "perf3gpp",
- "eventId": "9efa1210-f285-455f-9c6a-3a659b1f1882",
- "sequence": 0,
"eventName": "perf3gpp_gnb-Ericsson_pmMeasResult",
"sourceName": "O-DU-1122",
"reportingEntityName": "",
- "priority": "Normal",
"startEpochMicrosec": 951912000000,
"lastEpochMicrosec": 951912900000,
- "version": "4.0",
- "vesEventListenerVersion": "7.1",
"timeZoneOffset": "+00:00"
},
"perf3gppFields": {