Changed principles for zipping output 67/9467/3
author    PatrikBuhr <patrik.buhr@est.tech>
          Wed, 2 Nov 2022 13:03:44 +0000 (14:03 +0100)
committer PatrikBuhr <patrik.buhr@est.tech>
          Thu, 3 Nov 2022 07:42:39 +0000 (08:42 +0100)
The decision to zip the output is made by the producer instead of the consumer.
The consumer will get a Kafka header named "gzip" if the output is gzipped.
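
As an illustration only (not part of this change), a downstream consumer could
check for that header and unzip roughly as sketched below; the class and helper
names are invented for the example:

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.util.zip.GZIPInputStream;

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    // Hypothetical consumer-side helper, mirroring the "gzip" header
    // convention introduced by this change.
    public class GzipAwareConsumer {

        // Returns the record value, gunzipped when the producer set the
        // "gzip" header; otherwise the payload is passed through unchanged.
        static byte[] maybeDecompress(ConsumerRecord<byte[], byte[]> rec) throws IOException {
            if (rec.headers().lastHeader("gzip") == null) {
                return rec.value(); // no header: payload is plain
            }
            try (GZIPInputStream in =
                    new GZIPInputStream(new ByteArrayInputStream(rec.value()))) {
                return in.readAllBytes();
            }
        }
    }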

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: I3016313bbb8a833c14c651492da7a87c37e71e31

14 files changed:
config/application.yaml
src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java
src/main/java/org/oran/dmaapadapter/filter/Filter.java
src/main/java/org/oran/dmaapadapter/repository/Job.java
src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java
src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java
src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java
src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java
src/main/resources/typeSchemaPmData.json
src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
src/test/java/org/oran/dmaapadapter/filter/JsltFilterTest.java
src/test/java/org/oran/dmaapadapter/filter/JsonPathFilterTest.java
src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java

diff --git a/config/application.yaml b/config/application.yaml
index 8bdf414..8975849 100644
@@ -84,6 +84,7 @@ app:
   # If the file name is empty, no authorization token is used
   auth-token-file:
   pm-files-path: /tmp
+  zip-output: false
   s3:
     endpointOverride: http://localhost:9000
     accessKeyId: minio
diff --git a/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java b/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java
index 3e65120..b53383a 100644
@@ -97,7 +97,7 @@ public class ApplicationConfig {
     private String kafkaBootStrapServers;
 
     @Getter
-    @Value("${app.kafka.max-poll-records:100}")
+    @Value("${app.kafka.max-poll-records:300}")
     private int kafkaMaxPollRecords;
 
     @Getter
@@ -124,6 +124,11 @@ public class ApplicationConfig {
     @Value("${app.s3.bucket:}")
     private String s3Bucket;
 
+    @Getter
+    @Setter
+    @Value("${app.zip-output:}")
+    private boolean zipOutput;
+
     private WebClientConfig webClientConfig = null;
 
     public WebClientConfig getWebClientConfig() {
diff --git a/src/main/java/org/oran/dmaapadapter/filter/Filter.java b/src/main/java/org/oran/dmaapadapter/filter/Filter.java
index 6ec74d5..6c5fa25 100644
 
 package org.oran.dmaapadapter.filter;
 
+import java.util.ArrayList;
+
+import lombok.Getter;
 import lombok.ToString;
 
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
 import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
 
 public interface Filter {
@@ -34,6 +39,10 @@ public interface Filter {
     public static class FilteredData {
         public final byte[] key;
         public final byte[] value;
+
+        @Getter
+        private final boolean isZipped;
+
         private static final FilteredData emptyData = new FilteredData(null, null);
 
         public boolean isEmpty() {
@@ -41,8 +50,13 @@ public interface Filter {
         }
 
         public FilteredData(byte[] key, byte[] value) {
+            this(key, value, false);
+        }
+
+        public FilteredData(byte[] key, byte[] value, boolean isZipped) {
             this.key = key;
             this.value = value;
+            this.isZipped = isZipped;
         }
 
         public String getValueAString() {
@@ -52,6 +66,15 @@ public interface Filter {
         public static FilteredData empty() {
             return emptyData;
         }
+
+        public Iterable<Header> headers() {
+            ArrayList<Header> result = new ArrayList<>();
+            if (isZipped()) {
+                Header h = new RecordHeader(DataFromTopic.ZIP_PROPERTY, null);
+                result.add(h);
+            }
+            return result;
+        }
     }
 
     public FilteredData filter(DataFromTopic data);
diff --git a/src/main/java/org/oran/dmaapadapter/repository/Job.java b/src/main/java/org/oran/dmaapadapter/repository/Job.java
index f57614f..e256232 100644
@@ -115,22 +115,9 @@ public class Job {
         @Getter
         private BufferTimeout bufferTimeout;
 
-        private Integer maxConcurrency;
-
         @Getter
         private String kafkaOutputTopic;
 
-        @Getter
-        private Boolean gzip;
-
-        public int getMaxConcurrency() {
-            return maxConcurrency == null || maxConcurrency == 1 ? 1 : maxConcurrency;
-        }
-
-        public boolean isGzip() {
-            return gzip != null && gzip;
-        }
-
         public Filter.Type getFilterType() {
             if (filter == null || filterType == null) {
                 return Filter.Type.NONE;
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java
index cd4c002..719597a 100644
@@ -72,7 +72,7 @@ public class DmaapTopicListener implements TopicListener {
                 .doOnNext(input -> logger.debug("Received from DMaap: {} :{}", this.type.getDmaapTopicUrl(), input)) //
                 .doOnError(t -> logger.error("DmaapTopicListener error: {}", t.getMessage())) //
                 .doFinally(sig -> logger.error("DmaapTopicListener stopped, reason: {}", sig)) //
-                .map(input -> new DataFromTopic(null, input.getBytes()))
+                .map(input -> new DataFromTopic(null, input.getBytes(), false))
                 .flatMap(data -> KafkaTopicListener.getDataFromFileIfNewPmFileEvent(data, type, dataStore), 100)
                 .publish() //
                 .autoConnect();
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java
index 5571669..ef5ab2a 100644
@@ -59,6 +59,7 @@ public abstract class JobDataDistributor {
 
     private final DataStore dataStore;
     private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
+    private final ApplicationConfig applConfig;
 
     private class ErrorStats {
         private int consumerFaultCounter = 0;
@@ -87,6 +88,7 @@ public abstract class JobDataDistributor {
     }
 
     protected JobDataDistributor(Job job, ApplicationConfig applConfig) {
+        this.applConfig = applConfig;
         this.job = job;
         this.dataStore = DataStore.create(applConfig);
         this.dataStore.create(DataStore.Bucket.FILES).subscribe();
@@ -105,8 +107,9 @@ public abstract class JobDataDistributor {
         PmReportFilter filter = job.getFilter() instanceof PmReportFilter ? (PmReportFilter) job.getFilter() : null;
 
         if (filter == null || filter.getFilterData().getPmRopEndTime() == null) {
-            this.subscription = filterAndBuffer(input, this.job) //
-                    .flatMap(this::sendToClient, job.getParameters().getMaxConcurrency()) //
+            this.subscription = Flux.just(input) //
+                    .flatMap(in -> filterAndBuffer(in, this.job)) //
+                    .flatMap(this::sendToClient) //
                     .onErrorResume(this::handleError) //
                     .subscribe(this::handleSentOk, //
                             this::handleExceptionInStream, //
@@ -138,7 +141,7 @@ public abstract class JobDataDistributor {
     }
 
     private Filter.FilteredData gzip(Filter.FilteredData data) {
-        if (job.getParameters().isGzip()) {
+        if (this.applConfig.isZipOutput()) {
             try {
                 ByteArrayOutputStream out = new ByteArrayOutputStream();
                 GZIPOutputStream gzip = new GZIPOutputStream(out);
@@ -146,7 +149,7 @@ public abstract class JobDataDistributor {
                 gzip.flush();
                 gzip.close();
                 byte[] zipped = out.toByteArray();
-                return new Filter.FilteredData(data.key, zipped);
+                return new Filter.FilteredData(data.key, zipped, true);
             } catch (IOException e) {
                 logger.error("Unexpected exception when zipping: {}", e.getMessage());
                 return data;
@@ -175,7 +178,7 @@ public abstract class JobDataDistributor {
 
         NewFileEvent ev = new NewFileEvent(fileName);
 
-        return new TopicListener.DataFromTopic(null, gson.toJson(ev).getBytes());
+        return new TopicListener.DataFromTopic(null, gson.toJson(ev).getBytes(), false);
     }
 
     private static String fileTimePartFromRopFileName(String fileName) {
@@ -232,13 +235,13 @@ public abstract class JobDataDistributor {
 
     private void handleExceptionInStream(Throwable t) {
         logger.warn("JobDataDistributor exception: {}, jobId: {}", t.getMessage(), job.getId());
-        stop();
     }
 
     protected abstract Mono<String> sendToClient(Filter.FilteredData output);
 
     public synchronized void stop() {
         if (this.subscription != null) {
+            logger.debug("Stopped, job: {}", job.getId());
             this.subscription.dispose();
             this.subscription = null;
         }
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java
index 5526fc8..6b33c3b 100644
@@ -96,7 +96,8 @@ public class KafkaJobDataDistributor extends JobDataDistributor {
     private SenderRecord<byte[], byte[], Integer> senderRecord(Filter.FilteredData output, Job infoJob) {
         int correlationMetadata = 2;
         String topic = infoJob.getParameters().getKafkaOutputTopic();
-        return SenderRecord.create(new ProducerRecord<>(topic, output.key, output.value), correlationMetadata);
+        var producerRecord = new ProducerRecord<>(topic, null, null, output.key, output.value, output.headers());
+        return SenderRecord.create(producerRecord, correlationMetadata);
     }
 
 }
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
index e63d934..54262d7 100644
@@ -74,11 +74,14 @@ public class KafkaTopicListener implements TopicListener {
                 .receiveAutoAck() //
                 .concatMap(consumerRecord -> consumerRecord) //
                 .doOnNext(input -> logger.trace("Received from kafka topic: {}", this.type.getKafkaInputTopic())) //
-                .doOnError(t -> logger.error("KafkaTopicReceiver error: {}", t.getMessage())) //
-                .doFinally(sig -> logger.error("KafkaTopicReceiver stopped, reason: {}", sig)) //
+                .doOnError(t -> logger.error("Received error: {}", t.getMessage())) //
+                .onErrorResume(t -> Mono.empty()) //
+                .doFinally(
+                        sig -> logger.error("KafkaTopicListener stopped, type: {}, reason: {}", this.type.getId(), sig)) //
                 .filter(t -> t.value().length > 0 || t.key().length > 0) //
-                .map(input -> new DataFromTopic(input.key(), input.value())) //
-                .flatMap(data -> getDataFromFileIfNewPmFileEvent(data, type, dataStore), 100).publish() //
+                .map(input -> new DataFromTopic(input.key(), input.value(), DataFromTopic.findZipped(input.headers()))) //
+                .flatMap(data -> getDataFromFileIfNewPmFileEvent(data, type, dataStore), 100) //
+                .publish() //
                 .autoConnect(1);
     }
 
@@ -115,25 +118,27 @@ public class KafkaTopicListener implements TopicListener {
 
             return fileStore.readObject(DataStore.Bucket.FILES, ev.getFilename()) //
                     .map(bytes -> unzip(bytes, ev.getFilename())) //
-                    .map(bytes -> new DataFromTopic(data.key, bytes));
+                    .map(bytes -> new DataFromTopic(data.key, bytes, false));
 
         } catch (Exception e) {
             return Mono.just(data);
         }
     }
 
-    public static byte[] unzip(byte[] bytes, String fileName) {
-        if (!fileName.endsWith(".gz")) {
-            return bytes;
-        }
-
+    public static byte[] unzip(byte[] bytes) throws IOException {
         try (final GZIPInputStream gzipInput = new GZIPInputStream(new ByteArrayInputStream(bytes))) {
-
             return gzipInput.readAllBytes();
+        }
+    }
+
+    private static byte[] unzip(byte[] bytes, String fileName) {
+        try {
+            return fileName.endsWith(".gz") ? unzip(bytes) : bytes;
         } catch (IOException e) {
             logger.error("Error while decompression, file: {}, reason: {}", fileName, e.getMessage());
             return new byte[0];
         }
+
     }
 
 }
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java
index be230cf..8b1afc8 100644
 
 package org.oran.dmaapadapter.tasks;
 
-
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
 
+import org.apache.kafka.common.header.Header;
 import org.oran.dmaapadapter.filter.PmReport;
 import reactor.core.publisher.Flux;
 
@@ -34,6 +34,7 @@ public interface TopicListener {
     public static class DataFromTopic {
         public final byte[] key;
         public final byte[] value;
+        public final boolean isZipped;
 
         private static byte[] noBytes = new byte[0];
 
@@ -42,15 +43,29 @@ public interface TopicListener {
         @ToString.Exclude
         private PmReport cachedPmReport;
 
-        public DataFromTopic(byte[] key, byte[] value) {
+        public DataFromTopic(byte[] key, byte[] value, boolean isZipped) {
             this.key = key == null ? noBytes : key;
             this.value = value == null ? noBytes : value;
+            this.isZipped = isZipped;
         }
 
         public String valueAsString() {
             return new String(this.value);
         }
 
+        public static final String ZIP_PROPERTY = "gzip";
+
+        public static boolean findZipped(Iterable<Header> headers) {
+            if (headers == null) {
+                return false;
+            }
+            for (Header h : headers) {
+                if (h.key().equals(ZIP_PROPERTY)) {
+                    return true;
+                }
+            }
+            return false;
+        }
     }
 
     public Flux<DataFromTopic> getFlux();
diff --git a/src/main/resources/typeSchemaPmData.json b/src/main/resources/typeSchemaPmData.json
index c579fe2..df3f723 100644
@@ -63,9 +63,6 @@
       },
       "kafkaOutputTopic": {
          "type": "string"
-      },
-      "gzip": {
-         "type": "boolean"
       }
    }
 }
\ No newline at end of file
diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
index e2557cd..8974d5d 100644
@@ -26,6 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import com.google.gson.JsonParser;
 
+import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.nio.file.Path;
 import java.time.Duration;
@@ -89,7 +90,7 @@ import reactor.kafka.sender.SenderRecord;
         "app.pm-files-path=./src/test/resources/", //
         "app.s3.locksBucket=ropfilelocks", //
         "app.pm-files-path=/tmp/dmaapadaptor", //
-        "app.s3.bucket=" //
+        "app.s3.bucket=dmaaptest" //
 }) //
 class IntegrationWithKafka {
 
@@ -170,10 +171,12 @@ class IntegrationWithKafka {
         public final String OUTPUT_TOPIC;
         private TopicListener.DataFromTopic receivedKafkaOutput;
         private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+        private final ApplicationConfig applicationConfig;
 
         int count = 0;
 
         public KafkaReceiver(ApplicationConfig applicationConfig, String outputTopic, SecurityContext securityContext) {
+            this.applicationConfig = applicationConfig;
             this.OUTPUT_TOPIC = outputTopic;
 
             // Create a listener to the output topic. The KafkaTopicListener happens to be
@@ -193,18 +196,18 @@ class IntegrationWithKafka {
                     .subscribe();
         }
 
-        boolean isUnzip = false;
-
         private TopicListener.DataFromTopic unzip(TopicListener.DataFromTopic receivedKafkaOutput) {
-            if (!this.isUnzip) {
+            assertThat(this.applicationConfig.isZipOutput()).isEqualTo(receivedKafkaOutput.isZipped);
+            if (!receivedKafkaOutput.isZipped) {
                 return receivedKafkaOutput;
             }
-            byte[] unzipped = KafkaTopicListener.unzip(receivedKafkaOutput.value, "junk.gz");
-            return new TopicListener.DataFromTopic(unzipped, receivedKafkaOutput.key);
-        }
-
-        public void setUnzip(boolean unzip) {
-            this.isUnzip = unzip;
+            try {
+                byte[] unzipped = KafkaTopicListener.unzip(receivedKafkaOutput.value);
+                return new TopicListener.DataFromTopic(unzipped, receivedKafkaOutput.key, false);
+            } catch (IOException e) {
+                logger.error("********* ERROR {}", e.getMessage());
+                return null;
+            }
         }
 
         private void set(TopicListener.DataFromTopic receivedKafkaOutput) {
@@ -222,7 +225,7 @@ class IntegrationWithKafka {
         }
 
         void reset() {
-            this.receivedKafkaOutput = new TopicListener.DataFromTopic(null, null);
+            this.receivedKafkaOutput = new TopicListener.DataFromTopic(null, null, false);
             this.count = 0;
         }
     }
@@ -232,6 +235,8 @@ class IntegrationWithKafka {
 
     @BeforeEach
     void init() {
+        this.applicationConfig.setZipOutput(false);
+
         if (kafkaReceiver == null) {
             kafkaReceiver = new KafkaReceiver(this.applicationConfig, "ouputTopic", this.securityContext);
             kafkaReceiver2 = new KafkaReceiver(this.applicationConfig, "ouputTopic2", this.securityContext);
@@ -289,11 +294,10 @@ class IntegrationWithKafka {
         return "https://localhost:" + this.applicationConfig.getLocalServerHttpPort();
     }
 
-    private static Object jobParametersAsJsonObject(String filter, long maxTimeMiliseconds, int maxSize,
-            int maxConcurrency) {
+    private static Object jobParametersAsJsonObject(String filter, long maxTimeMiliseconds, int maxSize) {
         Job.BufferTimeout buffer = maxSize > 0 ? new Job.BufferTimeout(maxSize, maxTimeMiliseconds) : null;
         Job.Parameters param = Job.Parameters.builder().filter(filter).filterType(Job.Parameters.REGEXP_TYPE)
-                .bufferTimeout(buffer).maxConcurrency(maxConcurrency).build();
+                .bufferTimeout(buffer).build();
 
         String str = gson.toJson(param);
         return jsonObject(str);
@@ -307,21 +311,20 @@ class IntegrationWithKafka {
         }
     }
 
-    ConsumerJobInfo consumerJobInfo(String filter, Duration maxTime, int maxSize, int maxConcurrency) {
+    ConsumerJobInfo consumerJobInfo(String filter, Duration maxTime, int maxSize) {
         try {
             String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL;
-            return new ConsumerJobInfo(KAFKA_TYPE_ID,
-                    jobParametersAsJsonObject(filter, maxTime.toMillis(), maxSize, maxConcurrency), "owner", targetUri,
-                    "");
+            return new ConsumerJobInfo(KAFKA_TYPE_ID, jobParametersAsJsonObject(filter, maxTime.toMillis(), maxSize),
+                    "owner", targetUri, "");
         } catch (Exception e) {
             return null;
         }
     }
 
-    ConsumerJobInfo consumerJobInfoKafka(String topic, PmReportFilter.FilterData filterData, boolean gzip) {
+    ConsumerJobInfo consumerJobInfoKafka(String topic, PmReportFilter.FilterData filterData) {
         try {
             Job.Parameters param = Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE)
-                    .kafkaOutputTopic(topic).gzip(gzip).build();
+                    .kafkaOutputTopic(topic).build();
 
             String str = gson.toJson(param);
             Object parametersObj = jsonObject(str);
@@ -332,10 +335,6 @@ class IntegrationWithKafka {
         }
     }
 
-    ConsumerJobInfo consumerJobInfoKafka(String topic, PmReportFilter.FilterData filterData) {
-        return consumerJobInfoKafka(topic, filterData, false);
-    }
-
     ConsumerJobInfo consumerJobInfoKafka(String topic) {
         try {
             Job.Parameters param = Job.Parameters.builder().kafkaOutputTopic(topic).build();
@@ -429,7 +428,7 @@ class IntegrationWithKafka {
         await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
         assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
-        this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID, restClient());
+        this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0), JOB_ID, restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
         waitForKafkaListener();
 
@@ -449,9 +448,8 @@ class IntegrationWithKafka {
         assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
         // Create two jobs. One buffering and one with a filter
-        this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 10, 20), JOB_ID1,
-                restClient());
-        this.icsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0, 1), JOB_ID2, restClient());
+        this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 10), JOB_ID1, restClient());
+        this.icsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0), JOB_ID2, restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
         waitForKafkaListener();
 
@@ -577,22 +575,21 @@ class IntegrationWithKafka {
         filterData.getMeasTypes().add("pmCounterNumber1");
         filterData.getMeasObjClass().add("NRCellCU");
 
-        final boolean USE_GZIP = true;
+        this.applicationConfig.setZipOutput(true);
         final int NO_OF_JOBS = 150;
         ArrayList<KafkaReceiver> receivers = new ArrayList<>();
         for (int i = 0; i < NO_OF_JOBS; ++i) {
             final String outputTopic = "manyJobs_" + i;
-            this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData, USE_GZIP), outputTopic,
+            this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData), outputTopic,
                     restClient());
             KafkaReceiver receiver = new KafkaReceiver(this.applicationConfig, outputTopic, this.securityContext);
-            receiver.setUnzip(USE_GZIP);
             receivers.add(receiver);
         }
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(NO_OF_JOBS));
         waitForKafkaListener();
 
-        final int NO_OF_OBJECTS = 100;
+        final int NO_OF_OBJECTS = 1000;
 
         Instant startTime = Instant.now();
 
@@ -608,10 +605,12 @@ class IntegrationWithKafka {
         var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> kafkaSenderRecord(eventAsString, "key", PM_TYPE_ID));
         sendDataToKafka(dataToSend);
 
-        while (receivers.get(0).count != NO_OF_OBJECTS) {
+        logger.info("sleeping {}", kafkaReceiver.count);
+        while (receivers.get(0).count < NO_OF_OBJECTS) {
             if (kafkaReceiver.count > 0) {
-                logger.info("sleeping {}", kafkaReceiver.count);
+                logger.info("sleeping {}", receivers.get(0).count);
             }
+
             Thread.sleep(1000 * 1);
         }
 
@@ -676,9 +675,8 @@ class IntegrationWithKafka {
         assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
         // Create two jobs.
-        this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 1), JOB_ID1,
-                restClient());
-        this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID2, restClient());
+        this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000), JOB_ID1, restClient());
+        this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0), JOB_ID2, restClient());
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
 
@@ -691,7 +689,7 @@ class IntegrationWithKafka {
         this.icsSimulatorController.deleteJob(JOB_ID1, restClient());
         this.icsSimulatorController.deleteJob(JOB_ID2, restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
-        this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID2, restClient());
+        this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0), JOB_ID2, restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
 
         dataToSend = Flux.just(kafkaSenderRecord("Howdy", "", KAFKA_TYPE_ID));
diff --git a/src/test/java/org/oran/dmaapadapter/filter/JsltFilterTest.java b/src/test/java/org/oran/dmaapadapter/filter/JsltFilterTest.java
index e3619ac..6fa7ce8 100644
@@ -33,7 +33,7 @@ import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
 class JsltFilterTest {
 
     private String filterReport(JsltFilter filter) throws Exception {
-        DataFromTopic data = new DataFromTopic(null, loadReport().getBytes());
+        DataFromTopic data = new DataFromTopic(null, loadReport().getBytes(), false);
         FilteredData filtered = filter.filter(data);
         return filtered.getValueAString();
     }
diff --git a/src/test/java/org/oran/dmaapadapter/filter/JsonPathFilterTest.java b/src/test/java/org/oran/dmaapadapter/filter/JsonPathFilterTest.java
index 99d3591..6e75757 100644
@@ -36,7 +36,7 @@ class JsonPathFilterTest {
     void testJsonPath() throws Exception {
         String exp = ("$.event.perf3gppFields.measDataCollection.measInfoList[0].measTypes.sMeasTypesList[0]");
         JsonPathFilter filter = new JsonPathFilter(exp);
-        DataFromTopic data = new DataFromTopic(null, loadReport().getBytes());
+        DataFromTopic data = new DataFromTopic(null, loadReport().getBytes(), false);
         FilteredData filtered = filter.filter(data);
         String res = filtered.getValueAString();
         assertThat(res).isEqualTo("\"attTCHSeizures\"");
diff --git a/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java b/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java
index 3e8afc2..6dd185c 100644
@@ -94,7 +94,7 @@ class PmReportFilterTest {
     private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
     private String filterReport(PmReportFilter filter) throws Exception {
-        TopicListener.DataFromTopic data = new TopicListener.DataFromTopic(null, loadReport().getBytes());
+        TopicListener.DataFromTopic data = new TopicListener.DataFromTopic(null, loadReport().getBytes(), false);
         FilteredData filtered = filter.filter(data);
         return filtered.getValueAString();
     }
@@ -145,7 +145,7 @@ class PmReportFilterTest {
         }
 
         {
-            TopicListener.DataFromTopic data = new TopicListener.DataFromTopic(null, loadReport().getBytes());
+            TopicListener.DataFromTopic data = new TopicListener.DataFromTopic(null, loadReport().getBytes(), false);
 
             PmReportFilter.FilterData utranCellFilter = new PmReportFilter.FilterData();
             utranCellFilter.measObjClass.add("UtranCell");
@@ -195,7 +195,7 @@ class PmReportFilterTest {
 
             Instant startTime = Instant.now();
             for (int i = 0; i < TIMES; ++i) {
-                KafkaTopicListener.unzip(pmReportZipped, "junk.gz");
+                KafkaTopicListener.unzip(pmReportZipped);
             }
 
             printDuration("Unzip", startTime, TIMES);
@@ -206,7 +206,7 @@ class PmReportFilterTest {
             filterData.getMeasTypes().add("pmCounterNumber0");
             filterData.getMeasObjClass().add("NRCellCU");
             PmReportFilter filter = new PmReportFilter(filterData);
-            DataFromTopic topicData = new DataFromTopic(null, pmReportJson.getBytes());
+            DataFromTopic topicData = new DataFromTopic(null, pmReportJson.getBytes(), false);
 
             Instant startTime = Instant.now();
             for (int i = 0; i < TIMES; ++i) {
@@ -259,10 +259,10 @@ class PmReportFilterTest {
         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
         PmReportFilter filter = new PmReportFilter(filterData);
 
-        FilteredData filtered = filter.filter(new TopicListener.DataFromTopic(null, "junk".getBytes()));
+        FilteredData filtered = filter.filter(new TopicListener.DataFromTopic(null, "junk".getBytes(), false));
         assertThat(filtered.isEmpty()).isTrue();
 
-        filtered = filter.filter(new TopicListener.DataFromTopic(null, reQuote("{'msg': 'test'}").getBytes()));
+        filtered = filter.filter(new TopicListener.DataFromTopic(null, reQuote("{'msg': 'test'}").getBytes(), false));
         assertThat(filtered.isEmpty()).isTrue();
 
     }