NONRTRIC - Statistics 24/8924/4
authorPatrikBuhr <patrik.buhr@est.tech>
Tue, 23 Aug 2022 11:25:43 +0000 (13:25 +0200)
committerPatrikBuhr <patrik.buhr@est.tech>
Wed, 24 Aug 2022 07:10:37 +0000 (09:10 +0200)
Added feature for getting statistics.
   GET "/statistics"
Updated to latest 2.5 version of springboot.

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: Ifc0844ca20cab00d3ca99e5b58b2f56721a5e9c0

12 files changed:
api/api.json
api/api.yaml
pom.xml
src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java
src/main/java/org/oran/dmaapadapter/repository/InfoType.java
src/main/java/org/oran/dmaapadapter/repository/Job.java
src/main/java/org/oran/dmaapadapter/repository/Jobs.java
src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java
src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java
src/test/java/org/oran/dmaapadapter/ApplicationTest.java
src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java

index a58aced..e8ea0c8 100644 (file)
             "description": "Void/empty",
             "type": "object"
         },
+        "job_statistics": {
+            "description": "Statistics information for one job",
+            "type": "object",
+            "required": [
+                "jobId",
+                "noOfReceivedBytes",
+                "noOfReceivedObjects",
+                "noOfSentBytes",
+                "noOfSentObjects",
+                "typeId"
+            ],
+            "properties": {
+                "noOfSentObjects": {
+                    "format": "int32",
+                    "type": "integer"
+                },
+                "jobId": {"type": "string"},
+                "outputTopic": {"type": "string"},
+                "noOfSentBytes": {
+                    "format": "int32",
+                    "type": "integer"
+                },
+                "clientId": {"type": "string"},
+                "groupId": {"type": "string"},
+                "noOfReceivedBytes": {
+                    "format": "int32",
+                    "type": "integer"
+                },
+                "typeId": {"type": "string"},
+                "inputTopic": {"type": "string"},
+                "noOfReceivedObjects": {
+                    "format": "int32",
+                    "type": "integer"
+                }
+            }
+        },
+        "statistics_info": {
+            "description": "Statistics information",
+            "type": "object",
+            "properties": {"jobStatistics": {
+                "description": "Statistics per job",
+                "type": "array",
+                "items": {"$ref": "#/components/schemas/job_statistics"}
+            }}
+        },
         "producer_registration_info": {
             "description": "Information for an Information Producer",
             "type": "object",
             }],
             "tags": ["Information Coordinator Service Simulator (exists only in test)"]
         }},
