From 03aca86a9766e1283b3bd3a09c3c602a684565b1 Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Wed, 2 Nov 2022 14:03:44 +0100 Subject: [PATCH] Changed priniples for zipping output The descition of zipping the ouput is taken by the producer instead of the consumer. The consumer will get a Kafkaheader named "gzip" if the output is gzipped. Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-773 Change-Id: I3016313bbb8a833c14c651492da7a87c37e71e31 --- config/application.yaml | 1 + .../configuration/ApplicationConfig.java | 7 +- .../java/org/oran/dmaapadapter/filter/Filter.java | 23 +++++++ .../java/org/oran/dmaapadapter/repository/Job.java | 13 ---- .../dmaapadapter/tasks/DmaapTopicListener.java | 2 +- .../dmaapadapter/tasks/JobDataDistributor.java | 15 +++-- .../tasks/KafkaJobDataDistributor.java | 3 +- .../dmaapadapter/tasks/KafkaTopicListener.java | 27 ++++---- .../org/oran/dmaapadapter/tasks/TopicListener.java | 19 +++++- src/main/resources/typeSchemaPmData.json | 3 - .../oran/dmaapadapter/IntegrationWithKafka.java | 74 +++++++++++----------- .../oran/dmaapadapter/filter/JsltFilterTest.java | 2 +- .../dmaapadapter/filter/JsonPathFilterTest.java | 2 +- .../dmaapadapter/filter/PmReportFilterTest.java | 12 ++-- 14 files changed, 119 insertions(+), 84 deletions(-) diff --git a/config/application.yaml b/config/application.yaml index 8bdf414..8975849 100644 --- a/config/application.yaml +++ b/config/application.yaml @@ -84,6 +84,7 @@ app: # If the file name is empty, no authorization token is used auth-token-file: pm-files-path: /tmp + zip-output: false s3: endpointOverride: http://localhost:9000 accessKeyId: minio diff --git a/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java b/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java index 3e65120..b53383a 100644 --- a/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java +++ b/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java @@ -97,7 +97,7 @@ public class ApplicationConfig { private String kafkaBootStrapServers; @Getter - @Value("${app.kafka.max-poll-records:100}") + @Value("${app.kafka.max-poll-records:300}") private int kafkaMaxPollRecords; @Getter @@ -124,6 +124,11 @@ public class ApplicationConfig { @Value("${app.s3.bucket:}") private String s3Bucket; + @Getter + @Setter + @Value("${app.zip-output:}") + private boolean zipOutput; + private WebClientConfig webClientConfig = null; public WebClientConfig getWebClientConfig() { diff --git a/src/main/java/org/oran/dmaapadapter/filter/Filter.java b/src/main/java/org/oran/dmaapadapter/filter/Filter.java index 6ec74d5..6c5fa25 100644 --- a/src/main/java/org/oran/dmaapadapter/filter/Filter.java +++ b/src/main/java/org/oran/dmaapadapter/filter/Filter.java @@ -20,8 +20,13 @@ package org.oran.dmaapadapter.filter; +import java.util.ArrayList; + +import lombok.Getter; import lombok.ToString; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic; public interface Filter { @@ -34,6 +39,10 @@ public interface Filter { public static class FilteredData { public final byte[] key; public final byte[] value; + + @Getter + private final boolean isZipped; + private static final FilteredData emptyData = new FilteredData(null, null); public boolean isEmpty() { @@ -41,8 +50,13 @@ public interface Filter { } public FilteredData(byte[] key, byte[] value) { + this(key, value, false); + } + + public FilteredData(byte[] key, byte[] value, boolean isZipped) { this.key = key; this.value = value; + this.isZipped = isZipped; } public String getValueAString() { @@ -52,6 +66,15 @@ public interface Filter { public static FilteredData empty() { return emptyData; } + + public Iterable
headers() { + ArrayList
result = new ArrayList<>(); + if (isZipped()) { + Header h = new RecordHeader(DataFromTopic.ZIP_PROPERTY, null); + result.add(h); + } + return result; + } } public FilteredData filter(DataFromTopic data); diff --git a/src/main/java/org/oran/dmaapadapter/repository/Job.java b/src/main/java/org/oran/dmaapadapter/repository/Job.java index f57614f..e256232 100644 --- a/src/main/java/org/oran/dmaapadapter/repository/Job.java +++ b/src/main/java/org/oran/dmaapadapter/repository/Job.java @@ -115,22 +115,9 @@ public class Job { @Getter private BufferTimeout bufferTimeout; - private Integer maxConcurrency; - @Getter private String kafkaOutputTopic; - @Getter - private Boolean gzip; - - public int getMaxConcurrency() { - return maxConcurrency == null || maxConcurrency == 1 ? 1 : maxConcurrency; - } - - public boolean isGzip() { - return gzip != null && gzip; - } - public Filter.Type getFilterType() { if (filter == null || filterType == null) { return Filter.Type.NONE; diff --git a/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java index cd4c002..719597a 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java @@ -72,7 +72,7 @@ public class DmaapTopicListener implements TopicListener { .doOnNext(input -> logger.debug("Received from DMaap: {} :{}", this.type.getDmaapTopicUrl(), input)) // .doOnError(t -> logger.error("DmaapTopicListener error: {}", t.getMessage())) // .doFinally(sig -> logger.error("DmaapTopicListener stopped, reason: {}", sig)) // - .map(input -> new DataFromTopic(null, input.getBytes())) + .map(input -> new DataFromTopic(null, input.getBytes(), false)) .flatMap(data -> KafkaTopicListener.getDataFromFileIfNewPmFileEvent(data, type, dataStore), 100) .publish() // .autoConnect(); diff --git a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java index 5571669..ef5ab2a 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java @@ -59,6 +59,7 @@ public abstract class JobDataDistributor { private final DataStore dataStore; private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create(); + private final ApplicationConfig applConfig; private class ErrorStats { private int consumerFaultCounter = 0; @@ -87,6 +88,7 @@ public abstract class JobDataDistributor { } protected JobDataDistributor(Job job, ApplicationConfig applConfig) { + this.applConfig = applConfig; this.job = job; this.dataStore = DataStore.create(applConfig); this.dataStore.create(DataStore.Bucket.FILES).subscribe(); @@ -105,8 +107,9 @@ public abstract class JobDataDistributor { PmReportFilter filter = job.getFilter() instanceof PmReportFilter ? (PmReportFilter) job.getFilter() : null; if (filter == null || filter.getFilterData().getPmRopEndTime() == null) { - this.subscription = filterAndBuffer(input, this.job) // - .flatMap(this::sendToClient, job.getParameters().getMaxConcurrency()) // + this.subscription = Flux.just(input) // + .flatMap(in -> filterAndBuffer(in, this.job)) // + .flatMap(this::sendToClient) // .onErrorResume(this::handleError) // .subscribe(this::handleSentOk, // this::handleExceptionInStream, // @@ -138,7 +141,7 @@ public abstract class JobDataDistributor { } private Filter.FilteredData gzip(Filter.FilteredData data) { - if (job.getParameters().isGzip()) { + if (this.applConfig.isZipOutput()) { try { ByteArrayOutputStream out = new ByteArrayOutputStream(); GZIPOutputStream gzip = new GZIPOutputStream(out); @@ -146,7 +149,7 @@ public abstract class JobDataDistributor { gzip.flush(); gzip.close(); byte[] zipped = out.toByteArray(); - return new Filter.FilteredData(data.key, zipped); + return new Filter.FilteredData(data.key, zipped, true); } catch (IOException e) { logger.error("Unexpected exception when zipping: {}", e.getMessage()); return data; @@ -175,7 +178,7 @@ public abstract class JobDataDistributor { NewFileEvent ev = new NewFileEvent(fileName); - return new TopicListener.DataFromTopic(null, gson.toJson(ev).getBytes()); + return new TopicListener.DataFromTopic(null, gson.toJson(ev).getBytes(), false); } private static String fileTimePartFromRopFileName(String fileName) { @@ -232,13 +235,13 @@ public abstract class JobDataDistributor { private void handleExceptionInStream(Throwable t) { logger.warn("JobDataDistributor exception: {}, jobId: {}", t.getMessage(), job.getId()); - stop(); } protected abstract Mono sendToClient(Filter.FilteredData output); public synchronized void stop() { if (this.subscription != null) { + logger.debug("Stopped, job: {}", job.getId()); this.subscription.dispose(); this.subscription = null; } diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java index 5526fc8..6b33c3b 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java @@ -96,7 +96,8 @@ public class KafkaJobDataDistributor extends JobDataDistributor { private SenderRecord senderRecord(Filter.FilteredData output, Job infoJob) { int correlationMetadata = 2; String topic = infoJob.getParameters().getKafkaOutputTopic(); - return SenderRecord.create(new ProducerRecord<>(topic, output.key, output.value), correlationMetadata); + var producerRecord = new ProducerRecord<>(topic, null, null, output.key, output.value, output.headers()); + return SenderRecord.create(producerRecord, correlationMetadata); } } diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java index e63d934..54262d7 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java @@ -74,11 +74,14 @@ public class KafkaTopicListener implements TopicListener { .receiveAutoAck() // .concatMap(consumerRecord -> consumerRecord) // .doOnNext(input -> logger.trace("Received from kafka topic: {}", this.type.getKafkaInputTopic())) // - .doOnError(t -> logger.error("KafkaTopicReceiver error: {}", t.getMessage())) // - .doFinally(sig -> logger.error("KafkaTopicReceiver stopped, reason: {}", sig)) // + .doOnError(t -> logger.error("Received error: {}", t.getMessage())) // + .onErrorResume(t -> Mono.empty()) // + .doFinally( + sig -> logger.error("KafkaTopicListener stopped, type: {}, reason: {}", this.type.getId(), sig)) // .filter(t -> t.value().length > 0 || t.key().length > 0) // - .map(input -> new DataFromTopic(input.key(), input.value())) // - .flatMap(data -> getDataFromFileIfNewPmFileEvent(data, type, dataStore), 100).publish() // + .map(input -> new DataFromTopic(input.key(), input.value(), DataFromTopic.findZipped(input.headers()))) // + .flatMap(data -> getDataFromFileIfNewPmFileEvent(data, type, dataStore), 100) // + .publish() // .autoConnect(1); } @@ -115,25 +118,27 @@ public class KafkaTopicListener implements TopicListener { return fileStore.readObject(DataStore.Bucket.FILES, ev.getFilename()) // .map(bytes -> unzip(bytes, ev.getFilename())) // - .map(bytes -> new DataFromTopic(data.key, bytes)); + .map(bytes -> new DataFromTopic(data.key, bytes, false)); } catch (Exception e) { return Mono.just(data); } } - public static byte[] unzip(byte[] bytes, String fileName) { - if (!fileName.endsWith(".gz")) { - return bytes; - } - + public static byte[] unzip(byte[] bytes) throws IOException { try (final GZIPInputStream gzipInput = new GZIPInputStream(new ByteArrayInputStream(bytes))) { - return gzipInput.readAllBytes(); + } + } + + private static byte[] unzip(byte[] bytes, String fileName) { + try { + return fileName.endsWith(".gz") ? unzip(bytes) : bytes; } catch (IOException e) { logger.error("Error while decompression, file: {}, reason: {}", fileName, e.getMessage()); return new byte[0]; } + } } diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java index be230cf..8b1afc8 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java @@ -20,11 +20,11 @@ package org.oran.dmaapadapter.tasks; - import lombok.Getter; import lombok.Setter; import lombok.ToString; +import org.apache.kafka.common.header.Header; import org.oran.dmaapadapter.filter.PmReport; import reactor.core.publisher.Flux; @@ -34,6 +34,7 @@ public interface TopicListener { public static class DataFromTopic { public final byte[] key; public final byte[] value; + public final boolean isZipped; private static byte[] noBytes = new byte[0]; @@ -42,15 +43,29 @@ public interface TopicListener { @ToString.Exclude private PmReport cachedPmReport; - public DataFromTopic(byte[] key, byte[] value) { + public DataFromTopic(byte[] key, byte[] value, boolean isZipped) { this.key = key == null ? noBytes : key; this.value = value == null ? noBytes : value; + this.isZipped = isZipped; } public String valueAsString() { return new String(this.value); } + public static final String ZIP_PROPERTY = "gzip"; + + public static boolean findZipped(Iterable
headers) { + if (headers == null) { + return false; + } + for (Header h : headers) { + if (h.key().equals(ZIP_PROPERTY)) { + return true; + } + } + return false; + } } public Flux getFlux(); diff --git a/src/main/resources/typeSchemaPmData.json b/src/main/resources/typeSchemaPmData.json index c579fe2..df3f723 100644 --- a/src/main/resources/typeSchemaPmData.json +++ b/src/main/resources/typeSchemaPmData.json @@ -63,9 +63,6 @@ }, "kafkaOutputTopic": { "type": "string" - }, - "gzip": { - "type": "boolean" } } } \ No newline at end of file diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java index e2557cd..8974d5d 100644 --- a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java +++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java @@ -26,6 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import com.google.gson.JsonParser; +import java.io.IOException; import java.lang.invoke.MethodHandles; import java.nio.file.Path; import java.time.Duration; @@ -89,7 +90,7 @@ import reactor.kafka.sender.SenderRecord; "app.pm-files-path=./src/test/resources/", // "app.s3.locksBucket=ropfilelocks", // "app.pm-files-path=/tmp/dmaapadaptor", // - "app.s3.bucket=" // + "app.s3.bucket=dmaaptest" // }) // class IntegrationWithKafka { @@ -170,10 +171,12 @@ class IntegrationWithKafka { public final String OUTPUT_TOPIC; private TopicListener.DataFromTopic receivedKafkaOutput; private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private final ApplicationConfig applicationConfig; int count = 0; public KafkaReceiver(ApplicationConfig applicationConfig, String outputTopic, SecurityContext securityContext) { + this.applicationConfig = applicationConfig; this.OUTPUT_TOPIC = outputTopic; // Create a listener to the output topic. The KafkaTopicListener happens to be @@ -193,18 +196,18 @@ class IntegrationWithKafka { .subscribe(); } - boolean isUnzip = false; - private TopicListener.DataFromTopic unzip(TopicListener.DataFromTopic receivedKafkaOutput) { - if (!this.isUnzip) { + assertThat(this.applicationConfig.isZipOutput()).isEqualTo(receivedKafkaOutput.isZipped); + if (!receivedKafkaOutput.isZipped) { return receivedKafkaOutput; } - byte[] unzipped = KafkaTopicListener.unzip(receivedKafkaOutput.value, "junk.gz"); - return new TopicListener.DataFromTopic(unzipped, receivedKafkaOutput.key); - } - - public void setUnzip(boolean unzip) { - this.isUnzip = unzip; + try { + byte[] unzipped = KafkaTopicListener.unzip(receivedKafkaOutput.value); + return new TopicListener.DataFromTopic(unzipped, receivedKafkaOutput.key, false); + } catch (IOException e) { + logger.error("********* ERROR ", e.getMessage()); + return null; + } } private void set(TopicListener.DataFromTopic receivedKafkaOutput) { @@ -222,7 +225,7 @@ class IntegrationWithKafka { } void reset() { - this.receivedKafkaOutput = new TopicListener.DataFromTopic(null, null); + this.receivedKafkaOutput = new TopicListener.DataFromTopic(null, null, false); this.count = 0; } } @@ -232,6 +235,8 @@ class IntegrationWithKafka { @BeforeEach void init() { + this.applicationConfig.setZipOutput(false); + if (kafkaReceiver == null) { kafkaReceiver = new KafkaReceiver(this.applicationConfig, "ouputTopic", this.securityContext); kafkaReceiver2 = new KafkaReceiver(this.applicationConfig, "ouputTopic2", this.securityContext); @@ -289,11 +294,10 @@ class IntegrationWithKafka { return "https://localhost:" + this.applicationConfig.getLocalServerHttpPort(); } - private static Object jobParametersAsJsonObject(String filter, long maxTimeMiliseconds, int maxSize, - int maxConcurrency) { + private static Object jobParametersAsJsonObject(String filter, long maxTimeMiliseconds, int maxSize) { Job.BufferTimeout buffer = maxSize > 0 ? new Job.BufferTimeout(maxSize, maxTimeMiliseconds) : null; Job.Parameters param = Job.Parameters.builder().filter(filter).filterType(Job.Parameters.REGEXP_TYPE) - .bufferTimeout(buffer).maxConcurrency(maxConcurrency).build(); + .bufferTimeout(buffer).build(); String str = gson.toJson(param); return jsonObject(str); @@ -307,21 +311,20 @@ class IntegrationWithKafka { } } - ConsumerJobInfo consumerJobInfo(String filter, Duration maxTime, int maxSize, int maxConcurrency) { + ConsumerJobInfo consumerJobInfo(String filter, Duration maxTime, int maxSize) { try { String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL; - return new ConsumerJobInfo(KAFKA_TYPE_ID, - jobParametersAsJsonObject(filter, maxTime.toMillis(), maxSize, maxConcurrency), "owner", targetUri, - ""); + return new ConsumerJobInfo(KAFKA_TYPE_ID, jobParametersAsJsonObject(filter, maxTime.toMillis(), maxSize), + "owner", targetUri, ""); } catch (Exception e) { return null; } } - ConsumerJobInfo consumerJobInfoKafka(String topic, PmReportFilter.FilterData filterData, boolean gzip) { + ConsumerJobInfo consumerJobInfoKafka(String topic, PmReportFilter.FilterData filterData) { try { Job.Parameters param = Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE) - .kafkaOutputTopic(topic).gzip(gzip).build(); + .kafkaOutputTopic(topic).build(); String str = gson.toJson(param); Object parametersObj = jsonObject(str); @@ -332,10 +335,6 @@ class IntegrationWithKafka { } } - ConsumerJobInfo consumerJobInfoKafka(String topic, PmReportFilter.FilterData filterData) { - return consumerJobInfoKafka(topic, filterData, false); - } - ConsumerJobInfo consumerJobInfoKafka(String topic) { try { Job.Parameters param = Job.Parameters.builder().kafkaOutputTopic(topic).build(); @@ -429,7 +428,7 @@ class IntegrationWithKafka { await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); - this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID, restClient()); + this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0), JOB_ID, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); waitForKafkaListener(); @@ -449,9 +448,8 @@ class IntegrationWithKafka { assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); // Create two jobs. One buffering and one with a filter - this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 10, 20), JOB_ID1, - restClient()); - this.icsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0, 1), JOB_ID2, restClient()); + this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 10), JOB_ID1, restClient()); + this.icsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0), JOB_ID2, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2)); waitForKafkaListener(); @@ -577,22 +575,21 @@ class IntegrationWithKafka { filterData.getMeasTypes().add("pmCounterNumber1"); filterData.getMeasObjClass().add("NRCellCU"); - final boolean USE_GZIP = true; + this.applicationConfig.setZipOutput(true); final int NO_OF_JOBS = 150; ArrayList receivers = new ArrayList<>(); for (int i = 0; i < NO_OF_JOBS; ++i) { final String outputTopic = "manyJobs_" + i; - this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData, USE_GZIP), outputTopic, + this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData), outputTopic, restClient()); KafkaReceiver receiver = new KafkaReceiver(this.applicationConfig, outputTopic, this.securityContext); - receiver.setUnzip(USE_GZIP); receivers.add(receiver); } await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(NO_OF_JOBS)); waitForKafkaListener(); - final int NO_OF_OBJECTS = 100; + final int NO_OF_OBJECTS = 1000; Instant startTime = Instant.now(); @@ -608,10 +605,12 @@ class IntegrationWithKafka { var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> kafkaSenderRecord(eventAsString, "key", PM_TYPE_ID)); sendDataToKafka(dataToSend); - while (receivers.get(0).count != NO_OF_OBJECTS) { + logger.info("sleeping {}", kafkaReceiver.count); + while (receivers.get(0).count < NO_OF_OBJECTS) { if (kafkaReceiver.count > 0) { - logger.info("sleeping {}", kafkaReceiver.count); + logger.info("sleeping {}", receivers.get(0).count); } + Thread.sleep(1000 * 1); } @@ -676,9 +675,8 @@ class IntegrationWithKafka { assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); // Create two jobs. - this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 1), JOB_ID1, - restClient()); - this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID2, restClient()); + this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000), JOB_ID1, restClient()); + this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0), JOB_ID2, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2)); @@ -691,7 +689,7 @@ class IntegrationWithKafka { this.icsSimulatorController.deleteJob(JOB_ID1, restClient()); this.icsSimulatorController.deleteJob(JOB_ID2, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); - this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID2, restClient()); + this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0), JOB_ID2, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); dataToSend = Flux.just(kafkaSenderRecord("Howdy", "", KAFKA_TYPE_ID)); diff --git a/src/test/java/org/oran/dmaapadapter/filter/JsltFilterTest.java b/src/test/java/org/oran/dmaapadapter/filter/JsltFilterTest.java index e3619ac..6fa7ce8 100644 --- a/src/test/java/org/oran/dmaapadapter/filter/JsltFilterTest.java +++ b/src/test/java/org/oran/dmaapadapter/filter/JsltFilterTest.java @@ -33,7 +33,7 @@ import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic; class JsltFilterTest { private String filterReport(JsltFilter filter) throws Exception { - DataFromTopic data = new DataFromTopic(null, loadReport().getBytes()); + DataFromTopic data = new DataFromTopic(null, loadReport().getBytes(), false); FilteredData filtered = filter.filter(data); return filtered.getValueAString(); } diff --git a/src/test/java/org/oran/dmaapadapter/filter/JsonPathFilterTest.java b/src/test/java/org/oran/dmaapadapter/filter/JsonPathFilterTest.java index 99d3591..6e75757 100644 --- a/src/test/java/org/oran/dmaapadapter/filter/JsonPathFilterTest.java +++ b/src/test/java/org/oran/dmaapadapter/filter/JsonPathFilterTest.java @@ -36,7 +36,7 @@ class JsonPathFilterTest { void testJsonPath() throws Exception { String exp = ("$.event.perf3gppFields.measDataCollection.measInfoList[0].measTypes.sMeasTypesList[0]"); JsonPathFilter filter = new JsonPathFilter(exp); - DataFromTopic data = new DataFromTopic(null, loadReport().getBytes()); + DataFromTopic data = new DataFromTopic(null, loadReport().getBytes(), false); FilteredData filtered = filter.filter(data); String res = filtered.getValueAString(); assertThat(res).isEqualTo("\"attTCHSeizures\""); diff --git a/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java b/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java index 3e8afc2..6dd185c 100644 --- a/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java +++ b/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java @@ -94,7 +94,7 @@ class PmReportFilterTest { private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private String filterReport(PmReportFilter filter) throws Exception { - TopicListener.DataFromTopic data = new TopicListener.DataFromTopic(null, loadReport().getBytes()); + TopicListener.DataFromTopic data = new TopicListener.DataFromTopic(null, loadReport().getBytes(), false); FilteredData filtered = filter.filter(data); return filtered.getValueAString(); } @@ -145,7 +145,7 @@ class PmReportFilterTest { } { - TopicListener.DataFromTopic data = new TopicListener.DataFromTopic(null, loadReport().getBytes()); + TopicListener.DataFromTopic data = new TopicListener.DataFromTopic(null, loadReport().getBytes(), false); PmReportFilter.FilterData utranCellFilter = new PmReportFilter.FilterData(); utranCellFilter.measObjClass.add("UtranCell"); @@ -195,7 +195,7 @@ class PmReportFilterTest { Instant startTime = Instant.now(); for (int i = 0; i < TIMES; ++i) { - KafkaTopicListener.unzip(pmReportZipped, "junk.gz"); + KafkaTopicListener.unzip(pmReportZipped); } printDuration("Unzip", startTime, TIMES); @@ -206,7 +206,7 @@ class PmReportFilterTest { filterData.getMeasTypes().add("pmCounterNumber0"); filterData.getMeasObjClass().add("NRCellCU"); PmReportFilter filter = new PmReportFilter(filterData); - DataFromTopic topicData = new DataFromTopic(null, pmReportJson.getBytes()); + DataFromTopic topicData = new DataFromTopic(null, pmReportJson.getBytes(), false); Instant startTime = Instant.now(); for (int i = 0; i < TIMES; ++i) { @@ -259,10 +259,10 @@ class PmReportFilterTest { PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); PmReportFilter filter = new PmReportFilter(filterData); - FilteredData filtered = filter.filter(new TopicListener.DataFromTopic(null, "junk".getBytes())); + FilteredData filtered = filter.filter(new TopicListener.DataFromTopic(null, "junk".getBytes(), false)); assertThat(filtered.isEmpty()).isTrue(); - filtered = filter.filter(new TopicListener.DataFromTopic(null, reQuote("{'msg': 'test'}").getBytes())); + filtered = filter.filter(new TopicListener.DataFromTopic(null, reQuote("{'msg': 'test'}").getBytes(), false)); assertThat(filtered.isEmpty()).isTrue(); } -- 2.16.6