From a320b4093cc0f7076e0599dcad8f9fd33f7b44f6 Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Wed, 26 Oct 2022 14:08:02 +0200 Subject: [PATCH] Minor changes Added pmRopEndTime in the PM filtering. Simplified the PM job schema. Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-773 Change-Id: Idd5d501e65643cf37b101f9e4e64d446b2d9940f --- .../controllers/ProducerCallbacksController.java | 9 +- .../org/oran/dmaapadapter/filter/PmReport.java | 2 +- .../oran/dmaapadapter/filter/PmReportFilter.java | 4 + .../java/org/oran/dmaapadapter/repository/Job.java | 1 + .../dmaapadapter/tasks/HttpJobDataDistributor.java | 6 +- .../dmaapadapter/tasks/JobDataDistributor.java | 81 ++++++++----- .../tasks/KafkaJobDataDistributor.java | 8 +- .../dmaapadapter/tasks/KafkaTopicListener.java | 2 +- .../oran/dmaapadapter/tasks/TopicListeners.java | 11 +- src/main/resources/typeSchemaPmData.json | 124 ++++++++------------ .../org/oran/dmaapadapter/ApplicationTest.java | 126 ++++----------------- .../dmaapadapter/DmaapSimulatorController.java | 11 +- .../oran/dmaapadapter/IcsSimulatorController.java | 9 +- .../oran/dmaapadapter/IntegrationWithKafka.java | 87 +++++++++----- .../dmaapadapter/filter/PmReportFilterTest.java | 118 ++++++++++++++++--- .../resources/test_application_configuration.json | 2 +- 16 files changed, 320 insertions(+), 281 deletions(-) diff --git a/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java b/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java index 4967626..1816dc3 100644 --- a/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java +++ b/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java @@ -150,22 +150,21 @@ public class ProducerCallbacksController { } @Schema(name = "statistics_info", description = "Statistics information") - public class Statistics { + public class StatisticsCollection { @Schema(description = "Statistics per job") public final Collection jobStatistics; - public Statistics(Collection stats) { + public StatisticsCollection(Collection stats) { this.jobStatistics = stats; } - } @GetMapping(path = STATISTICS_URL, produces = MediaType.APPLICATION_JSON_VALUE) @Operation(summary = "Returns statistics", description = "") @ApiResponses(value = { // @ApiResponse(responseCode = "200", description = "OK", // - content = @Content(schema = @Schema(implementation = Statistics.class))) // + content = @Content(schema = @Schema(implementation = StatisticsCollection.class))) // }) public ResponseEntity getStatistics() { List res = new ArrayList<>(); @@ -173,7 +172,7 @@ public class ProducerCallbacksController { res.add(job.getStatistics()); } - return new ResponseEntity<>(gson.toJson(new Statistics(res)), HttpStatus.OK); + return new ResponseEntity<>(gson.toJson(new StatisticsCollection(res)), HttpStatus.OK); } } diff --git a/src/main/java/org/oran/dmaapadapter/filter/PmReport.java b/src/main/java/org/oran/dmaapadapter/filter/PmReport.java index ca2c82b..0eefed3 100644 --- a/src/main/java/org/oran/dmaapadapter/filter/PmReport.java +++ b/src/main/java/org/oran/dmaapadapter/filter/PmReport.java @@ -30,7 +30,7 @@ import lombok.Builder; public class PmReport { @Expose - Event event = new Event(); + public Event event = new Event(); public static class CommonEventHeader { @Expose diff --git a/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java b/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java index 78fa6bb..e602c1c 100644 --- a/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java +++ b/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java @@ -62,6 +62,9 @@ public class PmReportFilter implements Filter { @Setter String pmRopStartTime; + + @Setter + String pmRopEndTime; } private static class MeasTypesIndexed extends PmReport.MeasTypes { @@ -132,6 +135,7 @@ public class PmReportFilter implements Filter { reportFiltered.event.perf3gppFields = report.event.perf3gppFields.toBuilder().build(); reportFiltered.event.perf3gppFields.measDataCollection = report.event.perf3gppFields.measDataCollection.toBuilder().build(); + reportFiltered.event.perf3gppFields.measDataCollection.measInfoList = filteredMeasObjs; return !filteredMeasObjs.isEmpty(); } diff --git a/src/main/java/org/oran/dmaapadapter/repository/Job.java b/src/main/java/org/oran/dmaapadapter/repository/Job.java index b5b7e52..f57614f 100644 --- a/src/main/java/org/oran/dmaapadapter/repository/Job.java +++ b/src/main/java/org/oran/dmaapadapter/repository/Job.java @@ -45,6 +45,7 @@ public class Job { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @Builder + @Getter @Schema(name = "job_statistics", description = "Statistics information for one job") public static class Statistics { diff --git a/src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java index ef10d3a..71918db 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java @@ -26,8 +26,6 @@ import org.oran.dmaapadapter.repository.Job; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.MediaType; - -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -38,8 +36,8 @@ import reactor.core.publisher.Mono; public class HttpJobDataDistributor extends JobDataDistributor { private static final Logger logger = LoggerFactory.getLogger(HttpJobDataDistributor.class); - public HttpJobDataDistributor(Job job, ApplicationConfig config, Flux input) { - super(job, config, input); + public HttpJobDataDistributor(Job job, ApplicationConfig config) { + super(job, config); } @Override diff --git a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java index 4acd365..5571669 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java @@ -56,7 +56,6 @@ public abstract class JobDataDistributor { private final Job job; private Disposable subscription; private final ErrorStats errorStats = new ErrorStats(); - private final ApplicationConfig applConfig; private final DataStore dataStore; private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create(); @@ -87,20 +86,13 @@ public abstract class JobDataDistributor { } } - protected JobDataDistributor(Job job, ApplicationConfig applConfig, Flux input) { + protected JobDataDistributor(Job job, ApplicationConfig applConfig) { this.job = job; - this.applConfig = applConfig; this.dataStore = DataStore.create(applConfig); this.dataStore.create(DataStore.Bucket.FILES).subscribe(); this.dataStore.create(DataStore.Bucket.LOCKS).subscribe(); this.errorStats.resetIrrecoverableErrors(); - this.subscription = filterAndBuffer(input, this.job) // - .flatMap(this::sendToClient, job.getParameters().getMaxConcurrency()) // - .onErrorResume(this::handleError) // - .subscribe(this::handleSentOk, // - this::handleExceptionInStream, // - () -> logger.warn("HttpDataConsumer stopped jobId: {}", job.getId())); } static class LockedException extends ServiceException { @@ -109,9 +101,18 @@ public abstract class JobDataDistributor { } } - public void collectHistoricalData() { + public void start(Flux input) { PmReportFilter filter = job.getFilter() instanceof PmReportFilter ? (PmReportFilter) job.getFilter() : null; + if (filter == null || filter.getFilterData().getPmRopEndTime() == null) { + this.subscription = filterAndBuffer(input, this.job) // + .flatMap(this::sendToClient, job.getParameters().getMaxConcurrency()) // + .onErrorResume(this::handleError) // + .subscribe(this::handleSentOk, // + this::handleExceptionInStream, // + () -> logger.warn("HttpDataConsumer stopped jobId: {}", job.getId())); + } + if (filter != null && filter.getFilterData().getPmRopStartTime() != null) { this.dataStore.createLock(collectHistoricalDataLockName()) // .flatMap(isLockGranted -> Boolean.TRUE.equals(isLockGranted) ? Mono.just(isLockGranted) @@ -123,7 +124,8 @@ public abstract class JobDataDistributor { .doOnNext(sourceName -> logger.debug("Checking source name: {}, jobId: {}", sourceName, this.job.getId())) // .flatMap(sourceName -> dataStore.listObjects(DataStore.Bucket.FILES, sourceName), 1) // - .filter(fileName -> filterStartTime(filter.getFilterData().getPmRopStartTime(), fileName)) // + .filter(this::isRopFile).filter(fileName -> filterStartTime(filter.getFilterData(), fileName)) // + .filter(fileName -> filterEndTime(filter.getFilterData(), fileName)) // .map(this::createFakeEvent) // .flatMap(data -> KafkaTopicListener.getDataFromFileIfNewPmFileEvent(data, this.job.getType(), dataStore), 100) @@ -176,26 +178,45 @@ public abstract class JobDataDistributor { return new TopicListener.DataFromTopic(null, gson.toJson(ev).getBytes()); } - private boolean filterStartTime(String startTimeStr, String fileName) { + private static String fileTimePartFromRopFileName(String fileName) { + return fileName.substring(fileName.lastIndexOf("/") + 2); + } + + private static boolean filterStartTime(PmReportFilter.FilterData filter, String fileName) { // A20000626.2315+0200-2330+0200_HTTPS-6-73.json try { - if (fileName.endsWith(".json") || fileName.endsWith(".json.gz")) { - - String fileTimePart = fileName.substring(fileName.lastIndexOf("/") + 2); - fileTimePart = fileTimePart.substring(0, 18); + String fileTimePart = fileTimePartFromRopFileName(fileName); + fileTimePart = fileTimePart.substring(0, 18); + OffsetDateTime fileStartTime = parseFileDate(fileTimePart); + OffsetDateTime startTime = OffsetDateTime.parse(filter.getPmRopStartTime()); + boolean isMatch = fileStartTime.isAfter(startTime); + logger.debug("Checking file: {}, fileStartTime: {}, filterStartTime: {}, isAfter: {}", fileName, + fileStartTime, startTime, isMatch); + return isMatch; + } catch (Exception e) { + logger.warn("Time parsing exception: {}", e.getMessage()); + return false; + } + } - DateTimeFormatter formatter = - new DateTimeFormatterBuilder().appendPattern("yyyyMMdd.HHmmZ").toFormatter(); + private boolean isRopFile(String fileName) { + return fileName.endsWith(".json") || fileName.endsWith(".json.gz"); + } - OffsetDateTime fileStartTime = OffsetDateTime.parse(fileTimePart, formatter); - OffsetDateTime startTime = OffsetDateTime.parse(startTimeStr); - boolean isBefore = startTime.isBefore(fileStartTime); - logger.debug("Checking file: {}, fileStartTime: {}, filterStartTime: {}, isBefore: {}", fileName, - fileStartTime, startTime, isBefore); - return isBefore; - } else { - return false; - } + private static boolean filterEndTime(PmReportFilter.FilterData filter, String fileName) { + // A20000626.2315+0200-2330+0200_HTTPS-6-73.json + if (filter.getPmRopEndTime() == null) { + return true; + } + try { + String fileTimePart = fileTimePartFromRopFileName(fileName); + fileTimePart = fileTimePart.substring(0, 9) + fileTimePart.substring(19, 28); + OffsetDateTime fileEndTime = parseFileDate(fileTimePart); + OffsetDateTime endTime = OffsetDateTime.parse(filter.getPmRopEndTime()); + boolean isMatch = fileEndTime.isBefore(endTime); + logger.debug("Checking file: {}, fileEndTime: {}, endTime: {}, isBefore: {}", fileName, fileEndTime, + endTime, isMatch); + return isMatch; } catch (Exception e) { logger.warn("Time parsing exception: {}", e.getMessage()); @@ -203,6 +224,12 @@ public abstract class JobDataDistributor { } } + private static OffsetDateTime parseFileDate(String timeStr) { + DateTimeFormatter startTimeFormatter = + new DateTimeFormatterBuilder().appendPattern("yyyyMMdd.HHmmZ").toFormatter(); + return OffsetDateTime.parse(timeStr, startTimeFormatter); + } + private void handleExceptionInStream(Throwable t) { logger.warn("JobDataDistributor exception: {}, jobId: {}", t.getMessage(), job.getId()); stop(); diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java index 5e09714..5526fc8 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java @@ -33,7 +33,6 @@ import org.oran.dmaapadapter.repository.Job; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.kafka.sender.KafkaSender; import reactor.kafka.sender.SenderOptions; @@ -48,11 +47,10 @@ public class KafkaJobDataDistributor extends JobDataDistributor { private static final Logger logger = LoggerFactory.getLogger(KafkaJobDataDistributor.class); private KafkaSender sender; - private final ApplicationConfig appConfig; - public KafkaJobDataDistributor(Job job, ApplicationConfig appConfig, Flux input) { - super(job, appConfig, input); - this.appConfig = appConfig; + public KafkaJobDataDistributor(Job job, ApplicationConfig appConfig) { + super(job, appConfig); + SenderOptions senderOptions = senderOptions(appConfig); this.sender = KafkaSender.create(senderOptions); } diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java index 00c107c..e63d934 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java @@ -122,7 +122,7 @@ public class KafkaTopicListener implements TopicListener { } } - static byte[] unzip(byte[] bytes, String fileName) { + public static byte[] unzip(byte[] bytes, String fileName) { if (!fileName.endsWith(".gz")) { return bytes; } diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java index 4d76cd1..a3e3703 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java @@ -93,18 +93,17 @@ public class TopicListeners { } } - private JobDataDistributor createConsumer(Job job, TopicListener topicListener) { - return !Strings.isEmpty(job.getParameters().getKafkaOutputTopic()) - ? new KafkaJobDataDistributor(job, appConfig, topicListener.getFlux()) - : new HttpJobDataDistributor(job, appConfig, topicListener.getFlux()); + private JobDataDistributor createConsumer(Job job) { + return !Strings.isEmpty(job.getParameters().getKafkaOutputTopic()) ? new KafkaJobDataDistributor(job, appConfig) + : new HttpJobDataDistributor(job, appConfig); } private void addConsumer(Job job, MultiMap distributors, Map topicListeners) { TopicListener topicListener = topicListeners.get(job.getType().getId()); - JobDataDistributor distributor = createConsumer(job, topicListener); + JobDataDistributor distributor = createConsumer(job); - distributor.collectHistoricalData(); + distributor.start(topicListener.getFlux()); distributors.put(job.getType().getId(), job.getId(), distributor); } diff --git a/src/main/resources/typeSchemaPmData.json b/src/main/resources/typeSchemaPmData.json index 5b4ab89..c579fe2 100644 --- a/src/main/resources/typeSchemaPmData.json +++ b/src/main/resources/typeSchemaPmData.json @@ -4,98 +4,68 @@ "additionalProperties": false, "properties": { "filter": { - "anyOf": [ - { - "type": "string" + "type": "object", + "additionalProperties": false, + "properties": { + "sourceNames": { + "type": "array", + "items": [ + { + "type": "string" + } + ] + }, + "measObjInstIds": { + "type": "array", + "items": [ + { + "type": "string" + } + ] + }, + "measObjClass": { + "type": "array", + "items": [ + { + "type": "string" + } + ] + }, + "measTypes": { + "type": "array", + "items": [ + { + "type": "string" + } + ] }, - { - "type": "object", - "additionalProperties": false, - "properties": { - "sourceNames": { - "type": "array", - "items": [ - { - "type": "string" - } - ] - }, - "measObjInstIds": { - "type": "array", - "items": [ - { - "type": "string" - } - ] - }, - "measObjClass": { - "type": "array", - "items": [ - { - "type": "string" - } - ] - }, - "measTypes": { - "type": "array", - "items": [ - { - "type": "string" - } - ] - }, - "measuredEntityDns": { - "type": "array", - "items": [ - { - "type": "string" - } - ] - }, - "pmRopStartTime": { + "measuredEntityDns": { + "type": "array", + "items": [ + { "type": "string" } - } + ] + }, + "pmRopStartTime": { + "type": "string" + }, + "pmRopEndTime": { + "type": "string" } - ] + } }, "filterType": { "type": "string", "enum": [ - "jslt", - "regexp", - "pmdata", - "json-path" + "pmdata" ] }, - "maxConcurrency": { - "type": "integer", - "minimum": 1 - }, "kafkaOutputTopic": { "type": "string" }, "gzip": { "type": "boolean" - }, - "bufferTimeout": { - "type": "object", - "additionalProperties": false, - "properties": { - "maxSize": { - "type": "integer", - "minimum": 1 - }, - "maxTimeMiliseconds": { - "type": "integer", - "minimum": 0, - "maximum": 160000 - } - }, - "required": [ - "maxSize", - "maxTimeMiliseconds" - ] } } } \ No newline at end of file diff --git a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java index 6b48f79..10779d0 100644 --- a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java +++ b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java @@ -25,22 +25,16 @@ import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertTrue; import com.google.gson.JsonParser; -import com.google.protobuf.AbstractMessage.Builder; -import com.google.protobuf.Message; -import com.google.protobuf.MessageOrBuilder; -import com.google.protobuf.util.JsonFormat; import java.io.FileOutputStream; import java.io.IOException; import java.io.PrintStream; import java.lang.invoke.MethodHandles; -import java.lang.reflect.InvocationTargetException; import java.nio.charset.Charset; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.time.Instant; -import java.util.ArrayList; +import java.time.OffsetDateTime; import java.util.Map; import org.json.JSONObject; @@ -93,7 +87,8 @@ import reactor.test.StepVerifier; "app.webclient.trust-store-used=true", // "app.configuration-filepath=./src/test/resources/test_application_configuration.json", // "app.pm-files-path=/tmp/dmaapadaptor", // - "app.s3.endpointOverride="}) + "app.s3.endpointOverride=" // +}) class ApplicationTest { @Autowired @@ -127,80 +122,6 @@ class ApplicationTest { private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - public static class ProtoJsonUtil { - - /** - * Makes a Json from a given message or builder - * - * @param messageOrBuilder is the instance - * @return The string representation - * @throws IOException if any error occurs - */ - public static String toJson(MessageOrBuilder messageOrBuilder) throws IOException { - return JsonFormat.printer().print(messageOrBuilder); - } - - /** - * Makes a new instance of message based on the json and the class - * - * @param is the class type - * @param json is the json instance - * @param clazz is the class instance - * @return An instance of T based on the json values - * @throws IOException if any error occurs - */ - @SuppressWarnings({"unchecked", "rawtypes"}) - public static T fromJson(String json, Class clazz) throws IOException { - // https://stackoverflow.com/questions/27642021/calling-parsefrom-method-for-generic-protobuffer-class-in-java/33701202#33701202 - Builder builder = null; - try { - // Since we are dealing with a Message type, we can call newBuilder() - builder = (Builder) clazz.getMethod("newBuilder").invoke(null); - - } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException - | NoSuchMethodException | SecurityException e) { - return null; - } - - // The instance is placed into the builder values - JsonFormat.parser().ignoringUnknownFields().merge(json, builder); - - // the instance will be from the build - return (T) builder.build(); - } - } - - // @Test - void testProtoBuf() throws Exception { - String path = "./src/test/resources/A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json"; - - String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset()); - - PmProtoGenerated.PmRopFile proto = ProtoJsonUtil.fromJson(pmReportJson, PmProtoGenerated.PmRopFile.class); - byte[] bytes = proto.toByteArray(); - - int TIMES = 100000; - { - Instant startTime = Instant.now(); - for (int i = 0; i < TIMES; ++i) { - PmProtoGenerated.PmRopFile.parseFrom(bytes); - } - long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond(); - logger.info("*** Duration PROTO :" + durationSeconds + ", objects/second: " + TIMES / durationSeconds - + " time: " + (float) durationSeconds / TIMES); - } - { - Instant startTime = Instant.now(); - for (int i = 0; i < TIMES; ++i) { - PmReport reportsParsed = gson.fromJson(pmReportJson, PmReport.class); - } - long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond(); - logger.info("*** Duration GSON :" + durationSeconds + ", objects/second: " + TIMES / durationSeconds - + " time: " + (float) durationSeconds / TIMES); - } - - } - static class TestApplicationConfig extends ApplicationConfig { @Override @@ -242,6 +163,7 @@ class ApplicationTest { TestApplicationConfig cfg = new TestApplicationConfig(); return cfg; } + } @BeforeEach @@ -262,10 +184,12 @@ class ApplicationTest { @AfterEach void reset() { + DmaapSimulatorController.reset(); for (Job job : this.jobs.getAll()) { this.icsSimulatorController.deleteJob(job.getId(), restClient()); } await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); + DmaapSimulatorController.reset(); this.consumerController.testResults.reset(); this.icsSimulatorController.testResults.reset(); @@ -405,10 +329,10 @@ class ApplicationTest { // Return two messages from DMAAP and verify that these are sent to the owner of // the job (consumer) - DmaapSimulatorController.addResponse("[\"DmaapResponse123\", \"DmaapResponse223\"]"); + DmaapSimulatorController.addResponse("[\"{}\", \"{}\"]"); ConsumerController.TestResults consumer = this.consumerController.testResults; await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1)); - assertThat(consumer.receivedBodies.get(0)).isEqualTo("[\"DmaapResponse123\", \"DmaapResponse223\"]"); + assertThat(consumer.receivedBodies.get(0)).isEqualTo("[{}, {}]"); assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "application/json"); String jobUrl = baseUrl() + ProducerCallbacksController.JOB_URL; @@ -436,7 +360,6 @@ class ApplicationTest { await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(2)); assertThat(consumer.receivedBodies.get(0)).isEqualTo("DmaapResponse11"); assertThat(consumer.receivedBodies.get(1)).isEqualTo("DmaapResponse22"); - assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "text/plain;charset=UTF-8"); // Delete the job this.icsSimulatorController.deleteJob(JOB_ID, restClient()); @@ -449,9 +372,6 @@ class ApplicationTest { await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(4)); } - static class PmReportArray extends ArrayList { - }; - @Test void testPmFiltering() throws Exception { // Create a job @@ -467,8 +387,9 @@ class ApplicationTest { filterData.getMeasObjInstIds().add("UtranCell=Gbg-997"); filterData.getSourceNames().add("O-DU-1122"); filterData.getMeasuredEntityDns().add("ManagedElement=RNC-Gbg-1"); - Job.Parameters param = Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE) - .bufferTimeout(new Job.BufferTimeout(123, 456)).build(); + Job.Parameters param = + Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE).build(); + String paramJson = gson.toJson(param); ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverRest", "EI_PM_JOB_ID", toJson(paramJson)); @@ -493,8 +414,8 @@ class ApplicationTest { String receivedFiltered = consumer.receivedBodies.get(0); assertThat(receivedFiltered).contains("succImmediateAssignProcs").doesNotContain("\"p\":2").contains("\"p\":1"); - PmReportArray reportsParsed = gson.fromJson(receivedFiltered, PmReportArray.class); - assertThat(reportsParsed).hasSize(1); + PmReport reportsParsed = gson.fromJson(receivedFiltered, PmReport.class); + assertThat(reportsParsed.event).isNotNull(); } @Test @@ -512,9 +433,10 @@ class ApplicationTest { PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); filterData.getSourceNames().add("O-DU-1122"); filterData.setPmRopStartTime("1999-12-27T10:50:44.000-08:00"); + filterData.setPmRopEndTime(OffsetDateTime.now().toString()); - Job.Parameters param = Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE) - .bufferTimeout(new Job.BufferTimeout(123, 456)).build(); + Job.Parameters param = + Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE).build(); String paramJson = gson.toJson(param); ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverRest", "EI_PM_JOB_ID", toJson(paramJson)); @@ -540,7 +462,7 @@ class ApplicationTest { Job.Parameters param = Job.Parameters.builder().filter(expresssion).filterType(Job.Parameters.JSLT_FILTER_TYPE).build(); String paramJson = gson.toJson(param); - ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverRest", JOB_ID, toJson(paramJson)); + ConsumerJobInfo jobInfo = consumerJobInfo("DmaapInformationType", JOB_ID, toJson(paramJson)); this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); @@ -549,7 +471,7 @@ class ApplicationTest { // filtered PM message String path = "./src/test/resources/pm_report.json"; String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset()); - DmaapSimulatorController.addPmResponse(pmReportJson); + DmaapSimulatorController.addResponse(json2dmaapResp(pmReportJson)); ConsumerController.TestResults consumer = this.consumerController.testResults; await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1)); @@ -558,6 +480,10 @@ class ApplicationTest { assertThat(receivedFiltered).contains("event"); } + private String json2dmaapResp(String json) { + return "[" + quote(json) + "]"; + } + @Test void testAuthToken() throws Exception { @@ -591,10 +517,6 @@ class ApplicationTest { Map headers = consumer.receivedHeaders.get(0); assertThat(headers).containsEntry("authorization", "Bearer " + AUTH_TOKEN); - // This is the only time it is verified that mime type is plaintext when isJson - // is false and buffering is not used - assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "text/plain;charset=UTF-8"); - Files.delete(authFile); this.securityContext.setAuthTokenFilePath(null); } @@ -607,7 +529,7 @@ class ApplicationTest { waitForRegistration(); // Create a job with JsonPath Filtering - ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverRest", JOB_ID, this.jsonObjectJsonPath()); + ConsumerJobInfo jobInfo = consumerJobInfo("DmaapInformationType", JOB_ID, this.jsonObjectJsonPath()); this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); @@ -616,7 +538,7 @@ class ApplicationTest { // filtered PM message String path = "./src/test/resources/pm_report.json"; String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset()); - DmaapSimulatorController.addPmResponse(pmReportJson); + DmaapSimulatorController.addResponse(json2dmaapResp(pmReportJson)); ConsumerController.TestResults consumer = this.consumerController.testResults; await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1)); diff --git a/src/test/java/org/oran/dmaapadapter/DmaapSimulatorController.java b/src/test/java/org/oran/dmaapadapter/DmaapSimulatorController.java index a58625c..801ec3e 100644 --- a/src/test/java/org/oran/dmaapadapter/DmaapSimulatorController.java +++ b/src/test/java/org/oran/dmaapadapter/DmaapSimulatorController.java @@ -20,6 +20,8 @@ package org.oran.dmaapadapter; +import static org.assertj.core.api.Assertions.assertThat; + import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.media.Content; import io.swagger.v3.oas.annotations.media.Schema; @@ -62,6 +64,13 @@ public class DmaapSimulatorController { dmaapResponses.add(response); } + public static void reset() { + assertThat(dmaapPmResponses).isEmpty(); + assertThat(dmaapResponses).isEmpty(); + dmaapPmResponses.clear(); + dmaapResponses.clear(); + } + private static String quote(String str) { final String q = "\""; return q + str.replace(q, "\\\"") + q; @@ -79,7 +88,7 @@ public class DmaapSimulatorController { return nothing(); } else { String resp = dmaapResponses.remove(0); - logger.info("DMAAP simulator returned: {}", resp); + logger.trace("DMAAP simulator returned: {}", resp); return new ResponseEntity<>(resp, HttpStatus.OK); } diff --git a/src/test/java/org/oran/dmaapadapter/IcsSimulatorController.java b/src/test/java/org/oran/dmaapadapter/IcsSimulatorController.java index e27e95a..93ecad8 100644 --- a/src/test/java/org/oran/dmaapadapter/IcsSimulatorController.java +++ b/src/test/java/org/oran/dmaapadapter/IcsSimulatorController.java @@ -130,11 +130,12 @@ public class IcsSimulatorController { ProducerInfoTypeInfo type = testResults.types.get(job.infoTypeId); if (type == null) { logger.error("type not found: {} size: {}", job.infoTypeId, testResults.types.size()); + } else { + assertThat(type).isNotNull(); + validateJsonObjectAgainstSchema(job.jobDefinition, type.jobDataSchema); + logger.debug("ICS Simulator PUT job: {}", body); + restClient.post(url, body, MediaType.APPLICATION_JSON).block(); } - assertThat(type).isNotNull(); - validateJsonObjectAgainstSchema(job.jobDefinition, type.jobDataSchema); - logger.debug("ICS Simulator PUT job: {}", body); - restClient.post(url, body, MediaType.APPLICATION_JSON).block(); } private void validateJsonObjectAgainstSchema(Object object, Object schemaObj) throws ServiceException { diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java index deff7ee..e2557cd 100644 --- a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java +++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java @@ -30,6 +30,7 @@ import java.lang.invoke.MethodHandles; import java.nio.file.Path; import java.time.Duration; import java.time.Instant; +import java.time.OffsetDateTime; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -48,6 +49,7 @@ import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.configuration.WebClientConfig; import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig; import org.oran.dmaapadapter.controllers.ProducerCallbacksController; +import org.oran.dmaapadapter.controllers.ProducerCallbacksController.StatisticsCollection; import org.oran.dmaapadapter.datastore.DataStore; import org.oran.dmaapadapter.exceptions.ServiceException; import org.oran.dmaapadapter.filter.PmReportFilter; @@ -55,6 +57,7 @@ import org.oran.dmaapadapter.r1.ConsumerJobInfo; import org.oran.dmaapadapter.repository.InfoType; import org.oran.dmaapadapter.repository.InfoTypes; import org.oran.dmaapadapter.repository.Job; +import org.oran.dmaapadapter.repository.Job.Statistics; import org.oran.dmaapadapter.repository.Jobs; import org.oran.dmaapadapter.tasks.KafkaTopicListener; import org.oran.dmaapadapter.tasks.NewFileEvent; @@ -86,7 +89,8 @@ import reactor.kafka.sender.SenderRecord; "app.pm-files-path=./src/test/resources/", // "app.s3.locksBucket=ropfilelocks", // "app.pm-files-path=/tmp/dmaapadaptor", // - "app.s3.bucket="}) // + "app.s3.bucket=" // +}) // class IntegrationWithKafka { final String KAFKA_TYPE_ID = "KafkaInformationType"; @@ -183,11 +187,26 @@ class IntegrationWithKafka { KafkaTopicListener topicListener = new KafkaTopicListener(applicationConfig, type); topicListener.getFlux() // + .map(this::unzip) // .doOnNext(this::set) // .doFinally(sig -> logger.info("Finally " + sig)) // .subscribe(); } + boolean isUnzip = false; + + private TopicListener.DataFromTopic unzip(TopicListener.DataFromTopic receivedKafkaOutput) { + if (!this.isUnzip) { + return receivedKafkaOutput; + } + byte[] unzipped = KafkaTopicListener.unzip(receivedKafkaOutput.value, "junk.gz"); + return new TopicListener.DataFromTopic(unzipped, receivedKafkaOutput.key); + } + + public void setUnzip(boolean unzip) { + this.isUnzip = unzip; + } + private void set(TopicListener.DataFromTopic receivedKafkaOutput) { this.receivedKafkaOutput = receivedKafkaOutput; this.count++; @@ -204,6 +223,7 @@ class IntegrationWithKafka { void reset() { this.receivedKafkaOutput = new TopicListener.DataFromTopic(null, null); + this.count = 0; } } @@ -380,6 +400,27 @@ class IntegrationWithKafka { Thread.sleep(4000); } + private void printStatistics() { + String targetUri = baseUrl() + ProducerCallbacksController.STATISTICS_URL; + String statsResp = restClient().get(targetUri).block(); + StatisticsCollection stats = gson.fromJson(statsResp, StatisticsCollection.class); + int noOfSentBytes = 0; + int noOfSentObjs = 0; + for (Statistics s : stats.jobStatistics) { + noOfSentBytes += s.getNoOfSentBytes(); + noOfSentObjs += s.getNoOfSentObjects(); + } + logger.error(" Stats noOfSentBytes: {}, noOfSentObjects: {}, noOfTopics: {}", noOfSentBytes, noOfSentObjs, + stats.jobStatistics.size()); + } + + private void printCharacteristicsResult(String str, Instant startTime, int noOfIterations) { + final long durationMs = Instant.now().toEpochMilli() - startTime.toEpochMilli(); + logger.error("*** {} Duration ({} ms), objects/second: {}", str, durationMs, + (noOfIterations * 1000) / durationMs); + printStatistics(); + } + @Test void simpleCase() throws Exception { final String JOB_ID = "ID"; @@ -445,12 +486,6 @@ class IntegrationWithKafka { printStatistics(); } - private void printStatistics() { - String targetUri = baseUrl() + ProducerCallbacksController.STATISTICS_URL; - String stats = restClient().get(targetUri).block(); - logger.info("Stats : {}", org.apache.commons.lang3.StringUtils.truncate(stats, 1000)); - } - @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests. @Test void kafkaCharacteristics() throws Exception { @@ -477,8 +512,7 @@ class IntegrationWithKafka { Thread.sleep(1000 * 1); } - final long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond(); - logger.info("*** Duration :" + durationSeconds + ", objects/second: " + NO_OF_OBJECTS / durationSeconds); + printCharacteristicsResult("kafkaCharacteristics", startTime, NO_OF_OBJECTS); } @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests. @@ -494,8 +528,8 @@ class IntegrationWithKafka { assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); - filterData.getMeasTypes().add("succImmediateAssignProcs"); - filterData.getMeasObjClass().add("UtranCell"); + filterData.getMeasTypes().add("pmCounterNumber0"); + filterData.getMeasObjClass().add("NRCellCU"); this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver.OUTPUT_TOPIC, filterData), JOB_ID, restClient()); @@ -509,7 +543,7 @@ class IntegrationWithKafka { Instant startTime = Instant.now(); - final String FILE_NAME = "pm_report.json.gz"; + final String FILE_NAME = "A20000626.2315+0200-2330+0200_HTTPS-6-73.json"; DataStore fileStore = dataStore(); @@ -525,22 +559,14 @@ class IntegrationWithKafka { Thread.sleep(1000 * 1); } - final long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond(); - logger.info("*** Duration :" + durationSeconds + ", objects/second: " + NO_OF_OBJECTS / durationSeconds); + printCharacteristicsResult("kafkaCharacteristics_pmFilter_s3", startTime, NO_OF_OBJECTS); logger.info("*** kafkaReceiver2 :" + kafkaReceiver.count); - - printStatistics(); - } - - @Test - void clear() { - } @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests. @Test void kafkaCharacteristics_manyPmJobs() throws Exception { - // Filter PM reports and sent to two jobs over Kafka + // Filter PM reports and sent to many jobs over Kafka // Register producer, Register types await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); @@ -551,20 +577,22 @@ class IntegrationWithKafka { filterData.getMeasTypes().add("pmCounterNumber1"); filterData.getMeasObjClass().add("NRCellCU"); + final boolean USE_GZIP = true; final int NO_OF_JOBS = 150; ArrayList receivers = new ArrayList<>(); for (int i = 0; i < NO_OF_JOBS; ++i) { final String outputTopic = "manyJobs_" + i; - this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData, false), outputTopic, + this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData, USE_GZIP), outputTopic, restClient()); KafkaReceiver receiver = new KafkaReceiver(this.applicationConfig, outputTopic, this.securityContext); + receiver.setUnzip(USE_GZIP); receivers.add(receiver); } await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(NO_OF_JOBS)); waitForKafkaListener(); - final int NO_OF_OBJECTS = 1000; + final int NO_OF_OBJECTS = 100; Instant startTime = Instant.now(); @@ -581,20 +609,19 @@ class IntegrationWithKafka { sendDataToKafka(dataToSend); while (receivers.get(0).count != NO_OF_OBJECTS) { - logger.info("sleeping {}", kafkaReceiver.count); + if (kafkaReceiver.count > 0) { + logger.info("sleeping {}", kafkaReceiver.count); + } Thread.sleep(1000 * 1); } - final long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond(); - logger.info("*** Duration :" + durationSeconds + ", objects/second: " + NO_OF_OBJECTS / durationSeconds); + printCharacteristicsResult("kafkaCharacteristics_manyPmJobs", startTime, NO_OF_OBJECTS); for (KafkaReceiver receiver : receivers) { if (receiver.count != NO_OF_OBJECTS) { System.out.println("** Unexpected" + receiver.OUTPUT_TOPIC + " " + receiver.count); } } - - printStatistics(); } private String newFileEvent(String fileName) { @@ -630,6 +657,8 @@ class IntegrationWithKafka { filterData.getSourceNames().add("O-DU-1122"); filterData.setPmRopStartTime("1999-12-27T10:50:44.000-08:00"); + filterData.setPmRopEndTime(OffsetDateTime.now().toString()); + this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver.OUTPUT_TOPIC, filterData), JOB_ID, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); diff --git a/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java b/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java index 20c03ca..3e8afc2 100644 --- a/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java +++ b/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java @@ -24,20 +24,73 @@ import static org.assertj.core.api.Assertions.assertThat; import com.google.gson.Gson; import com.google.gson.GsonBuilder; +import com.google.protobuf.AbstractMessage.Builder; +import com.google.protobuf.Message; +import com.google.protobuf.MessageOrBuilder; +import com.google.protobuf.util.JsonFormat; +import java.io.IOException; import java.lang.invoke.MethodHandles; +import java.lang.reflect.InvocationTargetException; import java.nio.charset.Charset; import java.nio.file.Files; import java.nio.file.Path; import java.time.Instant; import org.junit.jupiter.api.Test; +import org.oran.dmaapadapter.PmProtoGenerated; import org.oran.dmaapadapter.filter.Filter.FilteredData; +import org.oran.dmaapadapter.tasks.KafkaTopicListener; import org.oran.dmaapadapter.tasks.TopicListener; +import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; class PmReportFilterTest { + + public static class ProtoJsonUtil { + + /** + * Makes a Json from a given message or builder + * + * @param messageOrBuilder is the instance + * @return The string representation + * @throws IOException if any error occurs + */ + public static String toJson(MessageOrBuilder messageOrBuilder) throws IOException { + return JsonFormat.printer().print(messageOrBuilder); + } + + /** + * Makes a new instance of message based on the json and the class + * + * @param is the class type + * @param json is the json instance + * @param clazz is the class instance + * @return An instance of T based on the json values + * @throws IOException if any error occurs + */ + @SuppressWarnings({"unchecked", "rawtypes"}) + public static T fromJson(String json, Class clazz) throws IOException { + // https://stackoverflow.com/questions/27642021/calling-parsefrom-method-for-generic-protobuffer-class-in-java/33701202#33701202 + Builder builder = null; + try { + // Since we are dealing with a Message type, we can call newBuilder() + builder = (Builder) clazz.getMethod("newBuilder").invoke(null); + + } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException + | NoSuchMethodException | SecurityException e) { + return null; + } + + // The instance is placed into the builder values + JsonFormat.parser().ignoringUnknownFields().merge(json, builder); + + // the instance will be from the build + return (T) builder.build(); + } + } + private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private String filterReport(PmReportFilter filter) throws Exception { @@ -121,40 +174,69 @@ class PmReportFilterTest { assertThat(filtered).contains("O-DU-1122"); } - void testCharacteristics() throws Exception { + // @Test + void testSomeCharacteristics() throws Exception { Gson gson = new GsonBuilder() // .disableHtmlEscaping() // .create(); // String path = "./src/test/resources/A20000626.2315+0200-2330+0200_HTTPS-6-73.json"; - String report = Files.readString(Path.of(path), Charset.defaultCharset()); - TopicListener.DataFromTopic data = new TopicListener.DataFromTopic(null, report.getBytes()); + String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset()); - Instant startTime = Instant.now(); + PmProtoGenerated.PmRopFile proto = ProtoJsonUtil.fromJson(pmReportJson, PmProtoGenerated.PmRopFile.class); + byte[] bytes = proto.toByteArray(); - int CNT = 100000; - for (int i = 0; i < CNT; ++i) { - gson.fromJson(data.valueAsString(), PmReport.class); + int TIMES = 100000; + + { + path = "./src/test/resources/A20000626.2315+0200-2330+0200_HTTPS-6-73.json.gz"; + byte[] pmReportZipped = Files.readAllBytes(Path.of(path)); + + Instant startTime = Instant.now(); + for (int i = 0; i < TIMES; ++i) { + KafkaTopicListener.unzip(pmReportZipped, "junk.gz"); + } + + printDuration("Unzip", startTime, TIMES); } - printDuration("Parse", startTime, CNT); + { - startTime = Instant.now(); + PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); + filterData.getMeasTypes().add("pmCounterNumber0"); + filterData.getMeasObjClass().add("NRCellCU"); + PmReportFilter filter = new PmReportFilter(filterData); + DataFromTopic topicData = new DataFromTopic(null, pmReportJson.getBytes()); - PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); - filterData.measTypes.add("pmCounterNumber0"); - PmReportFilter filter = new PmReportFilter(filterData); - for (int i = 0; i < CNT; ++i) { - FilteredData filtered = filter.filter(data); + Instant startTime = Instant.now(); + for (int i = 0; i < TIMES; ++i) { + filter.filter(topicData); + } + printDuration("PM Filter", startTime, TIMES); + } + + { + Instant startTime = Instant.now(); + for (int i = 0; i < TIMES; ++i) { + PmProtoGenerated.PmRopFile.parseFrom(bytes); + } + + printDuration("Protobuf parsing", startTime, TIMES); + } + { + Instant startTime = Instant.now(); + for (int i = 0; i < TIMES; ++i) { + gson.fromJson(pmReportJson, PmReport.class); + } + printDuration("Json parsing", startTime, TIMES); } - printDuration("Filter", startTime, CNT); } void printDuration(String str, Instant startTime, int noOfIterations) { - final long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond(); - logger.info("*** Duration " + str + " :" + durationSeconds + ", objects/second: " - + noOfIterations / durationSeconds); + final long durationMs = Instant.now().toEpochMilli() - startTime.toEpochMilli(); + logger.info("*** Duration (ms) " + str + " :" + durationMs + ", objects/second: " + + (noOfIterations * 1000) / durationMs); } @Test diff --git a/src/test/resources/test_application_configuration.json b/src/test/resources/test_application_configuration.json index 64ef1c5..9c51844 100644 --- a/src/test/resources/test_application_configuration.json +++ b/src/test/resources/test_application_configuration.json @@ -4,7 +4,7 @@ "id": "DmaapInformationType", "dmaapTopicUrl": "/dmaap-topic-1", "useHttpProxy": false, - "isJson": false + "isJson": true }, { "id": "KafkaInformationType", -- 2.16.6