+        "/statistics": {"get": {
+            "summary": "Returns statistics",
+            "operationId": "getStatistics",
+            "responses": {"200": {
+                "description": "OK",
+                "content": {"application/json": {"schema": {"$ref": "#/components/schemas/statistics_info"}}}
+            }},
+            "tags": ["Producer job control API"]
+        }},
         "/generic_dataproducer/health_check": {"get": {
             "summary": "Producer supervision",
             "description": "The endpoint is provided by the Information Producer and is used for supervision of the producer.",
index 02697ee..bc10472 100644 (file)
@@ -72,6 +72,19 @@ paths:
             application/json:
               schema:
                 type: object
+  /statistics:
+    get:
+      tags:
+      - Producer job control API
+      summary: Returns statistics
+      operationId: getStatistics
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/statistics_info'
   /generic_dataproducer/health_check:
     get:
       tags:
@@ -455,6 +468,50 @@ components:
     void:
       type: object
       description: Void/empty
+    job_statistics:
+      required:
+      - jobId
+      - noOfReceivedBytes
+      - noOfReceivedObjects
+      - noOfSentBytes
+      - noOfSentObjects
+      - typeId
+      type: object
+      properties:
+        noOfSentObjects:
+          type: integer
+          format: int32
+        jobId:
+          type: string
+        outputTopic:
+          type: string
+        noOfSentBytes:
+          type: integer
+          format: int32
+        clientId:
+          type: string
+        groupId:
+          type: string
+        noOfReceivedBytes:
+          type: integer
+          format: int32
+        typeId:
+          type: string
+        inputTopic:
+          type: string
+        noOfReceivedObjects:
+          type: integer
+          format: int32
+      description: Statistics information for one job
+    statistics_info:
+      type: object
+      properties:
+        jobStatistics:
+          type: array
+          description: Statistics per job
+          items:
+            $ref: '#/components/schemas/job_statistics'
+      description: Statistics information
     producer_registration_info:
       required:
       - info_job_callback_url
diff --git a/pom.xml b/pom.xml
index 5f2216f..ffe624f 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -26,7 +26,7 @@
     <parent>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-parent</artifactId>
-        <version>2.5.8</version>
+        <version>2.5.14</version>
         <relativePath />
     </parent>
     <groupId>org.o-ran-sc.nonrtric.plt</groupId>
@@ -49,7 +49,7 @@
         <java.version>11</java.version>
         <springfox.version>3.0.0</springfox.version>
         <gson.version>2.9.0</gson.version>
-        <swagger.version>2.1.6</swagger.version>
+        <swagger.version>2.2.1</swagger.version>
         <json.version>20211205</json.version>
         <maven-compiler-plugin.version>3.8.0</maven-compiler-plugin.version>
         <formatter-maven-plugin.version>2.12.2</formatter-maven-plugin.version>
         <dependency>
             <groupId>com.google.code.gson</groupId>
             <artifactId>gson</artifactId>
-            <version>${gson.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.json</groupId>
-            <artifactId>json</artifactId>
-            <version>${json.version}</version>
         </dependency>
         <dependency>
             <groupId>org.projectlombok</groupId>
             <artifactId>mockito-core</artifactId>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>com.squareup.okhttp3</groupId>
-            <artifactId>mockwebserver</artifactId>
-            <scope>test</scope>
-        </dependency>
         <dependency>
             <groupId>io.projectreactor.kafka</groupId>
             <artifactId>reactor-kafka</artifactId>
index 595de4a..4967626 100644 (file)
@@ -33,6 +33,7 @@ import io.swagger.v3.oas.annotations.tags.Tag;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 
 import org.oran.dmaapadapter.exceptions.ServiceException;
 import org.oran.dmaapadapter.r1.ProducerJobInfo;
@@ -61,6 +62,9 @@ public class ProducerCallbacksController {
     public static final String API_DESCRIPTION = "";
     public static final String JOB_URL = "/generic_dataproducer/info_job";
     public static final String SUPERVISION_URL = "/generic_dataproducer/health_check";
+
+    public static final String STATISTICS_URL = "/statistics";
+
     private static Gson gson = new GsonBuilder().disableHtmlEscaping().create();
     private final Jobs jobs;
     private final InfoTypes types;
@@ -145,4 +149,31 @@ public class ProducerCallbacksController {
         return new ResponseEntity<>(HttpStatus.OK);
     }
 
+    @Schema(name = "statistics_info", description = "Statistics information")
+    public class Statistics {
+
+        @Schema(description = "Statistics per job")
+        public final Collection<Job.Statistics> jobStatistics;
+
+        public Statistics(Collection<Job.Statistics> stats) {
+            this.jobStatistics = stats;
+        }
+
+    }
+
+    @GetMapping(path = STATISTICS_URL, produces = MediaType.APPLICATION_JSON_VALUE)
+    @Operation(summary = "Returns statistics", description = "")
+    @ApiResponses(value = { //
+            @ApiResponse(responseCode = "200", description = "OK", //
+                    content = @Content(schema = @Schema(implementation = Statistics.class))) //
+    })
+    public ResponseEntity<Object> getStatistics() {
+        List<Job.Statistics> res = new ArrayList<>();
+        for (Job job : this.jobs.getAll()) {
+            res.add(job.getStatistics());
+        }
+
+        return new ResponseEntity<>(gson.toJson(new Statistics(res)), HttpStatus.OK);
+    }
+
 }
index 7a0b707..ce2e1a1 100644 (file)
@@ -24,6 +24,7 @@ import lombok.Builder;
 import lombok.Getter;
 import lombok.ToString;
 
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.springframework.util.StringUtils;
 
 @ToString
@@ -71,4 +72,13 @@ public class InfoType {
         }
         return DataType.OTHER;
     }
+
+    public String getKafkaGroupId() {
+        return this.kafkaInputTopic == null ? null : "osc-dmaap-adapter-" + getId();
+    }
+
+    public String getKafkaClientId(ApplicationConfig appConfig) {
+        return this.kafkaInputTopic == null ? null : getId() + "_" + appConfig.getSelfUrl();
+
+    }
 }
index 9581eb4..acb9136 100644 (file)
 
 package org.oran.dmaapadapter.repository;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import io.swagger.v3.oas.annotations.media.Schema;
+
 import java.lang.invoke.MethodHandles;
 import java.time.Duration;
 
+import lombok.Builder;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
 
 import org.oran.dmaapadapter.clients.AsyncRestClient;
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.filter.Filter;
 import org.oran.dmaapadapter.filter.FilterFactory;
 import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
@@ -38,6 +44,59 @@ import org.slf4j.LoggerFactory;
 public class Job {
     private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+    @Builder
+    @Schema(name = "job_statistics", description = "Statistics information for one job")
+    public static class Statistics {
+
+        // @Schema(name = "jobId", description = "jobId", required = true)
+        // @SerializedName("jobId")
+        @JsonProperty(value = "jobId", required = true)
+        String jobId;
+
+        @JsonProperty(value = "typeId", required = true)
+        String typeId;
+
+        @JsonProperty(value = "inputTopic", required = false)
+        String inputTopic;
+
+        @JsonProperty(value = "outputTopic", required = false)
+        String outputTopic;
+
+        @JsonProperty(value = "groupId", required = false)
+        String groupId;
+
+        @JsonProperty(value = "clientId", required = false)
+        String clientId;
+
+        @JsonProperty(value = "noOfReceivedObjects", required = true)
+        @Builder.Default
+        int noOfReceivedObjects = 0;
+
+        @JsonProperty(value = "noOfReceivedBytes", required = true)
+        @Builder.Default
+        int noOfReceivedBytes = 0;
+
+        @JsonProperty(value = "noOfSentObjects", required = true)
+        @Builder.Default
+        int noOfSentObjects = 0;
+
+        @JsonProperty(value = "noOfSentBytes", required = true)
+        @Builder.Default
+        int noOfSentBytes = 0;
+
+        public void received(String str) {
+            noOfReceivedBytes += str.length();
+            noOfReceivedObjects += 1;
+
+        }
+
+        public void filtered(String str) {
+            noOfSentBytes += str.length();
+            noOfSentObjects += 1;
+        }
+
+    }
+
     public static class Parameters {
         public static final String REGEXP_TYPE = "regexp";
         public static final String PM_FILTER_TYPE = "pmdata";
@@ -127,11 +186,14 @@ public class Job {
 
     private final Filter filter;
 
+    @Getter
+    private final Statistics statistics;
+
     @Getter
     private final AsyncRestClient consumerRestClient;
 
     public Job(String id, String callbackUrl, InfoType type, String owner, String lastUpdated, Parameters parameters,
-            AsyncRestClient consumerRestClient) {
+            AsyncRestClient consumerRestClient, ApplicationConfig appConfig) {
         this.id = id;
         this.callbackUrl = callbackUrl;
         this.type = type;
@@ -141,6 +203,16 @@ public class Job {
         filter = parameters.filter == null ? null
                 : FilterFactory.create(parameters.getFilter(), parameters.getFilterType());
         this.consumerRestClient = consumerRestClient;
+
+        statistics = Statistics.builder() //
+                .groupId(type.getKafkaGroupId()) //
+                .inputTopic(type.getKafkaInputTopic()) //
+                .jobId(id) //
+                .outputTopic(parameters.getKafkaOutputTopic()) //
+                .typeId(type.getId()) //
+                .clientId(type.getKafkaClientId(appConfig)) //
+                .build();
+
     }
 
     public Filter.FilteredData filter(DataFromTopic data) {
index 825673a..2c6b329 100644 (file)
@@ -55,9 +55,12 @@ public class Jobs {
     private MultiMap<Job> jobsByType = new MultiMap<>();
     private final AsyncRestClientFactory restclientFactory;
     private final List<Observer> observers = new ArrayList<>();
+    private final ApplicationConfig appConfig;
 
-    public Jobs(@Autowired ApplicationConfig applicationConfig, @Autowired SecurityContext securityContext) {
+    public Jobs(@Autowired ApplicationConfig applicationConfig, @Autowired SecurityContext securityContext,
+            @Autowired ApplicationConfig appConfig) {
         restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig(), securityContext);
+        this.appConfig = appConfig;
     }
 
     public synchronized Job getJob(String id) throws ServiceException {
@@ -81,7 +84,7 @@ public class Jobs {
         AsyncRestClient consumerRestClient = type.isUseHttpProxy() //
                 ? restclientFactory.createRestClientUseHttpProxy(callbackUrl) //
                 : restclientFactory.createRestClientNoHttpProxy(callbackUrl);
-        Job job = new Job(id, callbackUrl, type, owner, lastUpdated, parameters, consumerRestClient);
+        Job job = new Job(id, callbackUrl, type, owner, lastUpdated, parameters, consumerRestClient, this.appConfig);
         this.put(job);
         synchronized (observers) {
             this.observers.forEach(obs -> obs.onJobbAdded(job));
index ae1f413..bda54a4 100644 (file)
@@ -77,7 +77,7 @@ public abstract class JobDataDistributor {
     public synchronized void start(Flux<TopicListener.DataFromTopic> input) {
         stop();
         this.errorStats.resetIrrecoverableErrors();
-        this.subscription = filterAndBuffer(input, job) //
+        this.subscription = filterAndBuffer(input, this.job) //
                 .flatMap(this::sendToClient, job.getParameters().getMaxConcurrency()) //
                 .onErrorResume(this::handleError) //
                 .subscribe(this::handleSentOk, //
@@ -104,8 +104,11 @@ public abstract class JobDataDistributor {
     }
 
     private Flux<Filter.FilteredData> filterAndBuffer(Flux<TopicListener.DataFromTopic> inputFlux, Job job) {
-        Flux<Filter.FilteredData> filtered = inputFlux.map(job::filter) //
-                .filter(f -> !f.isEmpty());
+        Flux<Filter.FilteredData> filtered = //
+                inputFlux.doOnNext(data -> job.getStatistics().received(data.value)) //
+                        .map(job::filter) //
+                        .filter(f -> !f.isEmpty()) //
+                        .doOnNext(f -> job.getStatistics().filtered(f.value)); //
 
         if (job.isBuffered()) {
             filtered = filtered.map(input -> quoteNonJson(input.value, job)) //
index 7514d37..61b50c6 100644 (file)
 
 package org.oran.dmaapadapter.tasks;
 
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+import java.nio.charset.Charset;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import lombok.Builder;
+import lombok.Getter;
+import lombok.ToString;
+
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
@@ -32,19 +41,10 @@ import org.oran.dmaapadapter.repository.InfoType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-
-import lombok.Builder;
-import lombok.Getter;
-import lombok.ToString;
 import reactor.core.publisher.Flux;
 import reactor.kafka.receiver.KafkaReceiver;
 import reactor.kafka.receiver.ReceiverOptions;
 
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-
 /**
  * The class streams incoming requests from a Kafka topic and sends them further
  * to a multi cast sink, which several other streams can connect to.
@@ -55,7 +55,6 @@ public class KafkaTopicListener implements TopicListener {
     private final ApplicationConfig applicationConfig;
     private final InfoType type;
     private Flux<DataFromTopic> dataFromTopic;
-    private final String kafkaClientId;
 
     private static Gson gson = new GsonBuilder() //
             .disableHtmlEscaping() //
@@ -68,24 +67,25 @@ public class KafkaTopicListener implements TopicListener {
         private String filename;
     }
 
-    public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type, String kafkaClientId) {
+    public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type) {
         this.applicationConfig = applicationConfig;
         this.type = type;
-        this.kafkaClientId = kafkaClientId;
     }
 
     @Override
     public Flux<DataFromTopic> getFlux() {
         if (this.dataFromTopic == null) {
-            this.dataFromTopic = startReceiveFromTopic(this.kafkaClientId);
+            this.dataFromTopic = startReceiveFromTopic(this.type.getKafkaClientId(this.applicationConfig));
         }
         return this.dataFromTopic;
     }
 
     private Flux<DataFromTopic> startReceiveFromTopic(String clientId) {
         logger.debug("Listening to kafka topic: {} type :{}", this.type.getKafkaInputTopic(), type.getId());
+
         return KafkaReceiver.create(kafkaInputProperties(clientId)) //
-                .receive() //
+                .receiveAutoAck() //
+                .concatMap(consumerRecord -> consumerRecord) //
                 .doOnNext(input -> logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(),
                         input.value())) //
                 .doOnError(t -> logger.error("KafkaTopicReceiver error: {}", t.getMessage())) //
@@ -99,8 +99,8 @@ public class KafkaTopicListener implements TopicListener {
 
     private DataFromTopic getDataFromFileIfNewPmFileEvent(DataFromTopic data) {
 
-        if (!applicationConfig.getPmFilesPath().isEmpty()
-                && this.type.getDataType() == InfoType.DataType.PM_DATA
+        if (!applicationConfig.getPmFilesPath().isEmpty() //
+                && this.type.getDataType() == InfoType.DataType.PM_DATA //
                 && data.value.length() < 1000) {
             try {
                 NewFileEvent ev = gson.fromJson(data.value, NewFileEvent.class);
@@ -121,7 +121,7 @@ public class KafkaTopicListener implements TopicListener {
             logger.error("No kafka boostrap server is setup");
         }
         consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers());
-        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "osc-dmaap-adapter-" + this.type.getId());
+        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, this.type.getKafkaGroupId());
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
index ea7ab81..fcc94ee 100644 (file)
@@ -59,8 +59,7 @@ public class TopicListeners {
 
         for (InfoType type : types.getAll()) {
             if (type.isKafkaTopicDefined()) {
-                KafkaTopicListener topicConsumer = new KafkaTopicListener(appConfig, type,
-                        type.getId() + "_" + appConfig.getSelfUrl());
+                KafkaTopicListener topicConsumer = new KafkaTopicListener(appConfig, type);
                 kafkaTopicListeners.put(type.getId(), topicConsumer);
             }
             if (type.isDmaapTopicDefined()) {
index 635fa65..2f1f6c9 100644 (file)
@@ -47,6 +47,7 @@ import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.configuration.WebClientConfig;
 import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig;
 import org.oran.dmaapadapter.controllers.ProducerCallbacksController;
+import org.oran.dmaapadapter.exceptions.ServiceException;
 import org.oran.dmaapadapter.filter.PmReport;
 import org.oran.dmaapadapter.filter.PmReportFilter;
 import org.oran.dmaapadapter.r1.ConsumerJobInfo;
@@ -554,6 +555,23 @@ class ApplicationTest {
                 .hasSize(this.types.size()));
     }
 
+    @Test
+    void testStatistics() throws ServiceException {
+        // Register producer, Register types
+        waitForRegistration();
+        final String JOB_ID = "testStatistics";
+        ConsumerJobInfo jobInfo = consumerJobInfo("DmaapInformationType", JOB_ID, jsonObjectRegexp());
+
+        this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+        String targetUri = baseUrl() + ProducerCallbacksController.STATISTICS_URL;
+        String stats = restClient().get(targetUri).block();
+
+        assertThat(stats).contains(JOB_ID, "DmaapInformationType");
+
+    }
+
     public static void testErrorCode(Mono<?> request, HttpStatus expStatus, String responseContains) {
         testErrorCode(request, expStatus, responseContains, true);
     }
index 488fc89..45bee5d 100644 (file)
@@ -47,6 +47,7 @@ import org.oran.dmaapadapter.clients.SecurityContext;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.configuration.WebClientConfig;
 import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig;
+import org.oran.dmaapadapter.controllers.ProducerCallbacksController;
 import org.oran.dmaapadapter.exceptions.ServiceException;
 import org.oran.dmaapadapter.filter.PmReportFilter;
 import org.oran.dmaapadapter.r1.ConsumerJobInfo;
@@ -80,7 +81,7 @@ import reactor.kafka.sender.SenderRecord;
         "server.ssl.key-store=./config/keystore.jks", //
         "app.webclient.trust-store=./config/truststore.jks", //
         "app.configuration-filepath=./src/test/resources/test_application_configuration.json", //
-        "app.pm-files-path=./src/test/resources/" }) //
+        "app.pm-files-path=./src/test/resources/"}) //
 class IntegrationWithKafka {
 
     final String TYPE_ID = "KafkaInformationType";
@@ -168,10 +169,10 @@ class IntegrationWithKafka {
 
             // Create a listener to the output topic. The KafkaTopicListener happens to be
             // suitable for that,
-            InfoType type = InfoType.builder().id("id").kafkaInputTopic(OUTPUT_TOPIC).dataType("dataType").build();
+            InfoType type =
+                    InfoType.builder().id("TestReceiver").kafkaInputTopic(OUTPUT_TOPIC).dataType("dataType").build();
 
-            KafkaTopicListener topicListener = new KafkaTopicListener(applicationConfig, type,
-                    "TestClientId" + "_" + outputTopic);
+            KafkaTopicListener topicListener = new KafkaTopicListener(applicationConfig, type);
 
             topicListener.getFlux() //
                     .doOnNext(this::set) //
@@ -415,6 +416,14 @@ class IntegrationWithKafka {
 
         await().untilAsserted(() -> assertThat(kafkaReceiver.lastValue()).isEqualTo(sendString));
         assertThat(kafkaReceiver.lastKey()).isEqualTo(sendKey);
+
+        printStatistics();
+    }
+
+    private void printStatistics() {
+        String targetUri = baseUrl() + ProducerCallbacksController.STATISTICS_URL;
+        String stats = restClient().get(targetUri).block();
+        logger.info("Stats : {}", stats);
     }
 
     @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
@@ -475,8 +484,8 @@ class IntegrationWithKafka {
 
         Instant startTime = Instant.now();
 
-        KafkaTopicListener.NewFileEvent event = KafkaTopicListener.NewFileEvent.builder().filename("pm_report.json")
-                .build();
+        KafkaTopicListener.NewFileEvent event =
+                KafkaTopicListener.NewFileEvent.builder().filename("pm_report.json").build();
         String eventAsString = gson.toJson(event);
 
         String path = "./src/test/resources/pm_report.json";
@@ -495,6 +504,8 @@ class IntegrationWithKafka {
         final long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond();
         logger.info("*** Duration :" + durationSeconds + ", objects/second: " + NO_OF_OBJECTS / durationSeconds);
         logger.info("***  kafkaReceiver2 :" + kafkaReceiver.count);
+
+        printStatistics();
     }
 
     @Test