Minor changes 03/9403/5
authorPatrikBuhr <patrik.buhr@est.tech>
Wed, 26 Oct 2022 12:08:02 +0000 (14:08 +0200)
committerPatrikBuhr <patrik.buhr@est.tech>
Thu, 27 Oct 2022 14:01:03 +0000 (16:01 +0200)
Added pmRopEndTime in the PM filtering.
Simplified the PM job schema.

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: Idd5d501e65643cf37b101f9e4e64d446b2d9940f

16 files changed:
src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java
src/main/java/org/oran/dmaapadapter/filter/PmReport.java
src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java
src/main/java/org/oran/dmaapadapter/repository/Job.java
src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java
src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java
src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java
src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java
src/main/resources/typeSchemaPmData.json
src/test/java/org/oran/dmaapadapter/ApplicationTest.java
src/test/java/org/oran/dmaapadapter/DmaapSimulatorController.java
src/test/java/org/oran/dmaapadapter/IcsSimulatorController.java
src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java
src/test/resources/test_application_configuration.json

index 4967626..1816dc3 100644 (file)
@@ -150,22 +150,21 @@ public class ProducerCallbacksController {
     }
 
     @Schema(name = "statistics_info", description = "Statistics information")
-    public class Statistics {
+    public class StatisticsCollection {
 
         @Schema(description = "Statistics per job")
         public final Collection<Job.Statistics> jobStatistics;
 
-        public Statistics(Collection<Job.Statistics> stats) {
+        public StatisticsCollection(Collection<Job.Statistics> stats) {
             this.jobStatistics = stats;
         }
-
     }
 
     @GetMapping(path = STATISTICS_URL, produces = MediaType.APPLICATION_JSON_VALUE)
     @Operation(summary = "Returns statistics", description = "")
     @ApiResponses(value = { //
             @ApiResponse(responseCode = "200", description = "OK", //
-                    content = @Content(schema = @Schema(implementation = Statistics.class))) //
+                    content = @Content(schema = @Schema(implementation = StatisticsCollection.class))) //
     })
     public ResponseEntity<Object> getStatistics() {
         List<Job.Statistics> res = new ArrayList<>();
@@ -173,7 +172,7 @@ public class ProducerCallbacksController {
             res.add(job.getStatistics());
         }
 
-        return new ResponseEntity<>(gson.toJson(new Statistics(res)), HttpStatus.OK);
+        return new ResponseEntity<>(gson.toJson(new StatisticsCollection(res)), HttpStatus.OK);
     }
 
 }
