Added feature for getting statistics.
GET "/statistics"
Updated to latest 2.5 version of springboot.
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: Ifc0844ca20cab00d3ca99e5b58b2f56721a5e9c0
"description": "Void/empty",
"type": "object"
},
+ "job_statistics": {
+ "description": "Statistics information for one job",
+ "type": "object",
+ "required": [
+ "jobId",
+ "noOfReceivedBytes",
+ "noOfReceivedObjects",
+ "noOfSentBytes",
+ "noOfSentObjects",
+ "typeId"
+ ],
+ "properties": {
+ "noOfSentObjects": {
+ "format": "int32",
+ "type": "integer"
+ },
+ "jobId": {"type": "string"},
+ "outputTopic": {"type": "string"},
+ "noOfSentBytes": {
+ "format": "int32",
+ "type": "integer"
+ },
+ "clientId": {"type": "string"},
+ "groupId": {"type": "string"},
+ "noOfReceivedBytes": {
+ "format": "int32",
+ "type": "integer"
+ },
+ "typeId": {"type": "string"},
+ "inputTopic": {"type": "string"},
+ "noOfReceivedObjects": {
+ "format": "int32",
+ "type": "integer"
+ }
+ }
+ },
+ "statistics_info": {
+ "description": "Statistics information",
+ "type": "object",
+ "properties": {"jobStatistics": {
+ "description": "Statistics per job",
+ "type": "array",
+ "items": {"$ref": "#/components/schemas/job_statistics"}
+ }}
+ },
"producer_registration_info": {
"description": "Information for an Information Producer",
"type": "object",
}],
"tags": ["Information Coordinator Service Simulator (exists only in test)"]
}},
+ "/statistics": {"get": {
+ "summary": "Returns statistics",
+ "operationId": "getStatistics",
+ "responses": {"200": {
+ "description": "OK",
+ "content": {"application/json": {"schema": {"$ref": "#/components/schemas/statistics_info"}}}
+ }},
+ "tags": ["Producer job control API"]
+ }},
"/generic_dataproducer/health_check": {"get": {
"summary": "Producer supervision",
"description": "The endpoint is provided by the Information Producer and is used for supervision of the producer.",
application/json:
schema:
type: object
+ /statistics:
+ get:
+ tags:
+ - Producer job control API
+ summary: Returns statistics
+ operationId: getStatistics
+ responses:
+ 200:
+ description: OK
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/statistics_info'
/generic_dataproducer/health_check:
get:
tags:
void:
type: object
description: Void/empty
+ job_statistics:
+ required:
+ - jobId
+ - noOfReceivedBytes
+ - noOfReceivedObjects
+ - noOfSentBytes
+ - noOfSentObjects
+ - typeId
+ type: object
+ properties:
+ noOfSentObjects:
+ type: integer
+ format: int32
+ jobId:
+ type: string
+ outputTopic:
+ type: string
+ noOfSentBytes:
+ type: integer
+ format: int32
+ clientId:
+ type: string
+ groupId:
+ type: string
+ noOfReceivedBytes:
+ type: integer
+ format: int32
+ typeId:
+ type: string
+ inputTopic:
+ type: string
+ noOfReceivedObjects:
+ type: integer
+ format: int32
+ description: Statistics information for one job
+ statistics_info:
+ type: object
+ properties:
+ jobStatistics:
+ type: array
+ description: Statistics per job
+ items:
+ $ref: '#/components/schemas/job_statistics'
+ description: Statistics information
producer_registration_info:
required:
- info_job_callback_url
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
- <version>2.5.8</version>
+ <version>2.5.14</version>
<relativePath />
</parent>
<groupId>org.o-ran-sc.nonrtric.plt</groupId>
<java.version>11</java.version>
<springfox.version>3.0.0</springfox.version>
<gson.version>2.9.0</gson.version>
- <swagger.version>2.1.6</swagger.version>
+ <swagger.version>2.2.1</swagger.version>
<json.version>20211205</json.version>
<maven-compiler-plugin.version>3.8.0</maven-compiler-plugin.version>
<formatter-maven-plugin.version>2.12.2</formatter-maven-plugin.version>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
- <version>${gson.version}</version>
- </dependency>
- <dependency>
- <groupId>org.json</groupId>
- <artifactId>json</artifactId>
- <version>${json.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>com.squareup.okhttp3</groupId>
- <artifactId>mockwebserver</artifactId>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import org.oran.dmaapadapter.exceptions.ServiceException;
import org.oran.dmaapadapter.r1.ProducerJobInfo;
public static final String API_DESCRIPTION = "";
public static final String JOB_URL = "/generic_dataproducer/info_job";
public static final String SUPERVISION_URL = "/generic_dataproducer/health_check";
+
+ public static final String STATISTICS_URL = "/statistics";
+
private static Gson gson = new GsonBuilder().disableHtmlEscaping().create();
private final Jobs jobs;
private final InfoTypes types;
return new ResponseEntity<>(HttpStatus.OK);
}
+ @Schema(name = "statistics_info", description = "Statistics information")
+ public class Statistics {
+
+ @Schema(description = "Statistics per job")
+ public final Collection<Job.Statistics> jobStatistics;
+
+ public Statistics(Collection<Job.Statistics> stats) {
+ this.jobStatistics = stats;
+ }
+
+ }
+
+ @GetMapping(path = STATISTICS_URL, produces = MediaType.APPLICATION_JSON_VALUE)
+ @Operation(summary = "Returns statistics", description = "")
+ @ApiResponses(value = { //
+ @ApiResponse(responseCode = "200", description = "OK", //
+ content = @Content(schema = @Schema(implementation = Statistics.class))) //
+ })
+ public ResponseEntity<Object> getStatistics() {
+ List<Job.Statistics> res = new ArrayList<>();
+ for (Job job : this.jobs.getAll()) {
+ res.add(job.getStatistics());
+ }
+
+ return new ResponseEntity<>(gson.toJson(new Statistics(res)), HttpStatus.OK);
+ }
+
}
import lombok.Getter;
import lombok.ToString;
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.springframework.util.StringUtils;
@ToString
}
return DataType.OTHER;
}
+
+ public String getKafkaGroupId() {
+ return this.kafkaInputTopic == null ? null : "osc-dmaap-adapter-" + getId();
+ }
+
+ public String getKafkaClientId(ApplicationConfig appConfig) {
+ return this.kafkaInputTopic == null ? null : getId() + "_" + appConfig.getSelfUrl();
+
+ }
}
package org.oran.dmaapadapter.repository;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import io.swagger.v3.oas.annotations.media.Schema;
+
import java.lang.invoke.MethodHandles;
import java.time.Duration;
+import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.oran.dmaapadapter.clients.AsyncRestClient;
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.filter.Filter;
import org.oran.dmaapadapter.filter.FilterFactory;
import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
public class Job {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ @Builder
+ @Schema(name = "job_statistics", description = "Statistics information for one job")
+ public static class Statistics {
+
+ // @Schema(name = "jobId", description = "jobId", required = true)
+ // @SerializedName("jobId")
+ @JsonProperty(value = "jobId", required = true)
+ String jobId;
+
+ @JsonProperty(value = "typeId", required = true)
+ String typeId;
+
+ @JsonProperty(value = "inputTopic", required = false)
+ String inputTopic;
+
+ @JsonProperty(value = "outputTopic", required = false)
+ String outputTopic;
+
+ @JsonProperty(value = "groupId", required = false)
+ String groupId;
+
+ @JsonProperty(value = "clientId", required = false)
+ String clientId;
+
+ @JsonProperty(value = "noOfReceivedObjects", required = true)
+ @Builder.Default
+ int noOfReceivedObjects = 0;
+
+ @JsonProperty(value = "noOfReceivedBytes", required = true)
+ @Builder.Default
+ int noOfReceivedBytes = 0;
+
+ @JsonProperty(value = "noOfSentObjects", required = true)
+ @Builder.Default
+ int noOfSentObjects = 0;
+
+ @JsonProperty(value = "noOfSentBytes", required = true)
+ @Builder.Default
+ int noOfSentBytes = 0;
+
+ public void received(String str) {
+ noOfReceivedBytes += str.length();
+ noOfReceivedObjects += 1;
+
+ }
+
+ public void filtered(String str) {
+ noOfSentBytes += str.length();
+ noOfSentObjects += 1;
+ }
+
+ }
+
public static class Parameters {
public static final String REGEXP_TYPE = "regexp";
public static final String PM_FILTER_TYPE = "pmdata";
private final Filter filter;
+ @Getter
+ private final Statistics statistics;
+
@Getter
private final AsyncRestClient consumerRestClient;
public Job(String id, String callbackUrl, InfoType type, String owner, String lastUpdated, Parameters parameters,
- AsyncRestClient consumerRestClient) {
+ AsyncRestClient consumerRestClient, ApplicationConfig appConfig) {
this.id = id;
this.callbackUrl = callbackUrl;
this.type = type;
filter = parameters.filter == null ? null
: FilterFactory.create(parameters.getFilter(), parameters.getFilterType());
this.consumerRestClient = consumerRestClient;
+
+ statistics = Statistics.builder() //
+ .groupId(type.getKafkaGroupId()) //
+ .inputTopic(type.getKafkaInputTopic()) //
+ .jobId(id) //
+ .outputTopic(parameters.getKafkaOutputTopic()) //
+ .typeId(type.getId()) //
+ .clientId(type.getKafkaClientId(appConfig)) //
+ .build();
+
}
public Filter.FilteredData filter(DataFromTopic data) {
private MultiMap<Job> jobsByType = new MultiMap<>();
private final AsyncRestClientFactory restclientFactory;
private final List<Observer> observers = new ArrayList<>();
+ private final ApplicationConfig appConfig;
- public Jobs(@Autowired ApplicationConfig applicationConfig, @Autowired SecurityContext securityContext) {
+ public Jobs(@Autowired ApplicationConfig applicationConfig, @Autowired SecurityContext securityContext,
+ @Autowired ApplicationConfig appConfig) {
restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig(), securityContext);
+ this.appConfig = appConfig;
}
public synchronized Job getJob(String id) throws ServiceException {
AsyncRestClient consumerRestClient = type.isUseHttpProxy() //
? restclientFactory.createRestClientUseHttpProxy(callbackUrl) //
: restclientFactory.createRestClientNoHttpProxy(callbackUrl);
- Job job = new Job(id, callbackUrl, type, owner, lastUpdated, parameters, consumerRestClient);
+ Job job = new Job(id, callbackUrl, type, owner, lastUpdated, parameters, consumerRestClient, this.appConfig);
this.put(job);
synchronized (observers) {
this.observers.forEach(obs -> obs.onJobbAdded(job));
public synchronized void start(Flux<TopicListener.DataFromTopic> input) {
stop();
this.errorStats.resetIrrecoverableErrors();
- this.subscription = filterAndBuffer(input, job) //
+ this.subscription = filterAndBuffer(input, this.job) //
.flatMap(this::sendToClient, job.getParameters().getMaxConcurrency()) //
.onErrorResume(this::handleError) //
.subscribe(this::handleSentOk, //
}
private Flux<Filter.FilteredData> filterAndBuffer(Flux<TopicListener.DataFromTopic> inputFlux, Job job) {
- Flux<Filter.FilteredData> filtered = inputFlux.map(job::filter) //
- .filter(f -> !f.isEmpty());
+ Flux<Filter.FilteredData> filtered = //
+ inputFlux.doOnNext(data -> job.getStatistics().received(data.value)) //
+ .map(job::filter) //
+ .filter(f -> !f.isEmpty()) //
+ .doOnNext(f -> job.getStatistics().filtered(f.value)); //
if (job.isBuffered()) {
filtered = filtered.map(input -> quoteNonJson(input.value, job)) //
package org.oran.dmaapadapter.tasks;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+import java.nio.charset.Charset;
+import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.ToString;
+
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-
-import lombok.Builder;
-import lombok.Getter;
-import lombok.ToString;
import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-
/**
* The class streams incoming requests from a Kafka topic and sends them further
* to a multi cast sink, which several other streams can connect to.
private final ApplicationConfig applicationConfig;
private final InfoType type;
private Flux<DataFromTopic> dataFromTopic;
- private final String kafkaClientId;
private static Gson gson = new GsonBuilder() //
.disableHtmlEscaping() //
private String filename;
}
- public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type, String kafkaClientId) {
+ public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type) {
this.applicationConfig = applicationConfig;
this.type = type;
- this.kafkaClientId = kafkaClientId;
}
@Override
public Flux<DataFromTopic> getFlux() {
if (this.dataFromTopic == null) {
- this.dataFromTopic = startReceiveFromTopic(this.kafkaClientId);
+ this.dataFromTopic = startReceiveFromTopic(this.type.getKafkaClientId(this.applicationConfig));
}
return this.dataFromTopic;
}
private Flux<DataFromTopic> startReceiveFromTopic(String clientId) {
logger.debug("Listening to kafka topic: {} type :{}", this.type.getKafkaInputTopic(), type.getId());
+
return KafkaReceiver.create(kafkaInputProperties(clientId)) //
- .receive() //
+ .receiveAutoAck() //
+ .concatMap(consumerRecord -> consumerRecord) //
.doOnNext(input -> logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(),
input.value())) //
.doOnError(t -> logger.error("KafkaTopicReceiver error: {}", t.getMessage())) //
private DataFromTopic getDataFromFileIfNewPmFileEvent(DataFromTopic data) {
- if (!applicationConfig.getPmFilesPath().isEmpty()
- && this.type.getDataType() == InfoType.DataType.PM_DATA
+ if (!applicationConfig.getPmFilesPath().isEmpty() //
+ && this.type.getDataType() == InfoType.DataType.PM_DATA //
&& data.value.length() < 1000) {
try {
NewFileEvent ev = gson.fromJson(data.value, NewFileEvent.class);
logger.error("No kafka boostrap server is setup");
}
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers());
- consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "osc-dmaap-adapter-" + this.type.getId());
+ consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, this.type.getKafkaGroupId());
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
for (InfoType type : types.getAll()) {
if (type.isKafkaTopicDefined()) {
- KafkaTopicListener topicConsumer = new KafkaTopicListener(appConfig, type,
- type.getId() + "_" + appConfig.getSelfUrl());
+ KafkaTopicListener topicConsumer = new KafkaTopicListener(appConfig, type);
kafkaTopicListeners.put(type.getId(), topicConsumer);
}
if (type.isDmaapTopicDefined()) {
import org.oran.dmaapadapter.configuration.WebClientConfig;
import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig;
import org.oran.dmaapadapter.controllers.ProducerCallbacksController;
+import org.oran.dmaapadapter.exceptions.ServiceException;
import org.oran.dmaapadapter.filter.PmReport;
import org.oran.dmaapadapter.filter.PmReportFilter;
import org.oran.dmaapadapter.r1.ConsumerJobInfo;
.hasSize(this.types.size()));
}
+ @Test
+ void testStatistics() throws ServiceException {
+ // Register producer, Register types
+ waitForRegistration();
+ final String JOB_ID = "testStatistics";
+ ConsumerJobInfo jobInfo = consumerJobInfo("DmaapInformationType", JOB_ID, jsonObjectRegexp());
+
+ this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+ String targetUri = baseUrl() + ProducerCallbacksController.STATISTICS_URL;
+ String stats = restClient().get(targetUri).block();
+
+ assertThat(stats).contains(JOB_ID, "DmaapInformationType");
+
+ }
+
public static void testErrorCode(Mono<?> request, HttpStatus expStatus, String responseContains) {
testErrorCode(request, expStatus, responseContains, true);
}
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.configuration.WebClientConfig;
import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig;
+import org.oran.dmaapadapter.controllers.ProducerCallbacksController;
import org.oran.dmaapadapter.exceptions.ServiceException;
import org.oran.dmaapadapter.filter.PmReportFilter;
import org.oran.dmaapadapter.r1.ConsumerJobInfo;
"server.ssl.key-store=./config/keystore.jks", //
"app.webclient.trust-store=./config/truststore.jks", //
"app.configuration-filepath=./src/test/resources/test_application_configuration.json", //
- "app.pm-files-path=./src/test/resources/" }) //
+ "app.pm-files-path=./src/test/resources/"}) //
class IntegrationWithKafka {
final String TYPE_ID = "KafkaInformationType";
// Create a listener to the output topic. The KafkaTopicListener happens to be
// suitable for that,
- InfoType type = InfoType.builder().id("id").kafkaInputTopic(OUTPUT_TOPIC).dataType("dataType").build();
+ InfoType type =
+ InfoType.builder().id("TestReceiver").kafkaInputTopic(OUTPUT_TOPIC).dataType("dataType").build();
- KafkaTopicListener topicListener = new KafkaTopicListener(applicationConfig, type,
- "TestClientId" + "_" + outputTopic);
+ KafkaTopicListener topicListener = new KafkaTopicListener(applicationConfig, type);
topicListener.getFlux() //
.doOnNext(this::set) //
await().untilAsserted(() -> assertThat(kafkaReceiver.lastValue()).isEqualTo(sendString));
assertThat(kafkaReceiver.lastKey()).isEqualTo(sendKey);
+
+ printStatistics();
+ }
+
+ private void printStatistics() {
+ String targetUri = baseUrl() + ProducerCallbacksController.STATISTICS_URL;
+ String stats = restClient().get(targetUri).block();
+ logger.info("Stats : {}", stats);
}
@SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
Instant startTime = Instant.now();
- KafkaTopicListener.NewFileEvent event = KafkaTopicListener.NewFileEvent.builder().filename("pm_report.json")
- .build();
+ KafkaTopicListener.NewFileEvent event =
+ KafkaTopicListener.NewFileEvent.builder().filename("pm_report.json").build();
String eventAsString = gson.toJson(event);
String path = "./src/test/resources/pm_report.json";
final long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond();
logger.info("*** Duration :" + durationSeconds + ", objects/second: " + NO_OF_OBJECTS / durationSeconds);
logger.info("*** kafkaReceiver2 :" + kafkaReceiver.count);
+
+ printStatistics();
}
@Test