From 65b24828c180d56807342679df310b8707fe8cb9 Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Tue, 23 Aug 2022 13:25:43 +0200 Subject: [PATCH] NONRTRIC - Statistics Added feature for getting statistics. GET "/statistics" Updated to latest 2.5 version of springboot. Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-773 Change-Id: Ifc0844ca20cab00d3ca99e5b58b2f56721a5e9c0 --- api/api.json | 54 ++++++++++++++++ api/api.yaml | 57 +++++++++++++++++ pom.xml | 15 +---- .../controllers/ProducerCallbacksController.java | 31 +++++++++ .../org/oran/dmaapadapter/repository/InfoType.java | 10 +++ .../java/org/oran/dmaapadapter/repository/Job.java | 74 +++++++++++++++++++++- .../org/oran/dmaapadapter/repository/Jobs.java | 7 +- .../dmaapadapter/tasks/JobDataDistributor.java | 9 ++- .../dmaapadapter/tasks/KafkaTopicListener.java | 34 +++++----- .../oran/dmaapadapter/tasks/TopicListeners.java | 3 +- .../org/oran/dmaapadapter/ApplicationTest.java | 18 ++++++ .../oran/dmaapadapter/IntegrationWithKafka.java | 23 +++++-- 12 files changed, 291 insertions(+), 44 deletions(-) diff --git a/api/api.json b/api/api.json index a58aced..e8ea0c8 100644 --- a/api/api.json +++ b/api/api.json @@ -52,6 +52,51 @@ "description": "Void/empty", "type": "object" }, + "job_statistics": { + "description": "Statistics information for one job", + "type": "object", + "required": [ + "jobId", + "noOfReceivedBytes", + "noOfReceivedObjects", + "noOfSentBytes", + "noOfSentObjects", + "typeId" + ], + "properties": { + "noOfSentObjects": { + "format": "int32", + "type": "integer" + }, + "jobId": {"type": "string"}, + "outputTopic": {"type": "string"}, + "noOfSentBytes": { + "format": "int32", + "type": "integer" + }, + "clientId": {"type": "string"}, + "groupId": {"type": "string"}, + "noOfReceivedBytes": { + "format": "int32", + "type": "integer" + }, + "typeId": {"type": "string"}, + "inputTopic": {"type": "string"}, + "noOfReceivedObjects": { + "format": "int32", + "type": "integer" + } + } + }, + "statistics_info": { + "description": "Statistics information", + "type": "object", + "properties": {"jobStatistics": { + "description": "Statistics per job", + "type": "array", + "items": {"$ref": "#/components/schemas/job_statistics"} + }} + }, "producer_registration_info": { "description": "Information for an Information Producer", "type": "object", @@ -143,6 +188,15 @@ }], "tags": ["Information Coordinator Service Simulator (exists only in test)"] }}, + "/statistics": {"get": { + "summary": "Returns statistics", + "operationId": "getStatistics", + "responses": {"200": { + "description": "OK", + "content": {"application/json": {"schema": {"$ref": "#/components/schemas/statistics_info"}}} + }}, + "tags": ["Producer job control API"] + }}, "/generic_dataproducer/health_check": {"get": { "summary": "Producer supervision", "description": "The endpoint is provided by the Information Producer and is used for supervision of the producer.", diff --git a/api/api.yaml b/api/api.yaml index 02697ee..bc10472 100644 --- a/api/api.yaml +++ b/api/api.yaml @@ -72,6 +72,19 @@ paths: application/json: schema: type: object + /statistics: + get: + tags: + - Producer job control API + summary: Returns statistics + operationId: getStatistics + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/statistics_info' /generic_dataproducer/health_check: get: tags: @@ -455,6 +468,50 @@ components: void: type: object description: Void/empty + job_statistics: + required: + - jobId + - noOfReceivedBytes + - noOfReceivedObjects + - noOfSentBytes + - noOfSentObjects + - typeId + type: object + properties: + noOfSentObjects: + type: integer + format: int32 + jobId: + type: string + outputTopic: + type: string + noOfSentBytes: + type: integer + format: int32 + clientId: + type: string + groupId: + type: string + noOfReceivedBytes: + type: integer + format: int32 + typeId: + type: string + inputTopic: + type: string + noOfReceivedObjects: + type: integer + format: int32 + description: Statistics information for one job + statistics_info: + type: object + properties: + jobStatistics: + type: array + description: Statistics per job + items: + $ref: '#/components/schemas/job_statistics' + description: Statistics information producer_registration_info: required: - info_job_callback_url diff --git a/pom.xml b/pom.xml index 5f2216f..ffe624f 100644 --- a/pom.xml +++ b/pom.xml @@ -26,7 +26,7 @@ org.springframework.boot spring-boot-starter-parent - 2.5.8 + 2.5.14 org.o-ran-sc.nonrtric.plt @@ -49,7 +49,7 @@ 11 3.0.0 2.9.0 - 2.1.6 + 2.2.1 20211205 3.8.0 2.12.2 @@ -90,12 +90,6 @@ com.google.code.gson gson - ${gson.version} - - - org.json - json - ${json.version} org.projectlombok @@ -183,11 +177,6 @@ mockito-core test - - com.squareup.okhttp3 - mockwebserver - test - io.projectreactor.kafka reactor-kafka diff --git a/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java b/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java index 595de4a..4967626 100644 --- a/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java +++ b/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java @@ -33,6 +33,7 @@ import io.swagger.v3.oas.annotations.tags.Tag; import java.util.ArrayList; import java.util.Collection; +import java.util.List; import org.oran.dmaapadapter.exceptions.ServiceException; import org.oran.dmaapadapter.r1.ProducerJobInfo; @@ -61,6 +62,9 @@ public class ProducerCallbacksController { public static final String API_DESCRIPTION = ""; public static final String JOB_URL = "/generic_dataproducer/info_job"; public static final String SUPERVISION_URL = "/generic_dataproducer/health_check"; + + public static final String STATISTICS_URL = "/statistics"; + private static Gson gson = new GsonBuilder().disableHtmlEscaping().create(); private final Jobs jobs; private final InfoTypes types; @@ -145,4 +149,31 @@ public class ProducerCallbacksController { return new ResponseEntity<>(HttpStatus.OK); } + @Schema(name = "statistics_info", description = "Statistics information") + public class Statistics { + + @Schema(description = "Statistics per job") + public final Collection jobStatistics; + + public Statistics(Collection stats) { + this.jobStatistics = stats; + } + + } + + @GetMapping(path = STATISTICS_URL, produces = MediaType.APPLICATION_JSON_VALUE) + @Operation(summary = "Returns statistics", description = "") + @ApiResponses(value = { // + @ApiResponse(responseCode = "200", description = "OK", // + content = @Content(schema = @Schema(implementation = Statistics.class))) // + }) + public ResponseEntity getStatistics() { + List res = new ArrayList<>(); + for (Job job : this.jobs.getAll()) { + res.add(job.getStatistics()); + } + + return new ResponseEntity<>(gson.toJson(new Statistics(res)), HttpStatus.OK); + } + } diff --git a/src/main/java/org/oran/dmaapadapter/repository/InfoType.java b/src/main/java/org/oran/dmaapadapter/repository/InfoType.java index 7a0b707..ce2e1a1 100644 --- a/src/main/java/org/oran/dmaapadapter/repository/InfoType.java +++ b/src/main/java/org/oran/dmaapadapter/repository/InfoType.java @@ -24,6 +24,7 @@ import lombok.Builder; import lombok.Getter; import lombok.ToString; +import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.springframework.util.StringUtils; @ToString @@ -71,4 +72,13 @@ public class InfoType { } return DataType.OTHER; } + + public String getKafkaGroupId() { + return this.kafkaInputTopic == null ? null : "osc-dmaap-adapter-" + getId(); + } + + public String getKafkaClientId(ApplicationConfig appConfig) { + return this.kafkaInputTopic == null ? null : getId() + "_" + appConfig.getSelfUrl(); + + } } diff --git a/src/main/java/org/oran/dmaapadapter/repository/Job.java b/src/main/java/org/oran/dmaapadapter/repository/Job.java index 9581eb4..acb9136 100644 --- a/src/main/java/org/oran/dmaapadapter/repository/Job.java +++ b/src/main/java/org/oran/dmaapadapter/repository/Job.java @@ -20,14 +20,20 @@ package org.oran.dmaapadapter.repository; +import com.fasterxml.jackson.annotation.JsonProperty; + +import io.swagger.v3.oas.annotations.media.Schema; + import java.lang.invoke.MethodHandles; import java.time.Duration; +import lombok.Builder; import lombok.Getter; import lombok.Setter; import lombok.ToString; import org.oran.dmaapadapter.clients.AsyncRestClient; +import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.filter.Filter; import org.oran.dmaapadapter.filter.FilterFactory; import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic; @@ -38,6 +44,59 @@ import org.slf4j.LoggerFactory; public class Job { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + @Builder + @Schema(name = "job_statistics", description = "Statistics information for one job") + public static class Statistics { + + // @Schema(name = "jobId", description = "jobId", required = true) + // @SerializedName("jobId") + @JsonProperty(value = "jobId", required = true) + String jobId; + + @JsonProperty(value = "typeId", required = true) + String typeId; + + @JsonProperty(value = "inputTopic", required = false) + String inputTopic; + + @JsonProperty(value = "outputTopic", required = false) + String outputTopic; + + @JsonProperty(value = "groupId", required = false) + String groupId; + + @JsonProperty(value = "clientId", required = false) + String clientId; + + @JsonProperty(value = "noOfReceivedObjects", required = true) + @Builder.Default + int noOfReceivedObjects = 0; + + @JsonProperty(value = "noOfReceivedBytes", required = true) + @Builder.Default + int noOfReceivedBytes = 0; + + @JsonProperty(value = "noOfSentObjects", required = true) + @Builder.Default + int noOfSentObjects = 0; + + @JsonProperty(value = "noOfSentBytes", required = true) + @Builder.Default + int noOfSentBytes = 0; + + public void received(String str) { + noOfReceivedBytes += str.length(); + noOfReceivedObjects += 1; + + } + + public void filtered(String str) { + noOfSentBytes += str.length(); + noOfSentObjects += 1; + } + + } + public static class Parameters { public static final String REGEXP_TYPE = "regexp"; public static final String PM_FILTER_TYPE = "pmdata"; @@ -127,11 +186,14 @@ public class Job { private final Filter filter; + @Getter + private final Statistics statistics; + @Getter private final AsyncRestClient consumerRestClient; public Job(String id, String callbackUrl, InfoType type, String owner, String lastUpdated, Parameters parameters, - AsyncRestClient consumerRestClient) { + AsyncRestClient consumerRestClient, ApplicationConfig appConfig) { this.id = id; this.callbackUrl = callbackUrl; this.type = type; @@ -141,6 +203,16 @@ public class Job { filter = parameters.filter == null ? null : FilterFactory.create(parameters.getFilter(), parameters.getFilterType()); this.consumerRestClient = consumerRestClient; + + statistics = Statistics.builder() // + .groupId(type.getKafkaGroupId()) // + .inputTopic(type.getKafkaInputTopic()) // + .jobId(id) // + .outputTopic(parameters.getKafkaOutputTopic()) // + .typeId(type.getId()) // + .clientId(type.getKafkaClientId(appConfig)) // + .build(); + } public Filter.FilteredData filter(DataFromTopic data) { diff --git a/src/main/java/org/oran/dmaapadapter/repository/Jobs.java b/src/main/java/org/oran/dmaapadapter/repository/Jobs.java index 825673a..2c6b329 100644 --- a/src/main/java/org/oran/dmaapadapter/repository/Jobs.java +++ b/src/main/java/org/oran/dmaapadapter/repository/Jobs.java @@ -55,9 +55,12 @@ public class Jobs { private MultiMap jobsByType = new MultiMap<>(); private final AsyncRestClientFactory restclientFactory; private final List observers = new ArrayList<>(); + private final ApplicationConfig appConfig; - public Jobs(@Autowired ApplicationConfig applicationConfig, @Autowired SecurityContext securityContext) { + public Jobs(@Autowired ApplicationConfig applicationConfig, @Autowired SecurityContext securityContext, + @Autowired ApplicationConfig appConfig) { restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig(), securityContext); + this.appConfig = appConfig; } public synchronized Job getJob(String id) throws ServiceException { @@ -81,7 +84,7 @@ public class Jobs { AsyncRestClient consumerRestClient = type.isUseHttpProxy() // ? restclientFactory.createRestClientUseHttpProxy(callbackUrl) // : restclientFactory.createRestClientNoHttpProxy(callbackUrl); - Job job = new Job(id, callbackUrl, type, owner, lastUpdated, parameters, consumerRestClient); + Job job = new Job(id, callbackUrl, type, owner, lastUpdated, parameters, consumerRestClient, this.appConfig); this.put(job); synchronized (observers) { this.observers.forEach(obs -> obs.onJobbAdded(job)); diff --git a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java index ae1f413..bda54a4 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java @@ -77,7 +77,7 @@ public abstract class JobDataDistributor { public synchronized void start(Flux input) { stop(); this.errorStats.resetIrrecoverableErrors(); - this.subscription = filterAndBuffer(input, job) // + this.subscription = filterAndBuffer(input, this.job) // .flatMap(this::sendToClient, job.getParameters().getMaxConcurrency()) // .onErrorResume(this::handleError) // .subscribe(this::handleSentOk, // @@ -104,8 +104,11 @@ public abstract class JobDataDistributor { } private Flux filterAndBuffer(Flux inputFlux, Job job) { - Flux filtered = inputFlux.map(job::filter) // - .filter(f -> !f.isEmpty()); + Flux filtered = // + inputFlux.doOnNext(data -> job.getStatistics().received(data.value)) // + .map(job::filter) // + .filter(f -> !f.isEmpty()) // + .doOnNext(f -> job.getStatistics().filtered(f.value)); // if (job.isBuffered()) { filtered = filtered.map(input -> quoteNonJson(input.value, job)) // diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java index 7514d37..61b50c6 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java @@ -20,11 +20,20 @@ package org.oran.dmaapadapter.tasks; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +import java.nio.charset.Charset; +import java.nio.file.Files; import java.nio.file.Path; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import lombok.Builder; +import lombok.Getter; +import lombok.ToString; + import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.oran.dmaapadapter.configuration.ApplicationConfig; @@ -32,19 +41,10 @@ import org.oran.dmaapadapter.repository.InfoType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; - -import lombok.Builder; -import lombok.Getter; -import lombok.ToString; import reactor.core.publisher.Flux; import reactor.kafka.receiver.KafkaReceiver; import reactor.kafka.receiver.ReceiverOptions; -import java.nio.charset.Charset; -import java.nio.file.Files; - /** * The class streams incoming requests from a Kafka topic and sends them further * to a multi cast sink, which several other streams can connect to. @@ -55,7 +55,6 @@ public class KafkaTopicListener implements TopicListener { private final ApplicationConfig applicationConfig; private final InfoType type; private Flux dataFromTopic; - private final String kafkaClientId; private static Gson gson = new GsonBuilder() // .disableHtmlEscaping() // @@ -68,24 +67,25 @@ public class KafkaTopicListener implements TopicListener { private String filename; } - public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type, String kafkaClientId) { + public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type) { this.applicationConfig = applicationConfig; this.type = type; - this.kafkaClientId = kafkaClientId; } @Override public Flux getFlux() { if (this.dataFromTopic == null) { - this.dataFromTopic = startReceiveFromTopic(this.kafkaClientId); + this.dataFromTopic = startReceiveFromTopic(this.type.getKafkaClientId(this.applicationConfig)); } return this.dataFromTopic; } private Flux startReceiveFromTopic(String clientId) { logger.debug("Listening to kafka topic: {} type :{}", this.type.getKafkaInputTopic(), type.getId()); + return KafkaReceiver.create(kafkaInputProperties(clientId)) // - .receive() // + .receiveAutoAck() // + .concatMap(consumerRecord -> consumerRecord) // .doOnNext(input -> logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(), input.value())) // .doOnError(t -> logger.error("KafkaTopicReceiver error: {}", t.getMessage())) // @@ -99,8 +99,8 @@ public class KafkaTopicListener implements TopicListener { private DataFromTopic getDataFromFileIfNewPmFileEvent(DataFromTopic data) { - if (!applicationConfig.getPmFilesPath().isEmpty() - && this.type.getDataType() == InfoType.DataType.PM_DATA + if (!applicationConfig.getPmFilesPath().isEmpty() // + && this.type.getDataType() == InfoType.DataType.PM_DATA // && data.value.length() < 1000) { try { NewFileEvent ev = gson.fromJson(data.value, NewFileEvent.class); @@ -121,7 +121,7 @@ public class KafkaTopicListener implements TopicListener { logger.error("No kafka boostrap server is setup"); } consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers()); - consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "osc-dmaap-adapter-" + this.type.getId()); + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, this.type.getKafkaGroupId()); consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java index ea7ab81..fcc94ee 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java @@ -59,8 +59,7 @@ public class TopicListeners { for (InfoType type : types.getAll()) { if (type.isKafkaTopicDefined()) { - KafkaTopicListener topicConsumer = new KafkaTopicListener(appConfig, type, - type.getId() + "_" + appConfig.getSelfUrl()); + KafkaTopicListener topicConsumer = new KafkaTopicListener(appConfig, type); kafkaTopicListeners.put(type.getId(), topicConsumer); } if (type.isDmaapTopicDefined()) { diff --git a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java index 635fa65..2f1f6c9 100644 --- a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java +++ b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java @@ -47,6 +47,7 @@ import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.configuration.WebClientConfig; import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig; import org.oran.dmaapadapter.controllers.ProducerCallbacksController; +import org.oran.dmaapadapter.exceptions.ServiceException; import org.oran.dmaapadapter.filter.PmReport; import org.oran.dmaapadapter.filter.PmReportFilter; import org.oran.dmaapadapter.r1.ConsumerJobInfo; @@ -554,6 +555,23 @@ class ApplicationTest { .hasSize(this.types.size())); } + @Test + void testStatistics() throws ServiceException { + // Register producer, Register types + waitForRegistration(); + final String JOB_ID = "testStatistics"; + ConsumerJobInfo jobInfo = consumerJobInfo("DmaapInformationType", JOB_ID, jsonObjectRegexp()); + + this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient()); + await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); + + String targetUri = baseUrl() + ProducerCallbacksController.STATISTICS_URL; + String stats = restClient().get(targetUri).block(); + + assertThat(stats).contains(JOB_ID, "DmaapInformationType"); + + } + public static void testErrorCode(Mono request, HttpStatus expStatus, String responseContains) { testErrorCode(request, expStatus, responseContains, true); } diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java index 488fc89..45bee5d 100644 --- a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java +++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java @@ -47,6 +47,7 @@ import org.oran.dmaapadapter.clients.SecurityContext; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.configuration.WebClientConfig; import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig; +import org.oran.dmaapadapter.controllers.ProducerCallbacksController; import org.oran.dmaapadapter.exceptions.ServiceException; import org.oran.dmaapadapter.filter.PmReportFilter; import org.oran.dmaapadapter.r1.ConsumerJobInfo; @@ -80,7 +81,7 @@ import reactor.kafka.sender.SenderRecord; "server.ssl.key-store=./config/keystore.jks", // "app.webclient.trust-store=./config/truststore.jks", // "app.configuration-filepath=./src/test/resources/test_application_configuration.json", // - "app.pm-files-path=./src/test/resources/" }) // + "app.pm-files-path=./src/test/resources/"}) // class IntegrationWithKafka { final String TYPE_ID = "KafkaInformationType"; @@ -168,10 +169,10 @@ class IntegrationWithKafka { // Create a listener to the output topic. The KafkaTopicListener happens to be // suitable for that, - InfoType type = InfoType.builder().id("id").kafkaInputTopic(OUTPUT_TOPIC).dataType("dataType").build(); + InfoType type = + InfoType.builder().id("TestReceiver").kafkaInputTopic(OUTPUT_TOPIC).dataType("dataType").build(); - KafkaTopicListener topicListener = new KafkaTopicListener(applicationConfig, type, - "TestClientId" + "_" + outputTopic); + KafkaTopicListener topicListener = new KafkaTopicListener(applicationConfig, type); topicListener.getFlux() // .doOnNext(this::set) // @@ -415,6 +416,14 @@ class IntegrationWithKafka { await().untilAsserted(() -> assertThat(kafkaReceiver.lastValue()).isEqualTo(sendString)); assertThat(kafkaReceiver.lastKey()).isEqualTo(sendKey); + + printStatistics(); + } + + private void printStatistics() { + String targetUri = baseUrl() + ProducerCallbacksController.STATISTICS_URL; + String stats = restClient().get(targetUri).block(); + logger.info("Stats : {}", stats); } @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests. @@ -475,8 +484,8 @@ class IntegrationWithKafka { Instant startTime = Instant.now(); - KafkaTopicListener.NewFileEvent event = KafkaTopicListener.NewFileEvent.builder().filename("pm_report.json") - .build(); + KafkaTopicListener.NewFileEvent event = + KafkaTopicListener.NewFileEvent.builder().filename("pm_report.json").build(); String eventAsString = gson.toJson(event); String path = "./src/test/resources/pm_report.json"; @@ -495,6 +504,8 @@ class IntegrationWithKafka { final long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond(); logger.info("*** Duration :" + durationSeconds + ", objects/second: " + NO_OF_OBJECTS / durationSeconds); logger.info("*** kafkaReceiver2 :" + kafkaReceiver.count); + + printStatistics(); } @Test -- 2.16.6