index ca2c82b..0eefed3 100644 (file)
@@ -30,7 +30,7 @@ import lombok.Builder;
 public class PmReport {
 
     @Expose
-    Event event = new Event();
+    public Event event = new Event();
 
     public static class CommonEventHeader {
         @Expose
index 78fa6bb..e602c1c 100644 (file)
@@ -62,6 +62,9 @@ public class PmReportFilter implements Filter {
 
         @Setter
         String pmRopStartTime;
+
+        @Setter
+        String pmRopEndTime;
     }
 
     private static class MeasTypesIndexed extends PmReport.MeasTypes {
@@ -132,6 +135,7 @@ public class PmReportFilter implements Filter {
         reportFiltered.event.perf3gppFields = report.event.perf3gppFields.toBuilder().build();
         reportFiltered.event.perf3gppFields.measDataCollection =
                 report.event.perf3gppFields.measDataCollection.toBuilder().build();
+
         reportFiltered.event.perf3gppFields.measDataCollection.measInfoList = filteredMeasObjs;
         return !filteredMeasObjs.isEmpty();
     }
index b5b7e52..f57614f 100644 (file)
@@ -45,6 +45,7 @@ public class Job {
     private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
     @Builder
+    @Getter
     @Schema(name = "job_statistics", description = "Statistics information for one job")
     public static class Statistics {
 
index ef10d3a..71918db 100644 (file)
@@ -26,8 +26,6 @@ import org.oran.dmaapadapter.repository.Job;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.http.MediaType;
-
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 /**
@@ -38,8 +36,8 @@ import reactor.core.publisher.Mono;
 public class HttpJobDataDistributor extends JobDataDistributor {
     private static final Logger logger = LoggerFactory.getLogger(HttpJobDataDistributor.class);
 
-    public HttpJobDataDistributor(Job job, ApplicationConfig config, Flux<TopicListener.DataFromTopic> input) {
-        super(job, config, input);
+    public HttpJobDataDistributor(Job job, ApplicationConfig config) {
+        super(job, config);
     }
 
     @Override
index 4acd365..5571669 100644 (file)
@@ -56,7 +56,6 @@ public abstract class JobDataDistributor {
     private final Job job;
     private Disposable subscription;
     private final ErrorStats errorStats = new ErrorStats();
-    private final ApplicationConfig applConfig;
 
     private final DataStore dataStore;
     private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
@@ -87,20 +86,13 @@ public abstract class JobDataDistributor {
         }
     }
 
-    protected JobDataDistributor(Job job, ApplicationConfig applConfig, Flux<TopicListener.DataFromTopic> input) {
+    protected JobDataDistributor(Job job, ApplicationConfig applConfig) {
         this.job = job;
-        this.applConfig = applConfig;
         this.dataStore = DataStore.create(applConfig);
         this.dataStore.create(DataStore.Bucket.FILES).subscribe();
         this.dataStore.create(DataStore.Bucket.LOCKS).subscribe();
 
         this.errorStats.resetIrrecoverableErrors();
-        this.subscription = filterAndBuffer(input, this.job) //
-                .flatMap(this::sendToClient, job.getParameters().getMaxConcurrency()) //
-                .onErrorResume(this::handleError) //
-                .subscribe(this::handleSentOk, //
-                        this::handleExceptionInStream, //
-                        () -> logger.warn("HttpDataConsumer stopped jobId: {}", job.getId()));
     }
 
     static class LockedException extends ServiceException {
@@ -109,9 +101,18 @@ public abstract class JobDataDistributor {
         }
     }
 
-    public void collectHistoricalData() {
+    public void start(Flux<TopicListener.DataFromTopic> input) {
         PmReportFilter filter = job.getFilter() instanceof PmReportFilter ? (PmReportFilter) job.getFilter() : null;
 
+        if (filter == null || filter.getFilterData().getPmRopEndTime() == null) {
+            this.subscription = filterAndBuffer(input, this.job) //
+                    .flatMap(this::sendToClient, job.getParameters().getMaxConcurrency()) //
+                    .onErrorResume(this::handleError) //
+                    .subscribe(this::handleSentOk, //
+                            this::handleExceptionInStream, //
+                            () -> logger.warn("HttpDataConsumer stopped jobId: {}", job.getId()));
+        }
+
         if (filter != null && filter.getFilterData().getPmRopStartTime() != null) {
             this.dataStore.createLock(collectHistoricalDataLockName()) //
                     .flatMap(isLockGranted -> Boolean.TRUE.equals(isLockGranted) ? Mono.just(isLockGranted)
@@ -123,7 +124,8 @@ public abstract class JobDataDistributor {
                     .doOnNext(sourceName -> logger.debug("Checking source name: {}, jobId: {}", sourceName,
                             this.job.getId())) //
                     .flatMap(sourceName -> dataStore.listObjects(DataStore.Bucket.FILES, sourceName), 1) //
-                    .filter(fileName -> filterStartTime(filter.getFilterData().getPmRopStartTime(), fileName)) //
+                    .filter(this::isRopFile).filter(fileName -> filterStartTime(filter.getFilterData(), fileName)) //
+                    .filter(fileName -> filterEndTime(filter.getFilterData(), fileName)) //
                     .map(this::createFakeEvent) //
                     .flatMap(data -> KafkaTopicListener.getDataFromFileIfNewPmFileEvent(data, this.job.getType(),
                             dataStore), 100)
@@ -176,26 +178,45 @@ public abstract class JobDataDistributor {
         return new TopicListener.DataFromTopic(null, gson.toJson(ev).getBytes());
     }
 
-    private boolean filterStartTime(String startTimeStr, String fileName) {
+    private static String fileTimePartFromRopFileName(String fileName) {
+        return fileName.substring(fileName.lastIndexOf("/") + 2);
+    }
+
+    private static boolean filterStartTime(PmReportFilter.FilterData filter, String fileName) {
         // A20000626.2315+0200-2330+0200_HTTPS-6-73.json
         try {
-            if (fileName.endsWith(".json") || fileName.endsWith(".json.gz")) {
-
-                String fileTimePart = fileName.substring(fileName.lastIndexOf("/") + 2);
-                fileTimePart = fileTimePart.substring(0, 18);
+            String fileTimePart = fileTimePartFromRopFileName(fileName);
+            fileTimePart = fileTimePart.substring(0, 18);
+            OffsetDateTime fileStartTime = parseFileDate(fileTimePart);
+            OffsetDateTime startTime = OffsetDateTime.parse(filter.getPmRopStartTime());
+            boolean isMatch = fileStartTime.isAfter(startTime);
+            logger.debug("Checking file: {}, fileStartTime: {}, filterStartTime: {}, isAfter: {}", fileName,
+                    fileStartTime, startTime, isMatch);
+            return isMatch;
+        } catch (Exception e) {
+            logger.warn("Time parsing exception: {}", e.getMessage());
+            return false;
+        }
+    }
 
-                DateTimeFormatter formatter =
-                        new DateTimeFormatterBuilder().appendPattern("yyyyMMdd.HHmmZ").toFormatter();
+    private boolean isRopFile(String fileName) {
+        return fileName.endsWith(".json") || fileName.endsWith(".json.gz");
+    }
 
-                OffsetDateTime fileStartTime = OffsetDateTime.parse(fileTimePart, formatter);
-                OffsetDateTime startTime = OffsetDateTime.parse(startTimeStr);
-                boolean isBefore = startTime.isBefore(fileStartTime);
-                logger.debug("Checking file: {}, fileStartTime: {}, filterStartTime: {}, isBefore: {}", fileName,
-                        fileStartTime, startTime, isBefore);
-                return isBefore;
-            } else {
-                return false;
-            }
+    private static boolean filterEndTime(PmReportFilter.FilterData filter, String fileName) {
+        // A20000626.2315+0200-2330+0200_HTTPS-6-73.json
+        if (filter.getPmRopEndTime() == null) {
+            return true;
+        }
+        try {
+            String fileTimePart = fileTimePartFromRopFileName(fileName);
+            fileTimePart = fileTimePart.substring(0, 9) + fileTimePart.substring(19, 28);
+            OffsetDateTime fileEndTime = parseFileDate(fileTimePart);
+            OffsetDateTime endTime = OffsetDateTime.parse(filter.getPmRopEndTime());
+            boolean isMatch = fileEndTime.isBefore(endTime);
+            logger.debug("Checking file: {}, fileEndTime: {}, endTime: {}, isBefore: {}", fileName, fileEndTime,
+                    endTime, isMatch);
+            return isMatch;
 
         } catch (Exception e) {
             logger.warn("Time parsing exception: {}", e.getMessage());
@@ -203,6 +224,12 @@ public abstract class JobDataDistributor {
         }
     }
 
+    private static OffsetDateTime parseFileDate(String timeStr) {
+        DateTimeFormatter startTimeFormatter =
+                new DateTimeFormatterBuilder().appendPattern("yyyyMMdd.HHmmZ").toFormatter();
+        return OffsetDateTime.parse(timeStr, startTimeFormatter);
+    }
+
     private void handleExceptionInStream(Throwable t) {
         logger.warn("JobDataDistributor exception: {}, jobId: {}", t.getMessage(), job.getId());
         stop();
index 5e09714..5526fc8 100644 (file)
@@ -33,7 +33,6 @@ import org.oran.dmaapadapter.repository.Job;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.kafka.sender.KafkaSender;
 import reactor.kafka.sender.SenderOptions;
@@ -48,11 +47,10 @@ public class KafkaJobDataDistributor extends JobDataDistributor {
     private static final Logger logger = LoggerFactory.getLogger(KafkaJobDataDistributor.class);
 
     private KafkaSender<byte[], byte[]> sender;
-    private final ApplicationConfig appConfig;
 
-    public KafkaJobDataDistributor(Job job, ApplicationConfig appConfig, Flux<TopicListener.DataFromTopic> input) {
-        super(job, appConfig, input);
-        this.appConfig = appConfig;
+    public KafkaJobDataDistributor(Job job, ApplicationConfig appConfig) {
+        super(job, appConfig);
+
         SenderOptions<byte[], byte[]> senderOptions = senderOptions(appConfig);
         this.sender = KafkaSender.create(senderOptions);
     }
index 00c107c..e63d934 100644 (file)
@@ -122,7 +122,7 @@ public class KafkaTopicListener implements TopicListener {
         }
     }
 
-    static byte[] unzip(byte[] bytes, String fileName) {
+    public static byte[] unzip(byte[] bytes, String fileName) {
         if (!fileName.endsWith(".gz")) {
             return bytes;
         }
index 4d76cd1..a3e3703 100644 (file)
@@ -93,18 +93,17 @@ public class TopicListeners {
         }
     }
 
-    private JobDataDistributor createConsumer(Job job, TopicListener topicListener) {
-        return !Strings.isEmpty(job.getParameters().getKafkaOutputTopic())
-                ? new KafkaJobDataDistributor(job, appConfig, topicListener.getFlux())
-                : new HttpJobDataDistributor(job, appConfig, topicListener.getFlux());
+    private JobDataDistributor createConsumer(Job job) {
+        return !Strings.isEmpty(job.getParameters().getKafkaOutputTopic()) ? new KafkaJobDataDistributor(job, appConfig)
+                : new HttpJobDataDistributor(job, appConfig);
     }
 
     private void addConsumer(Job job, MultiMap<JobDataDistributor> distributors,
             Map<String, TopicListener> topicListeners) {
         TopicListener topicListener = topicListeners.get(job.getType().getId());
-        JobDataDistributor distributor = createConsumer(job, topicListener);
+        JobDataDistributor distributor = createConsumer(job);
 
-        distributor.collectHistoricalData();
+        distributor.start(topicListener.getFlux());
 
         distributors.put(job.getType().getId(), job.getId(), distributor);
     }
index 5b4ab89..c579fe2 100644 (file)
@@ -4,98 +4,68 @@
    "additionalProperties": false,
    "properties": {
       "filter": {
-         "anyOf": [
-            {
-               "type": "string"
+         "type": "object",
+         "additionalProperties": false,
+         "properties": {
+            "sourceNames": {
+               "type": "array",
+               "items": [
+                  {
+                     "type": "string"
+                  }
+               ]
+            },
+            "measObjInstIds": {
+               "type": "array",
+               "items": [
+                  {
+                     "type": "string"
+                  }
+               ]
+            },
+            "measObjClass": {
+               "type": "array",
+               "items": [
+                  {
+                     "type": "string"
+                  }
+               ]
+            },
+            "measTypes": {
+               "type": "array",
+               "items": [
+                  {
+                     "type": "string"
+                  }
+               ]
             },
-            {
-               "type": "object",
-               "additionalProperties": false,
-               "properties": {
-                  "sourceNames": {
-                     "type": "array",
-                     "items": [
-                        {
-                           "type": "string"
-                        }
-                     ]
-                  },
-                  "measObjInstIds": {
-                     "type": "array",
-                     "items": [
-                        {
-                           "type": "string"
-                        }
-                     ]
-                  },
-                  "measObjClass": {
-                     "type": "array",
-                     "items": [
-                        {
-                           "type": "string"
-                        }
-                     ]
-                  },
-                  "measTypes": {
-                     "type": "array",
-                     "items": [
-                        {
-                           "type": "string"
-                        }
-                     ]
-                  },
-                  "measuredEntityDns": {
-                     "type": "array",
-                     "items": [
-                        {
-                           "type": "string"
-                        }
-                     ]
-                  },
-                  "pmRopStartTime": {
+            "measuredEntityDns": {
+               "type": "array",
+               "items": [
+                  {
                      "type": "string"
                   }
-               }
+               ]
+            },
+            "pmRopStartTime": {
+               "type": "string"
+            },
+            "pmRopEndTime": {
+               "type": "string"
             }
-         ]
+         }
       },
       "filterType": {
          "type": "string",
          "enum": [
-            "jslt",
-            "regexp",
-            "pmdata",
-            "json-path"
+            "pmdata"
          ]
       },
-      "maxConcurrency": {
-         "type": "integer",
-         "minimum": 1
-      },
       "kafkaOutputTopic": {
          "type": "string"
       },
       "gzip": {
          "type": "boolean"
-      },
-      "bufferTimeout": {
-         "type": "object",
-         "additionalProperties": false,
-         "properties": {
-            "maxSize": {
-               "type": "integer",
-               "minimum": 1
-            },
-            "maxTimeMiliseconds": {
-               "type": "integer",
-               "minimum": 0,
-               "maximum": 160000
-            }
-         },
-         "required": [
-            "maxSize",
-            "maxTimeMiliseconds"
-         ]
       }
    }
 }
\ No newline at end of file
index 6b48f79..10779d0 100644 (file)
@@ -25,22 +25,16 @@ import static org.awaitility.Awaitility.await;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import com.google.gson.JsonParser;
-import com.google.protobuf.AbstractMessage.Builder;
-import com.google.protobuf.Message;
-import com.google.protobuf.MessageOrBuilder;
-import com.google.protobuf.util.JsonFormat;
 
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.lang.invoke.MethodHandles;
-import java.lang.reflect.InvocationTargetException;
 import java.nio.charset.Charset;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.time.Instant;
-import java.util.ArrayList;
+import java.time.OffsetDateTime;
 import java.util.Map;
 
 import org.json.JSONObject;
@@ -93,7 +87,8 @@ import reactor.test.StepVerifier;
         "app.webclient.trust-store-used=true", //
         "app.configuration-filepath=./src/test/resources/test_application_configuration.json", //
         "app.pm-files-path=/tmp/dmaapadaptor", //
-        "app.s3.endpointOverride="})
+        "app.s3.endpointOverride=" //
+})
 class ApplicationTest {
 
     @Autowired
@@ -127,80 +122,6 @@ class ApplicationTest {
 
     private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-    public static class ProtoJsonUtil {
-
-        /**
-         * Makes a Json from a given message or builder
-         *
-         * @param messageOrBuilder is the instance
-         * @return The string representation
-         * @throws IOException if any error occurs
-         */
-        public static String toJson(MessageOrBuilder messageOrBuilder) throws IOException {
-            return JsonFormat.printer().print(messageOrBuilder);
-        }
-
-        /**
-         * Makes a new instance of message based on the json and the class
-         *
-         * @param <T> is the class type
-         * @param json is the json instance
-         * @param clazz is the class instance
-         * @return An instance of T based on the json values
-         * @throws IOException if any error occurs
-         */
-        @SuppressWarnings({"unchecked", "rawtypes"})
-        public static <T extends Message> T fromJson(String json, Class<T> clazz) throws IOException {
-            // https://stackoverflow.com/questions/27642021/calling-parsefrom-method-for-generic-protobuffer-class-in-java/33701202#33701202
-            Builder builder = null;
-            try {
-                // Since we are dealing with a Message type, we can call newBuilder()
-                builder = (Builder) clazz.getMethod("newBuilder").invoke(null);
-
-            } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException
-                    | NoSuchMethodException | SecurityException e) {
-                return null;
-            }
-
-            // The instance is placed into the builder values
-            JsonFormat.parser().ignoringUnknownFields().merge(json, builder);
-
-            // the instance will be from the build
-            return (T) builder.build();
-        }
-    }
-
-    // @Test
-    void testProtoBuf() throws Exception {
-        String path = "./src/test/resources/A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json";
-
-        String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset());
-
-        PmProtoGenerated.PmRopFile proto = ProtoJsonUtil.fromJson(pmReportJson, PmProtoGenerated.PmRopFile.class);
-        byte[] bytes = proto.toByteArray();
-
-        int TIMES = 100000;
-        {
-            Instant startTime = Instant.now();
-            for (int i = 0; i < TIMES; ++i) {
-                PmProtoGenerated.PmRopFile.parseFrom(bytes);
-            }
-            long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond();
-            logger.info("*** Duration PROTO :" + durationSeconds + ", objects/second: " + TIMES / durationSeconds
-                    + " time: " + (float) durationSeconds / TIMES);
-        }
-        {
-            Instant startTime = Instant.now();
-            for (int i = 0; i < TIMES; ++i) {
-                PmReport reportsParsed = gson.fromJson(pmReportJson, PmReport.class);
-            }
-            long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond();
-            logger.info("*** Duration GSON :" + durationSeconds + ", objects/second: " + TIMES / durationSeconds
-                    + " time: " + (float) durationSeconds / TIMES);
-        }
-
-    }
-
     static class TestApplicationConfig extends ApplicationConfig {
 
         @Override
@@ -242,6 +163,7 @@ class ApplicationTest {
             TestApplicationConfig cfg = new TestApplicationConfig();
             return cfg;
         }
+
     }
 
     @BeforeEach
@@ -262,10 +184,12 @@ class ApplicationTest {
 
     @AfterEach
     void reset() {
+        DmaapSimulatorController.reset();
         for (Job job : this.jobs.getAll()) {
             this.icsSimulatorController.deleteJob(job.getId(), restClient());
         }
         await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+        DmaapSimulatorController.reset();
 
         this.consumerController.testResults.reset();
         this.icsSimulatorController.testResults.reset();
@@ -405,10 +329,10 @@ class ApplicationTest {
 
         // Return two messages from DMAAP and verify that these are sent to the owner of
         // the job (consumer)
-        DmaapSimulatorController.addResponse("[\"DmaapResponse123\", \"DmaapResponse223\"]");
+        DmaapSimulatorController.addResponse("[\"{}\", \"{}\"]");
         ConsumerController.TestResults consumer = this.consumerController.testResults;
         await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
-        assertThat(consumer.receivedBodies.get(0)).isEqualTo("[\"DmaapResponse123\", \"DmaapResponse223\"]");
+        assertThat(consumer.receivedBodies.get(0)).isEqualTo("[{}, {}]");
         assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "application/json");
 
         String jobUrl = baseUrl() + ProducerCallbacksController.JOB_URL;
@@ -436,7 +360,6 @@ class ApplicationTest {
         await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(2));
         assertThat(consumer.receivedBodies.get(0)).isEqualTo("DmaapResponse11");
         assertThat(consumer.receivedBodies.get(1)).isEqualTo("DmaapResponse22");
-        assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "text/plain;charset=UTF-8");
 
         // Delete the job
         this.icsSimulatorController.deleteJob(JOB_ID, restClient());
@@ -449,9 +372,6 @@ class ApplicationTest {
         await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(4));
     }
 
-    static class PmReportArray extends ArrayList<PmReport> {
-    };
-
     @Test
     void testPmFiltering() throws Exception {
         // Create a job
@@ -467,8 +387,9 @@ class ApplicationTest {
         filterData.getMeasObjInstIds().add("UtranCell=Gbg-997");
         filterData.getSourceNames().add("O-DU-1122");
         filterData.getMeasuredEntityDns().add("ManagedElement=RNC-Gbg-1");
-        Job.Parameters param = Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE)
-                .bufferTimeout(new Job.BufferTimeout(123, 456)).build();
+        Job.Parameters param =
+                Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE).build();
+
         String paramJson = gson.toJson(param);
         ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverRest", "EI_PM_JOB_ID", toJson(paramJson));
 
@@ -493,8 +414,8 @@ class ApplicationTest {
         String receivedFiltered = consumer.receivedBodies.get(0);
         assertThat(receivedFiltered).contains("succImmediateAssignProcs").doesNotContain("\"p\":2").contains("\"p\":1");
 
-        PmReportArray reportsParsed = gson.fromJson(receivedFiltered, PmReportArray.class);
-        assertThat(reportsParsed).hasSize(1);
+        PmReport reportsParsed = gson.fromJson(receivedFiltered, PmReport.class);
+        assertThat(reportsParsed.event).isNotNull();
     }
 
     @Test
@@ -512,9 +433,10 @@ class ApplicationTest {
         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
         filterData.getSourceNames().add("O-DU-1122");
         filterData.setPmRopStartTime("1999-12-27T10:50:44.000-08:00");
+        filterData.setPmRopEndTime(OffsetDateTime.now().toString());
 
-        Job.Parameters param = Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE)
-                .bufferTimeout(new Job.BufferTimeout(123, 456)).build();
+        Job.Parameters param =
+                Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE).build();
 
         String paramJson = gson.toJson(param);
         ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverRest", "EI_PM_JOB_ID", toJson(paramJson));
@@ -540,7 +462,7 @@ class ApplicationTest {
         Job.Parameters param =
                 Job.Parameters.builder().filter(expresssion).filterType(Job.Parameters.JSLT_FILTER_TYPE).build();
         String paramJson = gson.toJson(param);
-        ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverRest", JOB_ID, toJson(paramJson));
+        ConsumerJobInfo jobInfo = consumerJobInfo("DmaapInformationType", JOB_ID, toJson(paramJson));
 
         this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
@@ -549,7 +471,7 @@ class ApplicationTest {
         // filtered PM message
         String path = "./src/test/resources/pm_report.json";
         String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset());
-        DmaapSimulatorController.addPmResponse(pmReportJson);
+        DmaapSimulatorController.addResponse(json2dmaapResp(pmReportJson));
 
         ConsumerController.TestResults consumer = this.consumerController.testResults;
         await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
@@ -558,6 +480,10 @@ class ApplicationTest {
         assertThat(receivedFiltered).contains("event");
     }
 
+    private String json2dmaapResp(String json) {
+        return "[" + quote(json) + "]";
+    }
+
     @Test
     void testAuthToken() throws Exception {
 
@@ -591,10 +517,6 @@ class ApplicationTest {
         Map<String, String> headers = consumer.receivedHeaders.get(0);
         assertThat(headers).containsEntry("authorization", "Bearer " + AUTH_TOKEN);
 
-        // This is the only time it is verified that mime type is plaintext when isJson
-        // is false and buffering is not used
-        assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "text/plain;charset=UTF-8");
-
         Files.delete(authFile);
         this.securityContext.setAuthTokenFilePath(null);
     }
@@ -607,7 +529,7 @@ class ApplicationTest {
         waitForRegistration();
 
         // Create a job with JsonPath Filtering
-        ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverRest", JOB_ID, this.jsonObjectJsonPath());
+        ConsumerJobInfo jobInfo = consumerJobInfo("DmaapInformationType", JOB_ID, this.jsonObjectJsonPath());
 
         this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
@@ -616,7 +538,7 @@ class ApplicationTest {
         // filtered PM message
         String path = "./src/test/resources/pm_report.json";
         String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset());
-        DmaapSimulatorController.addPmResponse(pmReportJson);
+        DmaapSimulatorController.addResponse(json2dmaapResp(pmReportJson));
 
         ConsumerController.TestResults consumer = this.consumerController.testResults;
         await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
index a58625c..801ec3e 100644 (file)
@@ -20,6 +20,8 @@
 
 package org.oran.dmaapadapter;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import io.swagger.v3.oas.annotations.Operation;
 import io.swagger.v3.oas.annotations.media.Content;
 import io.swagger.v3.oas.annotations.media.Schema;
@@ -62,6 +64,13 @@ public class DmaapSimulatorController {
         dmaapResponses.add(response);
     }
 
+    public static void reset() {
+        assertThat(dmaapPmResponses).isEmpty();
+        assertThat(dmaapResponses).isEmpty();
+        dmaapPmResponses.clear();
+        dmaapResponses.clear();
+    }
+
     private static String quote(String str) {
         final String q = "\"";
         return q + str.replace(q, "\\\"") + q;
@@ -79,7 +88,7 @@ public class DmaapSimulatorController {
             return nothing();
         } else {
             String resp = dmaapResponses.remove(0);
-            logger.info("DMAAP simulator returned: {}", resp);
+            logger.trace("DMAAP simulator returned: {}", resp);
             return new ResponseEntity<>(resp, HttpStatus.OK);
         }
 
index e27e95a..93ecad8 100644 (file)
@@ -130,11 +130,12 @@ public class IcsSimulatorController {
         ProducerInfoTypeInfo type = testResults.types.get(job.infoTypeId);
         if (type == null) {
             logger.error("type not found: {} size: {}", job.infoTypeId, testResults.types.size());
+        } else {
+            assertThat(type).isNotNull();
+            validateJsonObjectAgainstSchema(job.jobDefinition, type.jobDataSchema);
+            logger.debug("ICS Simulator PUT job: {}", body);
+            restClient.post(url, body, MediaType.APPLICATION_JSON).block();
         }
-        assertThat(type).isNotNull();
-        validateJsonObjectAgainstSchema(job.jobDefinition, type.jobDataSchema);
-        logger.debug("ICS Simulator PUT job: {}", body);
-        restClient.post(url, body, MediaType.APPLICATION_JSON).block();
     }
 
     private void validateJsonObjectAgainstSchema(Object object, Object schemaObj) throws ServiceException {
index deff7ee..e2557cd 100644 (file)
@@ -30,6 +30,7 @@ import java.lang.invoke.MethodHandles;
 import java.nio.file.Path;
 import java.time.Duration;
 import java.time.Instant;
+import java.time.OffsetDateTime;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -48,6 +49,7 @@ import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.configuration.WebClientConfig;
 import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig;
 import org.oran.dmaapadapter.controllers.ProducerCallbacksController;
+import org.oran.dmaapadapter.controllers.ProducerCallbacksController.StatisticsCollection;
 import org.oran.dmaapadapter.datastore.DataStore;
 import org.oran.dmaapadapter.exceptions.ServiceException;
 import org.oran.dmaapadapter.filter.PmReportFilter;
@@ -55,6 +57,7 @@ import org.oran.dmaapadapter.r1.ConsumerJobInfo;
 import org.oran.dmaapadapter.repository.InfoType;
 import org.oran.dmaapadapter.repository.InfoTypes;
 import org.oran.dmaapadapter.repository.Job;
+import org.oran.dmaapadapter.repository.Job.Statistics;
 import org.oran.dmaapadapter.repository.Jobs;
 import org.oran.dmaapadapter.tasks.KafkaTopicListener;
 import org.oran.dmaapadapter.tasks.NewFileEvent;
@@ -86,7 +89,8 @@ import reactor.kafka.sender.SenderRecord;
         "app.pm-files-path=./src/test/resources/", //
         "app.s3.locksBucket=ropfilelocks", //
         "app.pm-files-path=/tmp/dmaapadaptor", //
-        "app.s3.bucket="}) //
+        "app.s3.bucket=" //
+}) //
 class IntegrationWithKafka {
 
     final String KAFKA_TYPE_ID = "KafkaInformationType";
@@ -183,11 +187,26 @@ class IntegrationWithKafka {
             KafkaTopicListener topicListener = new KafkaTopicListener(applicationConfig, type);
 
             topicListener.getFlux() //
+                    .map(this::unzip) //
                     .doOnNext(this::set) //
                     .doFinally(sig -> logger.info("Finally " + sig)) //
                     .subscribe();
         }
 
+        boolean isUnzip = false;
+
+        private TopicListener.DataFromTopic unzip(TopicListener.DataFromTopic receivedKafkaOutput) {
+            if (!this.isUnzip) {
+                return receivedKafkaOutput;
+            }
+            byte[] unzipped = KafkaTopicListener.unzip(receivedKafkaOutput.value, "junk.gz");
+            return new TopicListener.DataFromTopic(unzipped, receivedKafkaOutput.key);
+        }
+
+        public void setUnzip(boolean unzip) {
+            this.isUnzip = unzip;
+        }
+
         private void set(TopicListener.DataFromTopic receivedKafkaOutput) {
             this.receivedKafkaOutput = receivedKafkaOutput;
             this.count++;
@@ -204,6 +223,7 @@ class IntegrationWithKafka {
 
         void reset() {
             this.receivedKafkaOutput = new TopicListener.DataFromTopic(null, null);
+            this.count = 0;
         }
     }
 
@@ -380,6 +400,27 @@ class IntegrationWithKafka {
         Thread.sleep(4000);
     }
 
+    private void printStatistics() {
+        String targetUri = baseUrl() + ProducerCallbacksController.STATISTICS_URL;
+        String statsResp = restClient().get(targetUri).block();
+        StatisticsCollection stats = gson.fromJson(statsResp, StatisticsCollection.class);
+        int noOfSentBytes = 0;
+        int noOfSentObjs = 0;
+        for (Statistics s : stats.jobStatistics) {
+            noOfSentBytes += s.getNoOfSentBytes();
+            noOfSentObjs += s.getNoOfSentObjects();
+        }
+        logger.error("  Stats noOfSentBytes: {}, noOfSentObjects: {}, noOfTopics: {}", noOfSentBytes, noOfSentObjs,
+                stats.jobStatistics.size());
+    }
+
+    private void printCharacteristicsResult(String str, Instant startTime, int noOfIterations) {
+        final long durationMs = Instant.now().toEpochMilli() - startTime.toEpochMilli();
+        logger.error("*** {} Duration ({} ms),  objects/second: {}", str, durationMs,
+                (noOfIterations * 1000) / durationMs);
+        printStatistics();
+    }
+
     @Test
     void simpleCase() throws Exception {
         final String JOB_ID = "ID";
@@ -445,12 +486,6 @@ class IntegrationWithKafka {
         printStatistics();
     }
 
-    private void printStatistics() {
-        String targetUri = baseUrl() + ProducerCallbacksController.STATISTICS_URL;
-        String stats = restClient().get(targetUri).block();
-        logger.info("Stats : {}", org.apache.commons.lang3.StringUtils.truncate(stats, 1000));
-    }
-
     @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
     @Test
     void kafkaCharacteristics() throws Exception {
@@ -477,8 +512,7 @@ class IntegrationWithKafka {
             Thread.sleep(1000 * 1);
         }
 
-        final long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond();
-        logger.info("*** Duration :" + durationSeconds + ", objects/second: " + NO_OF_OBJECTS / durationSeconds);
+        printCharacteristicsResult("kafkaCharacteristics", startTime, NO_OF_OBJECTS);
     }
 
     @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
@@ -494,8 +528,8 @@ class IntegrationWithKafka {
         assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
-        filterData.getMeasTypes().add("succImmediateAssignProcs");
-        filterData.getMeasObjClass().add("UtranCell");
+        filterData.getMeasTypes().add("pmCounterNumber0");
+        filterData.getMeasObjClass().add("NRCellCU");
 
         this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver.OUTPUT_TOPIC, filterData), JOB_ID,
                 restClient());
@@ -509,7 +543,7 @@ class IntegrationWithKafka {
 
         Instant startTime = Instant.now();
 
-        final String FILE_NAME = "pm_report.json.gz";
+        final String FILE_NAME = "A20000626.2315+0200-2330+0200_HTTPS-6-73.json";
 
         DataStore fileStore = dataStore();
 
@@ -525,22 +559,14 @@ class IntegrationWithKafka {
             Thread.sleep(1000 * 1);
         }
 
-        final long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond();
-        logger.info("*** Duration :" + durationSeconds + ", objects/second: " + NO_OF_OBJECTS / durationSeconds);
+        printCharacteristicsResult("kafkaCharacteristics_pmFilter_s3", startTime, NO_OF_OBJECTS);
         logger.info("***  kafkaReceiver2 :" + kafkaReceiver.count);
-
-        printStatistics();
-    }
-
-    @Test
-    void clear() {
-
     }
 
     @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
     @Test
     void kafkaCharacteristics_manyPmJobs() throws Exception {
-        // Filter PM reports and sent to two jobs over Kafka
+        // Filter PM reports and sent to many jobs over Kafka
 
         // Register producer, Register types
         await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
@@ -551,20 +577,22 @@ class IntegrationWithKafka {
         filterData.getMeasTypes().add("pmCounterNumber1");
         filterData.getMeasObjClass().add("NRCellCU");
 
+        final boolean USE_GZIP = true;
         final int NO_OF_JOBS = 150;
         ArrayList<KafkaReceiver> receivers = new ArrayList<>();
         for (int i = 0; i < NO_OF_JOBS; ++i) {
             final String outputTopic = "manyJobs_" + i;
-            this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData, false), outputTopic,
+            this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData, USE_GZIP), outputTopic,
                     restClient());
             KafkaReceiver receiver = new KafkaReceiver(this.applicationConfig, outputTopic, this.securityContext);
+            receiver.setUnzip(USE_GZIP);
             receivers.add(receiver);
         }
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(NO_OF_JOBS));
         waitForKafkaListener();
 
-        final int NO_OF_OBJECTS = 1000;
+        final int NO_OF_OBJECTS = 100;
 
         Instant startTime = Instant.now();
 
@@ -581,20 +609,19 @@ class IntegrationWithKafka {
         sendDataToKafka(dataToSend);
 
         while (receivers.get(0).count != NO_OF_OBJECTS) {
-            logger.info("sleeping {}", kafkaReceiver.count);
+            if (kafkaReceiver.count > 0) {
+                logger.info("sleeping {}", kafkaReceiver.count);
+            }
             Thread.sleep(1000 * 1);
         }
 
-        final long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond();
-        logger.info("*** Duration :" + durationSeconds + ", objects/second: " + NO_OF_OBJECTS / durationSeconds);
+        printCharacteristicsResult("kafkaCharacteristics_manyPmJobs", startTime, NO_OF_OBJECTS);
 
         for (KafkaReceiver receiver : receivers) {
             if (receiver.count != NO_OF_OBJECTS) {
                 System.out.println("** Unexpected" + receiver.OUTPUT_TOPIC + " " + receiver.count);
             }
         }
-
-        printStatistics();
     }
 
     private String newFileEvent(String fileName) {
@@ -630,6 +657,8 @@ class IntegrationWithKafka {
         filterData.getSourceNames().add("O-DU-1122");
         filterData.setPmRopStartTime("1999-12-27T10:50:44.000-08:00");
 
+        filterData.setPmRopEndTime(OffsetDateTime.now().toString());
+
         this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver.OUTPUT_TOPIC, filterData), JOB_ID,
                 restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
index 20c03ca..3e8afc2 100644 (file)
@@ -24,20 +24,73 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
+import com.google.protobuf.AbstractMessage.Builder;
+import com.google.protobuf.Message;
+import com.google.protobuf.MessageOrBuilder;
+import com.google.protobuf.util.JsonFormat;
 
+import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.lang.reflect.InvocationTargetException;
 import java.nio.charset.Charset;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.Instant;
 
 import org.junit.jupiter.api.Test;
+import org.oran.dmaapadapter.PmProtoGenerated;
 import org.oran.dmaapadapter.filter.Filter.FilteredData;
+import org.oran.dmaapadapter.tasks.KafkaTopicListener;
 import org.oran.dmaapadapter.tasks.TopicListener;
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 class PmReportFilterTest {
+
+    public static class ProtoJsonUtil {
+
+        /**
+         * Makes a Json from a given message or builder
+         *
+         * @param messageOrBuilder is the instance
+         * @return The string representation
+         * @throws IOException if any error occurs
+         */
+        public static String toJson(MessageOrBuilder messageOrBuilder) throws IOException {
+            return JsonFormat.printer().print(messageOrBuilder);
+        }
+
+        /**
+         * Makes a new instance of message based on the json and the class
+         *
+         * @param <T> is the class type
+         * @param json is the json instance
+         * @param clazz is the class instance
+         * @return An instance of T based on the json values
+         * @throws IOException if any error occurs
+         */
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        public static <T extends Message> T fromJson(String json, Class<T> clazz) throws IOException {
+            // https://stackoverflow.com/questions/27642021/calling-parsefrom-method-for-generic-protobuffer-class-in-java/33701202#33701202
+            Builder builder = null;
+            try {
+                // Since we are dealing with a Message type, we can call newBuilder()
+                builder = (Builder) clazz.getMethod("newBuilder").invoke(null);
+
+            } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException
+                    | NoSuchMethodException | SecurityException e) {
+                return null;
+            }
+
+            // The instance is placed into the builder values
+            JsonFormat.parser().ignoringUnknownFields().merge(json, builder);
+
+            // the instance will be from the build
+            return (T) builder.build();
+        }
+    }
+
     private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
     private String filterReport(PmReportFilter filter) throws Exception {
@@ -121,40 +174,69 @@ class PmReportFilterTest {
         assertThat(filtered).contains("O-DU-1122");
     }
 
-    void testCharacteristics() throws Exception {
+    // @Test
+    void testSomeCharacteristics() throws Exception {
         Gson gson = new GsonBuilder() //
                 .disableHtmlEscaping() //
                 .create(); //
 
         String path = "./src/test/resources/A20000626.2315+0200-2330+0200_HTTPS-6-73.json";
-        String report = Files.readString(Path.of(path), Charset.defaultCharset());
 
-        TopicListener.DataFromTopic data = new TopicListener.DataFromTopic(null, report.getBytes());
+        String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset());
 
-        Instant startTime = Instant.now();
+        PmProtoGenerated.PmRopFile proto = ProtoJsonUtil.fromJson(pmReportJson, PmProtoGenerated.PmRopFile.class);
+        byte[] bytes = proto.toByteArray();
 
-        int CNT = 100000;
-        for (int i = 0; i < CNT; ++i) {
-            gson.fromJson(data.valueAsString(), PmReport.class);
+        int TIMES = 100000;
+
+        {
+            path = "./src/test/resources/A20000626.2315+0200-2330+0200_HTTPS-6-73.json.gz";
+            byte[] pmReportZipped = Files.readAllBytes(Path.of(path));
+
+            Instant startTime = Instant.now();
+            for (int i = 0; i < TIMES; ++i) {
+                KafkaTopicListener.unzip(pmReportZipped, "junk.gz");
+            }
+
+            printDuration("Unzip", startTime, TIMES);
         }
-        printDuration("Parse", startTime, CNT);
+        {
 
-        startTime = Instant.now();
+            PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
+            filterData.getMeasTypes().add("pmCounterNumber0");
+            filterData.getMeasObjClass().add("NRCellCU");
+            PmReportFilter filter = new PmReportFilter(filterData);
+            DataFromTopic topicData = new DataFromTopic(null, pmReportJson.getBytes());
 
-        PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
-        filterData.measTypes.add("pmCounterNumber0");
-        PmReportFilter filter = new PmReportFilter(filterData);
-        for (int i = 0; i < CNT; ++i) {
-            FilteredData filtered = filter.filter(data);
+            Instant startTime = Instant.now();
+            for (int i = 0; i < TIMES; ++i) {
+                filter.filter(topicData);
+            }
+            printDuration("PM Filter", startTime, TIMES);
+        }
+
+        {
+            Instant startTime = Instant.now();
+            for (int i = 0; i < TIMES; ++i) {
+                PmProtoGenerated.PmRopFile.parseFrom(bytes);
+            }
+
+            printDuration("Protobuf parsing", startTime, TIMES);
+        }
+        {
+            Instant startTime = Instant.now();
+            for (int i = 0; i < TIMES; ++i) {
+                gson.fromJson(pmReportJson, PmReport.class);
+            }
+            printDuration("Json parsing", startTime, TIMES);
         }
 
-        printDuration("Filter", startTime, CNT);
     }
 
     void printDuration(String str, Instant startTime, int noOfIterations) {
-        final long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond();
-        logger.info("*** Duration " + str + " :" + durationSeconds + ", objects/second: "
-                + noOfIterations / durationSeconds);
+        final long durationMs = Instant.now().toEpochMilli() - startTime.toEpochMilli();
+        logger.info("*** Duration (ms) " + str + " :" + durationMs + ", objects/second: "
+                + (noOfIterations * 1000) / durationMs);
     }
 
     @Test
index 64ef1c5..9c51844 100644 (file)
@@ -4,7 +4,7 @@
          "id": "DmaapInformationType",
          "dmaapTopicUrl": "/dmaap-topic-1",
          "useHttpProxy": false,
-         "isJson": false
+         "isJson": true
       },
       {
          "id": "KafkaInformationType",