Added gzip of output data 16/9316/2
author    PatrikBuhr <patrik.buhr@est.tech>
Mon, 17 Oct 2022 13:47:37 +0000 (15:47 +0200)
committer PatrikBuhr <patrik.buhr@est.tech>
Tue, 18 Oct 2022 12:57:24 +0000 (14:57 +0200)
Bugfix: handling of historical data was broken; fixed.

Change-Id: I7ba95f962676e69ebd35b3ff467ac47cc1786b2d
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773

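For context: with the new optional gzip job parameter enabled, the adapter compresses each delivered payload with java.util.zip before distribution (see the gzip() method added to JobDataDistributor below). A consumer can restore the original bytes with the matching GZIPInputStream; a minimal sketch, assuming the payload arrives as a byte array (the helper class and name are illustrative, not part of this change):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.zip.GZIPInputStream;

    class GzipPayloads {
        // Decompresses one gzip-compressed payload back to the original JSON bytes.
        static byte[] gunzip(byte[] compressed) throws IOException {
            try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed));
                    ByteArrayOutputStream out = new ByteArrayOutputStream()) {
                byte[] buffer = new byte[4096];
                int read;
                while ((read = in.read(buffer)) > 0) {
                    out.write(buffer, 0, read);
                }
                return out.toByteArray();
            }
        }
    }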
23 files changed:
pom.xml
src/main/java/org/oran/dmaapadapter/filter/Filter.java
src/main/java/org/oran/dmaapadapter/filter/JsltFilter.java
src/main/java/org/oran/dmaapadapter/filter/JsonPathFilter.java
src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java
src/main/java/org/oran/dmaapadapter/filter/RegexpFilter.java
src/main/java/org/oran/dmaapadapter/repository/Job.java
src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java
src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java
src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java
src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java
src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java
src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java
src/main/resources/typeSchemaPmData.json
src/test/java/org/oran/dmaapadapter/ApplicationTest.java
src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java
src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
src/test/java/org/oran/dmaapadapter/filter/JsltFilterTest.java
src/test/java/org/oran/dmaapadapter/filter/JsonPathFilterTest.java
src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java
src/test/resources/A20000626.2315+0200-2330+0200_HTTPS-6-73.json [moved from src/test/resources/A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json with 100% similarity]
src/test/resources/A20000626.2315+0200-2330+0200_HTTPS-6-73.json.gz [new file with mode: 0644]

diff --git a/pom.xml b/pom.xml
index fb3135b..b92e55b 100644 (file)
--- a/pom.xml
+++ b/pom.xml
         <dependency>
             <groupId>io.projectreactor.kafka</groupId>
             <artifactId>reactor-kafka</artifactId>
-            <version>1.3.12</version>
+            <version>1.3.13</version>
         </dependency>
         <dependency>
             <groupId>com.google.guava</groupId>
         <dependency>
             <groupId>software.amazon.awssdk</groupId>
             <artifactId>s3</artifactId>
-            <version>2.13.73</version>
+            <version>2.17.292</version>
         </dependency>
         <dependency>
             <groupId>com.amazonaws</groupId>
             <artifactId>aws-java-sdk</artifactId>
-            <version>1.11.795</version>
+            <version>1.12.321</version>
         </dependency>
     </dependencies>
     <build>
diff --git a/src/main/java/org/oran/dmaapadapter/filter/Filter.java b/src/main/java/org/oran/dmaapadapter/filter/Filter.java
index bfd358b..6ec74d5 100644 (file)
@@ -32,19 +32,23 @@ public interface Filter {
 
     @ToString
     public static class FilteredData {
-        public final String key;
-        public final String value;
-        private static final FilteredData emptyData = new FilteredData("", "");
+        public final byte[] key;
+        public final byte[] value;
+        private static final FilteredData emptyData = new FilteredData(null, null);
 
         public boolean isEmpty() {
-            return value.isEmpty() && key.isEmpty();
+            return (key == null || key.length == 0) && (value == null || value.length == 0);
         }
 
-        public FilteredData(String key, String value) {
+        public FilteredData(byte[] key, byte[] value) {
             this.key = key;
             this.value = value;
         }
 
+        public String getValueAString() {
+            return value == null ? "" : new String(this.value);
+        }
+
         public static FilteredData empty() {
             return emptyData;
         }
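FilteredData now carries raw byte arrays instead of Strings, so the same type can hold both JSON text and gzip-compressed output; callers that need text use getValueAString(). A hedged usage sketch of the class as changed above:

    import org.oran.dmaapadapter.filter.Filter;

    class FilteredDataDemo {
        public static void main(String[] args) {
            byte[] key = "someKey".getBytes();
            byte[] value = "{\"msg\":\"test\"}".getBytes();
            Filter.FilteredData data = new Filter.FilteredData(key, value);
            System.out.println(data.getValueAString());                 // decodes the bytes; "" when value is null
            System.out.println(Filter.FilteredData.empty().isEmpty());  // true
        }
    }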
diff --git a/src/main/java/org/oran/dmaapadapter/filter/JsltFilter.java b/src/main/java/org/oran/dmaapadapter/filter/JsltFilter.java
index 9336617..577f83d 100644 (file)
@@ -60,7 +60,7 @@ class JsltFilter implements Filter {
             if (filteredNode == NullNode.instance) {
                 return FilteredData.empty();
             }
-            return new FilteredData(data.key, mapper.writeValueAsString(filteredNode));
+            return new FilteredData(data.key, mapper.writeValueAsBytes(filteredNode));
         } catch (Exception e) {
             return FilteredData.empty();
         }
diff --git a/src/main/java/org/oran/dmaapadapter/filter/JsonPathFilter.java b/src/main/java/org/oran/dmaapadapter/filter/JsonPathFilter.java
index 36a2103..cded9a0 100644 (file)
@@ -43,8 +43,10 @@ class JsonPathFilter implements Filter {
     @Override
     public FilteredData filter(DataFromTopic data) {
         try {
-            Object o = JsonPath.parse(data.value).read(this.expression, Object.class);
-            return o == null ? FilteredData.empty() : new FilteredData(data.key, gson.toJson(o));
+            String str = new String(data.value);
+            Object o = JsonPath.parse(str).read(this.expression, Object.class);
+            String json = gson.toJson(o);
+            return o == null ? FilteredData.empty() : new FilteredData(data.key, json.getBytes());
         } catch (Exception e) {
             return FilteredData.empty();
         }
diff --git a/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java b/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java
index e33ae68..8f4976f 100644 (file)
@@ -96,7 +96,7 @@ public class PmReportFilter implements Filter {
             if (!filter(report, this.filterData)) {
                 return FilteredData.empty();
             }
-            return new FilteredData(data.key, gson.toJson(report));
+            return new FilteredData(data.key, gson.toJson(report).getBytes());
         } catch (Exception e) {
             logger.warn("Could not parse PM data. {}, reason: {}", data, e.getMessage());
             return FilteredData.empty();
@@ -107,7 +107,7 @@ public class PmReportFilter implements Filter {
     private PmReport createPmReport(DataFromTopic data) {
         synchronized (data) {
             if (data.getCachedPmReport() == null) {
-                data.setCachedPmReport(gsonParse.fromJson(data.value, PmReport.class));
+                data.setCachedPmReport(gsonParse.fromJson(data.valueAsString(), PmReport.class));
             }
             return data.getCachedPmReport();
         }
diff --git a/src/main/java/org/oran/dmaapadapter/filter/RegexpFilter.java b/src/main/java/org/oran/dmaapadapter/filter/RegexpFilter.java
index b1000ed..4806604 100644 (file)
@@ -44,7 +44,7 @@ class RegexpFilter implements Filter {
         if (regexp == null) {
             return new FilteredData(data.key, data.value);
         }
-        Matcher matcher = regexp.matcher(data.value);
+        Matcher matcher = regexp.matcher(data.valueAsString());
         boolean match = matcher.find();
         if (match) {
             return new FilteredData(data.key, data.value);
diff --git a/src/main/java/org/oran/dmaapadapter/repository/Job.java b/src/main/java/org/oran/dmaapadapter/repository/Job.java
index 9fd8e57..b5b7e52 100644 (file)
@@ -84,19 +84,20 @@ public class Job {
         @Builder.Default
         int noOfSentBytes = 0;
 
-        public void received(String str) {
-            noOfReceivedBytes += str.length();
+        public void received(byte[] bytes) {
+            noOfReceivedBytes += bytes.length;
             noOfReceivedObjects += 1;
 
         }
 
-        public void filtered(String str) {
-            noOfSentBytes += str.length();
+        public void filtered(byte[] bytes) {
+            noOfSentBytes += bytes.length;
             noOfSentObjects += 1;
         }
 
     }
 
+    @Builder
     public static class Parameters {
         public static final String REGEXP_TYPE = "regexp";
         public static final String PM_FILTER_TYPE = "pmdata";
@@ -104,9 +105,12 @@ public class Job {
         public static final String JSON_PATH_FILTER_TYPE = "json-path";
 
         @Setter
+        @Builder.Default
         private String filterType = REGEXP_TYPE;
+
         @Getter
         private Object filter;
+
         @Getter
         private BufferTimeout bufferTimeout;
 
@@ -115,19 +119,15 @@ public class Job {
         @Getter
         private String kafkaOutputTopic;
 
-        public Parameters() {}
+        @Getter
+        private Boolean gzip;
 
-        public Parameters(Object filter, String filterType, BufferTimeout bufferTimeout, Integer maxConcurrency,
-                String kafkaOutputTopic) {
-            this.filter = filter;
-            this.bufferTimeout = bufferTimeout;
-            this.maxConcurrency = maxConcurrency;
-            this.filterType = filterType;
-            this.kafkaOutputTopic = kafkaOutputTopic;
+        public int getMaxConcurrency() {
+            return maxConcurrency == null || maxConcurrency == 0 ? 1 : maxConcurrency;
         }
 
-        public int getMaxConcurrency() {
-            return maxConcurrency == null || maxConcurrency == 0 ? 1 : maxConcurrency;
+        public boolean isGzip() {
+            return gzip != null && gzip;
         }
 
         public Filter.Type getFilterType() {
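With Lombok's @Builder replacing the hand-written constructors, job parameters are now built fluently, and gzip defaults to off (isGzip() treats null as false). A sketch of typical construction, mirroring the updated tests; the topic name is illustrative:

    import org.oran.dmaapadapter.repository.Job;

    class JobParametersDemo {
        public static void main(String[] args) {
            Job.Parameters param = Job.Parameters.builder()
                    .filterType(Job.Parameters.PM_FILTER_TYPE) // default is REGEXP_TYPE when omitted
                    .kafkaOutputTopic("pm-out")                // illustrative topic name
                    .gzip(true)                                // request gzip-compressed delivery
                    .build();
            System.out.println(param.isGzip()); // true
        }
    }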
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java
index 4799034..cd4c002 100644 (file)
@@ -72,7 +72,7 @@ public class DmaapTopicListener implements TopicListener {
                 .doOnNext(input -> logger.debug("Received from DMaap: {} :{}", this.type.getDmaapTopicUrl(), input)) //
                 .doOnError(t -> logger.error("DmaapTopicListener error: {}", t.getMessage())) //
                 .doFinally(sig -> logger.error("DmaapTopicListener stopped, reason: {}", sig)) //
-                .map(input -> new DataFromTopic("", input))
+                .map(input -> new DataFromTopic(null, input.getBytes()))
                 .flatMap(data -> KafkaTopicListener.getDataFromFileIfNewPmFileEvent(data, type, dataStore), 100)
                 .publish() //
                 .autoConnect();
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java
index f57c12d..ef10d3a 100644 (file)
@@ -26,6 +26,8 @@ import org.oran.dmaapadapter.repository.Job;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.http.MediaType;
+
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 /**
@@ -36,8 +38,8 @@ import reactor.core.publisher.Mono;
 public class HttpJobDataDistributor extends JobDataDistributor {
     private static final Logger logger = LoggerFactory.getLogger(HttpJobDataDistributor.class);
 
-    public HttpJobDataDistributor(Job job, ApplicationConfig config) {
-        super(job, config);
+    public HttpJobDataDistributor(Job job, ApplicationConfig config, Flux<TopicListener.DataFromTopic> input) {
+        super(job, config, input);
     }
 
     @Override
@@ -45,7 +47,7 @@ public class HttpJobDataDistributor extends JobDataDistributor {
         Job job = this.getJob();
         logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), output);
         MediaType contentType = job.isBuffered() || job.getType().isJson() ? MediaType.APPLICATION_JSON : null;
-        return job.getConsumerRestClient().post("", output.value, contentType);
+        return job.getConsumerRestClient().post("", output.getValueAString(), contentType);
     }
 
 }
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java
index 05fbbc6..45a232d 100644 (file)
 
 package org.oran.dmaapadapter.tasks;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.time.OffsetDateTime;
 import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeFormatterBuilder;
+import java.util.zip.GZIPOutputStream;
 
 import lombok.Getter;
 
@@ -84,16 +87,12 @@ public abstract class JobDataDistributor {
         }
     }
 
-    protected JobDataDistributor(Job job, ApplicationConfig applConfig) {
+    protected JobDataDistributor(Job job, ApplicationConfig applConfig, Flux<TopicListener.DataFromTopic> input) {
         this.job = job;
         this.applConfig = applConfig;
         this.dataStore = DataStore.create(applConfig);
         this.dataStore.create(DataStore.Bucket.FILES).subscribe();
         this.dataStore.create(DataStore.Bucket.LOCKS).subscribe();
-    }
-
-    public synchronized void start(Flux<TopicListener.DataFromTopic> input) {
-        collectHistoricalData();
 
         this.errorStats.resetIrrecoverableErrors();
         this.subscription = filterAndBuffer(input, this.job) //
@@ -110,7 +109,7 @@ public abstract class JobDataDistributor {
         }
     }
 
-    private void collectHistoricalData() {
+    public void collectHistoricalData() {
         PmReportFilter filter = job.getFilter() instanceof PmReportFilter ? (PmReportFilter) job.getFilter() : null;
 
         if (filter != null && filter.getFilterData().getPmRopStartTime() != null) {
@@ -129,17 +128,34 @@ public abstract class JobDataDistributor {
                     .flatMap(data -> KafkaTopicListener.getDataFromFileIfNewPmFileEvent(data, this.job.getType(),
                             dataStore), 100)
                     .map(job::filter) //
+                    .map(this::gzip) //
                     .flatMap(this::sendToClient, 1) //
                     .onErrorResume(this::handleCollectHistoricalDataError) //
                     .subscribe();
         }
     }
 
+    private Filter.FilteredData gzip(Filter.FilteredData data) {
+        if (job.getParameters().isGzip()) {
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
+                gzip.write(data.value);
+                return new Filter.FilteredData(data.key, out.toByteArray());
+            } catch (IOException e) {
+                logger.error("Unexpected exception when zipping: {}", e.getMessage());
+                return data;
+            }
+        } else {
+            return data;
+        }
+    }
+
     private Mono<String> handleCollectHistoricalDataError(Throwable t) {
         if (t instanceof LockedException) {
             logger.debug("Locked exception: {} job: {}", t.getMessage(), job.getId());
             return Mono.empty(); // Ignore
         } else {
+            logger.error("Exception: {} job: {}", t.getMessage(), job.getId());
             return tryDeleteLockFile() //
                     .map(bool -> "OK");
         }
@@ -153,11 +169,11 @@ public abstract class JobDataDistributor {
 
         NewFileEvent ev = new NewFileEvent(fileName);
 
-        return new TopicListener.DataFromTopic("", gson.toJson(ev));
+        return new TopicListener.DataFromTopic(null, gson.toJson(ev).getBytes());
     }
 
     private boolean filterStartTime(String startTimeStr, String fileName) {
-        // A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json
+        // A20000626.2315+0200-2330+0200_HTTPS-6-73.json
         try {
             if (fileName.endsWith(".json") || fileName.endsWith(".json.gz")) {
 
@@ -212,15 +228,16 @@ public abstract class JobDataDistributor {
         Flux<Filter.FilteredData> filtered = //
                 inputFlux.doOnNext(data -> job.getStatistics().received(data.value)) //
                         .map(job::filter) //
+                        .map(this::gzip) //
                         .filter(f -> !f.isEmpty()) //
                         .doOnNext(f -> job.getStatistics().filtered(f.value)); //
 
         if (job.isBuffered()) {
-            filtered = filtered.map(input -> quoteNonJson(input.value, job)) //
+            filtered = filtered.map(input -> quoteNonJson(input.getValueAString(), job)) //
                     .bufferTimeout( //
                             job.getParameters().getBufferTimeout().getMaxSize(), //
                             job.getParameters().getBufferTimeout().getMaxTime()) //
-                    .map(buffered -> new Filter.FilteredData("", buffered.toString()));
+                    .map(buffered -> new Filter.FilteredData(null, buffered.toString().getBytes()));
         }
         return filtered;
     }
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java
index 4a68603..5e09714 100644 (file)
@@ -23,9 +23,10 @@ package org.oran.dmaapadapter.tasks;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.filter.Filter;
 import org.oran.dmaapadapter.repository.Job;
@@ -46,20 +47,23 @@ import reactor.kafka.sender.SenderRecord;
 public class KafkaJobDataDistributor extends JobDataDistributor {
     private static final Logger logger = LoggerFactory.getLogger(KafkaJobDataDistributor.class);
 
-    private KafkaSender<String, String> sender;
+    private KafkaSender<byte[], byte[]> sender;
     private final ApplicationConfig appConfig;
 
-    public KafkaJobDataDistributor(Job job, ApplicationConfig appConfig) {
-        super(job, appConfig);
+    public KafkaJobDataDistributor(Job job, ApplicationConfig appConfig, Flux<TopicListener.DataFromTopic> input) {
+        super(job, appConfig, input);
         this.appConfig = appConfig;
+        SenderOptions<byte[], byte[]> senderOptions = senderOptions(appConfig);
+        this.sender = KafkaSender.create(senderOptions);
     }
 
     @Override
     protected Mono<String> sendToClient(Filter.FilteredData data) {
         Job job = this.getJob();
-        SenderRecord<String, String, Integer> senderRecord = senderRecord(data, job);
+        SenderRecord<byte[], byte[], Integer> senderRecord = senderRecord(data, job);
 
-        logger.trace("Sending data '{}' to Kafka topic: {}", data, job.getParameters().getKafkaOutputTopic());
+        logger.trace("Sending data '{}' to Kafka topic: {}", StringUtils.truncate(data.getValueAString(), 10),
+                job.getParameters().getKafkaOutputTopic());
 
         return this.sender.send(Mono.just(senderRecord)) //
                 .doOnNext(n -> logger.debug("Sent data to Kafka topic: {}", job.getParameters().getKafkaOutputTopic())) //
@@ -67,14 +71,8 @@ public class KafkaJobDataDistributor extends JobDataDistributor {
                         t -> logger.warn("Failed to send to Kafka, job: {}, reason: {}", job.getId(), t.getMessage())) //
                 .onErrorResume(t -> Mono.empty()) //
                 .collectList() //
-                .map(x -> data.value);
-    }
+                .map(x -> "ok");
 
-    @Override
-    public synchronized void start(Flux<TopicListener.DataFromTopic> input) {
-        super.start(input);
-        SenderOptions<String, String> senderOptions = senderOptions(appConfig);
-        this.sender = KafkaSender.create(senderOptions);
     }
 
     @Override
@@ -86,18 +84,18 @@ public class KafkaJobDataDistributor extends JobDataDistributor {
         }
     }
 
-    private static SenderOptions<String, String> senderOptions(ApplicationConfig config) {
+    private static SenderOptions<byte[], byte[]> senderOptions(ApplicationConfig config) {
         String bootstrapServers = config.getKafkaBootStrapServers();
 
         Map<String, Object> props = new HashMap<>();
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
         props.put(ProducerConfig.ACKS_CONFIG, "all");
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
         return SenderOptions.create(props);
     }
 
-    private SenderRecord<String, String, Integer> senderRecord(Filter.FilteredData output, Job infoJob) {
+    private SenderRecord<byte[], byte[], Integer> senderRecord(Filter.FilteredData output, Job infoJob) {
         int correlationMetadata = 2;
         String topic = infoJob.getParameters().getKafkaOutputTopic();
         return SenderRecord.create(new ProducerRecord<>(topic, output.key, output.value), correlationMetadata);
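Because the distributor now serializes records with ByteArraySerializer, consumers of the output topic should configure the matching deserializers. A minimal sketch; the bootstrap address and group id are assumptions:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    class OutputTopicConsumerProps {
        // Consumer properties matching the distributor's byte[] records.
        static Map<String, Object> create() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-consumer");        // assumption
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
            return props;
        }
    }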
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
index b238b6e..960ecef 100644 (file)
@@ -28,7 +28,7 @@ import java.util.Map;
 import java.util.zip.GZIPInputStream;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.datastore.DataStore;
 import org.oran.dmaapadapter.repository.InfoType;
@@ -40,7 +40,6 @@ import reactor.core.publisher.Mono;
 import reactor.kafka.receiver.KafkaReceiver;
 import reactor.kafka.receiver.ReceiverOptions;
 
-
 /**
  * The class streams incoming requests from a Kafka topic and sends them further
  * to a multi cast sink, which several other streams can connect to.
@@ -74,42 +73,40 @@ public class KafkaTopicListener implements TopicListener {
         return KafkaReceiver.create(kafkaInputProperties(clientId)) //
                 .receiveAutoAck() //
                 .concatMap(consumerRecord -> consumerRecord) //
-                .doOnNext(input -> logger.trace("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(),
-                        input.value())) //
+                .doOnNext(input -> logger.trace("Received from kafka topic: {}", this.type.getKafkaInputTopic())) //
                 .doOnError(t -> logger.error("KafkaTopicReceiver error: {}", t.getMessage())) //
                 .doFinally(sig -> logger.error("KafkaTopicReceiver stopped, reason: {}", sig)) //
-                .filter(t -> !t.value().isEmpty() || !t.key().isEmpty()) //
+                .filter(t -> t.value().length > 0 || t.key().length > 0) //
                 .map(input -> new DataFromTopic(input.key(), input.value())) //
-                .flatMap(data -> getDataFromFileIfNewPmFileEvent(data, type, dataStore), 100) //
-                .publish() //
+                .flatMap(data -> getDataFromFileIfNewPmFileEvent(data, type, dataStore), 100).publish() //
                 .autoConnect(1);
     }
 
-    private ReceiverOptions<String, String> kafkaInputProperties(String clientId) {
+    private ReceiverOptions<byte[], byte[]> kafkaInputProperties(String clientId) {
         Map<String, Object> consumerProps = new HashMap<>();
         if (this.applicationConfig.getKafkaBootStrapServers().isEmpty()) {
             logger.error("No kafka boostrap server is setup");
         }
         consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers());
         consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, this.type.getKafkaGroupId());
-        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
         consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
         consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
         consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, this.applicationConfig.getKafkaMaxPollRecords());
 
-        return ReceiverOptions.<String, String>create(consumerProps)
+        return ReceiverOptions.<byte[], byte[]>create(consumerProps)
                 .subscription(Collections.singleton(this.type.getKafkaInputTopic()));
     }
 
     public static Mono<DataFromTopic> getDataFromFileIfNewPmFileEvent(DataFromTopic data, InfoType type,
             DataStore fileStore) {
-        if (type.getDataType() != InfoType.DataType.PM_DATA || data.value.length() > 1000) {
+        if (type.getDataType() != InfoType.DataType.PM_DATA || data.value.length > 1000) {
             return Mono.just(data);
         }
 
         try {
-            NewFileEvent ev = gson.fromJson(data.value, NewFileEvent.class);
+            NewFileEvent ev = gson.fromJson(data.valueAsString(), NewFileEvent.class);
 
             if (ev.getFilename() == null) {
                 logger.warn("Ignoring received message: {}", data);
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java
index 3f06457..be230cf 100644 (file)
@@ -20,7 +20,6 @@
 
 package org.oran.dmaapadapter.tasks;
 
-import java.nio.charset.StandardCharsets;
 
 import lombok.Getter;
 import lombok.Setter;
@@ -33,22 +32,23 @@ public interface TopicListener {
 
     @ToString
     public static class DataFromTopic {
-        public final String key;
-        public final String value;
+        public final byte[] key;
+        public final byte[] value;
+
+        private static byte[] noBytes = new byte[0];
 
         @Getter
         @Setter
         @ToString.Exclude
         private PmReport cachedPmReport;
 
-        public DataFromTopic(String key, String value) {
-            this.key = key;
-            this.value = value;
+        public DataFromTopic(byte[] key, byte[] value) {
+            this.key = key == null ? noBytes : key;
+            this.value = value == null ? noBytes : value;
         }
 
-        public DataFromTopic(String key, byte[] value) {
-            this.key = key;
-            this.value = new String(value, StandardCharsets.UTF_8);
+        public String valueAsString() {
+            return new String(this.value);
         }
 
     }
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java
index d5868e5..4d76cd1 100644 (file)
@@ -93,16 +93,19 @@ public class TopicListeners {
         }
     }
 
-    private JobDataDistributor createConsumer(Job job) {
-        return !Strings.isEmpty(job.getParameters().getKafkaOutputTopic()) ? new KafkaJobDataDistributor(job, appConfig)
-                : new HttpJobDataDistributor(job, appConfig);
+    private JobDataDistributor createConsumer(Job job, TopicListener topicListener) {
+        return !Strings.isEmpty(job.getParameters().getKafkaOutputTopic())
+                ? new KafkaJobDataDistributor(job, appConfig, topicListener.getFlux())
+                : new HttpJobDataDistributor(job, appConfig, topicListener.getFlux());
     }
 
     private void addConsumer(Job job, MultiMap<JobDataDistributor> distributors,
             Map<String, TopicListener> topicListeners) {
         TopicListener topicListener = topicListeners.get(job.getType().getId());
-        JobDataDistributor distributor = createConsumer(job);
-        distributor.start(topicListener.getFlux());
+        JobDataDistributor distributor = createConsumer(job, topicListener);
+
+        distributor.collectHistoricalData();
+
         distributors.put(job.getType().getId(), job.getId(), distributor);
     }
 
diff --git a/src/main/resources/typeSchemaPmData.json b/src/main/resources/typeSchemaPmData.json
index 7d5ab62..5b4ab89 100644 (file)
@@ -75,6 +75,9 @@
       "kafkaOutputTopic": {
          "type": "string"
       },
+      "gzip": {
+         "type": "boolean"
+      },
       "bufferTimeout": {
          "type": "object",
          "additionalProperties": false,
diff --git a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java
index 5d20541..139c1bb 100644 (file)
@@ -63,10 +63,8 @@ import org.oran.dmaapadapter.r1.ProducerJobInfo;
 import org.oran.dmaapadapter.repository.InfoTypes;
 import org.oran.dmaapadapter.repository.Job;
 import org.oran.dmaapadapter.repository.Jobs;
-import org.oran.dmaapadapter.tasks.JobDataDistributor;
 import org.oran.dmaapadapter.tasks.NewFileEvent;
 import org.oran.dmaapadapter.tasks.ProducerRegstrationTask;
-import org.oran.dmaapadapter.tasks.TopicListener;
 import org.oran.dmaapadapter.tasks.TopicListeners;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -84,7 +82,6 @@ import org.springframework.http.ResponseEntity;
 import org.springframework.test.context.TestPropertySource;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
@@ -391,38 +388,6 @@ class ApplicationTest {
                 "Could not find type");
     }
 
-    @Test
-    void testReceiveAndPostDataFromKafka() throws Exception {
-        final String JOB_ID = "ID";
-        final String TYPE_ID = "PmDataOverKafka";
-        waitForRegistration();
-
-        // Create a job
-        Job.Parameters param = new Job.Parameters(null, null, new Job.BufferTimeout(123, 456), 1, null);
-        String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL;
-        ConsumerJobInfo kafkaJobInfo = new ConsumerJobInfo(TYPE_ID, toJson(gson.toJson(param)), "owner", targetUri, "");
-
-        this.icsSimulatorController.addJob(kafkaJobInfo, JOB_ID, restClient());
-        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
-
-        JobDataDistributor kafkaConsumer = this.topicListeners.getDataDistributors().get(TYPE_ID, JOB_ID);
-
-        // Handle received data from Kafka, check that it has been posted to the
-        // consumer
-        kafkaConsumer.start(Flux.just(new TopicListener.DataFromTopic("key", "data")));
-
-        ConsumerController.TestResults consumer = this.consumerController.testResults;
-        await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
-        assertThat(consumer.receivedBodies.get(0)).isEqualTo("[data]");
-        assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "application/json");
-
-        // This only works in debugger. Removed for now.
-        assertThat(this.icsSimulatorController.testResults.createdJob).isNotNull();
-        assertThat(this.icsSimulatorController.testResults.createdJob.infoTypeId)
-                .isEqualTo("xml-file-data-to-filestore");
-
-    }
-
     @Test
     void testReceiveAndPostDataFromDmaapBuffering() throws Exception {
         final String JOB_ID = "testReceiveAndPostDataFromDmaap";
@@ -431,7 +396,8 @@ class ApplicationTest {
         waitForRegistration();
 
         // Create a job
-        Job.Parameters param = new Job.Parameters(null, null, new Job.BufferTimeout(123, 456), 1, null);
+        Job.Parameters param = Job.Parameters.builder().bufferTimeout(new Job.BufferTimeout(123, 456)).build();
+
         ConsumerJobInfo jobInfo = consumerJobInfo("DmaapInformationType", JOB_ID, toJson(gson.toJson(param)));
         this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
@@ -457,7 +423,7 @@ class ApplicationTest {
         waitForRegistration();
 
         // Create a job
-        Job.Parameters param = new Job.Parameters(null, null, null, 1, null);
+        Job.Parameters param = Job.Parameters.builder().build();
         ConsumerJobInfo jobInfo = consumerJobInfo("DmaapInformationType", JOB_ID, toJson(gson.toJson(param)));
         this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
@@ -500,8 +466,8 @@ class ApplicationTest {
         filterData.getMeasObjInstIds().add("UtranCell=Gbg-997");
         filterData.getSourceNames().add("O-DU-1122");
         filterData.getMeasuredEntityDns().add("ManagedElement=RNC-Gbg-1");
-        Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE,
-                new Job.BufferTimeout(123, 456), null, null);
+        Job.Parameters param = Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE)
+                .bufferTimeout(new Job.BufferTimeout(123, 456)).build();
         String paramJson = gson.toJson(param);
         ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverRest", "EI_PM_JOB_ID", toJson(paramJson));
 
@@ -540,14 +506,15 @@ class ApplicationTest {
 
         DataStore fileStore = DataStore.create(this.applicationConfig);
         fileStore.copyFileTo(Path.of("./src/test/resources/pm_report.json"),
-                "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json").block();
+                "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.json").block();
 
         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
         filterData.getSourceNames().add("O-DU-1122");
         filterData.setPmRopStartTime("1999-12-27T10:50:44.000-08:00");
 
-        Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE,
-                new Job.BufferTimeout(123, 456), null, null);
+        Job.Parameters param = Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE)
+                .bufferTimeout(new Job.BufferTimeout(123, 456)).build();
+
         String paramJson = gson.toJson(param);
         ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverRest", "EI_PM_JOB_ID", toJson(paramJson));
 
@@ -569,7 +536,8 @@ class ApplicationTest {
         // Create a job with a PM filter
         String expresssion = "if(.event.commonEventHeader.sourceName == \"O-DU-1122\")" //
                 + ".";
-        Job.Parameters param = new Job.Parameters(expresssion, Job.Parameters.JSLT_FILTER_TYPE, null, null, null);
+        Job.Parameters param =
+                Job.Parameters.builder().filter(expresssion).filterType(Job.Parameters.JSLT_FILTER_TYPE).build();
         String paramJson = gson.toJson(param);
         ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverRest", JOB_ID, toJson(paramJson));
 
@@ -669,7 +637,9 @@ class ApplicationTest {
         // Create a job with a PM filter
         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
         filterData.getMeasTypes().add("succImmediateAssignProcs");
-        Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, null, null, null);
+        Job.Parameters param =
+                Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE).build();
+
         String paramJson = gson.toJson(param);
 
         ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverKafka", "EI_PM_JOB_ID", toJson(paramJson));
diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java
index 4ec51ff..5f7a886 100644 (file)
@@ -230,8 +230,8 @@ class IntegrationWithIcs {
         await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue());
         final String TYPE_ID = "KafkaInformationType";
 
-        Job.Parameters param =
-                new Job.Parameters("filter", Job.Parameters.REGEXP_TYPE, new Job.BufferTimeout(123, 456), 1, null);
+        Job.Parameters param = Job.Parameters.builder().filter("filter").filterType(Job.Parameters.REGEXP_TYPE)
+                .bufferTimeout(new Job.BufferTimeout(123, 456)).build();
 
         ConsumerJobInfo jobInfo =
                 new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", consumerUri(), "");
@@ -250,8 +250,8 @@ class IntegrationWithIcs {
         await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue());
         final String TYPE_ID = "KafkaInformationType";
 
-        Job.Parameters param = new Job.Parameters("filter", Job.Parameters.REGEXP_TYPE,
-                new Job.BufferTimeout(123, 170 * 1000), 1, null);
+        Job.Parameters param = Job.Parameters.builder().filter("filter").filterType(Job.Parameters.REGEXP_TYPE)
+                .bufferTimeout(new Job.BufferTimeout(123, 170 * 1000)).build();
 
         ConsumerJobInfo jobInfo =
                 new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", consumerUri(), "");
diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
index 711b0c1..4b360b3 100644 (file)
@@ -36,7 +36,7 @@ import java.util.Map;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -194,16 +194,15 @@ class IntegrationWithKafka {
         }
 
         synchronized String lastKey() {
-            return this.receivedKafkaOutput.key;
+            return new String(this.receivedKafkaOutput.key);
         }
 
         synchronized String lastValue() {
-            return this.receivedKafkaOutput.value;
+            return new String(this.receivedKafkaOutput.value);
         }
 
         void reset() {
-            count = 0;
-            this.receivedKafkaOutput = new TopicListener.DataFromTopic("", "");
+            this.receivedKafkaOutput = new TopicListener.DataFromTopic(null, null);
         }
     }
 
@@ -272,7 +271,9 @@ class IntegrationWithKafka {
     private static Object jobParametersAsJsonObject(String filter, long maxTimeMiliseconds, int maxSize,
             int maxConcurrency) {
         Job.BufferTimeout buffer = maxSize > 0 ? new Job.BufferTimeout(maxSize, maxTimeMiliseconds) : null;
-        Job.Parameters param = new Job.Parameters(filter, Job.Parameters.REGEXP_TYPE, buffer, maxConcurrency, null);
+        Job.Parameters param = Job.Parameters.builder().filter(filter).filterType(Job.Parameters.REGEXP_TYPE)
+                .bufferTimeout(buffer).maxConcurrency(maxConcurrency).build();
+
         String str = gson.toJson(param);
         return jsonObject(str);
     }
@@ -296,9 +297,10 @@ class IntegrationWithKafka {
         }
     }
 
-    ConsumerJobInfo consumerJobInfoKafka(String topic, PmReportFilter.FilterData filterData) {
+    ConsumerJobInfo consumerJobInfoKafka(String topic, PmReportFilter.FilterData filterData, boolean gzip) {
         try {
-            Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, null, 1, topic);
+            Job.Parameters param = Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE)
+                    .kafkaOutputTopic(topic).gzip(gzip).build();
 
             String str = gson.toJson(param);
             Object parametersObj = jsonObject(str);
@@ -309,9 +311,13 @@ class IntegrationWithKafka {
         }
     }
 
+    ConsumerJobInfo consumerJobInfoKafka(String topic, PmReportFilter.FilterData filterData) {
+        return consumerJobInfoKafka(topic, filterData, false);
+    }
+
     ConsumerJobInfo consumerJobInfoKafka(String topic) {
         try {
-            Job.Parameters param = new Job.Parameters(null, null, null, 1, topic);
+            Job.Parameters param = Job.Parameters.builder().kafkaOutputTopic(topic).build();
             String str = gson.toJson(param);
             Object parametersObj = jsonObject(str);
 
@@ -321,26 +327,27 @@ class IntegrationWithKafka {
         }
     }
 
-    private SenderOptions<String, String> kafkaSenderOptions() {
+    private SenderOptions<byte[], byte[]> kafkaSenderOptions() {
         String bootstrapServers = this.applicationConfig.getKafkaBootStrapServers();
 
         Map<String, Object> props = new HashMap<>();
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
         // props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producerx");
         props.put(ProducerConfig.ACKS_CONFIG, "all");
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
         return SenderOptions.create(props);
     }
 
-    private SenderRecord<String, String, Integer> kafkaSenderRecord(String data, String key, String typeId) {
+    private SenderRecord<byte[], byte[], Integer> kafkaSenderRecord(String data, String key, String typeId) {
         final InfoType infoType = this.types.get(typeId);
         int correlationMetadata = 2;
-        return SenderRecord.create(new ProducerRecord<>(infoType.getKafkaInputTopic(), key, data), correlationMetadata);
+        return SenderRecord.create(new ProducerRecord<>(infoType.getKafkaInputTopic(), key.getBytes(), data.getBytes()),
+                correlationMetadata);
     }
 
-    private void sendDataToKafka(Flux<SenderRecord<String, String, Integer>> dataToSend) {
-        final KafkaSender<String, String> sender = KafkaSender.create(kafkaSenderOptions());
+    private void sendDataToKafka(Flux<SenderRecord<byte[], byte[], Integer>> dataToSend) {
+        final KafkaSender<byte[], byte[]> sender = KafkaSender.create(kafkaSenderOptions());
 
         sender.send(dataToSend) //
                 .doOnError(e -> logger.error("Send failed", e)) //
@@ -440,7 +447,7 @@ class IntegrationWithKafka {
     private void printStatistics() {
         String targetUri = baseUrl() + ProducerCallbacksController.STATISTICS_URL;
         String stats = restClient().get(targetUri).block();
-        logger.info("Stats : {}", stats);
+        logger.info("Stats : {}", org.apache.commons.lang3.StringUtils.truncate(stats, 1000));
     }
 
     @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
@@ -539,14 +546,15 @@ class IntegrationWithKafka {
         assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
-        filterData.getMeasTypes().add("succImmediateAssignProcs");
-        filterData.getMeasObjClass().add("UtranCell");
+        filterData.getMeasTypes().add("pmAnrNcgiMeasFailUeCap");
+        filterData.getMeasTypes().add("pmAnrNcgiMeasRcvDrx");
+        filterData.getMeasObjInstIds().add("ManagedElement=seliitdus00487,GNBCUCPFunction=1,NRCellCU=32");
 
         final int NO_OF_JOBS = 150;
         ArrayList<KafkaReceiver> receivers = new ArrayList<>();
         for (int i = 0; i < NO_OF_JOBS; ++i) {
             final String outputTopic = "manyJobs_" + i;
-            this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData), outputTopic,
+            this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData, false), outputTopic,
                     restClient());
             KafkaReceiver receiver = new KafkaReceiver(this.applicationConfig, outputTopic, this.securityContext);
             receivers.add(receiver);
@@ -555,14 +563,15 @@ class IntegrationWithKafka {
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(NO_OF_JOBS));
         waitForKafkaListener();
 
-        final int NO_OF_OBJECTS = 500;
+        final int NO_OF_OBJECTS = 1000;
 
         Instant startTime = Instant.now();
 
-        final String FILE_NAME = "pm_report.json.gz";
+        final String FILE_NAME = "A20000626.2315+0200-2330+0200_HTTPS-6-73.json.gz";
 
         DataStore fileStore = dataStore();
 
+        fileStore.deleteBucket(DataStore.Bucket.FILES).block();
         fileStore.create(DataStore.Bucket.FILES).block();
         fileStore.copyFileTo(Path.of("./src/test/resources/" + FILE_NAME), FILE_NAME).block();
 
@@ -571,8 +580,8 @@ class IntegrationWithKafka {
         sendDataToKafka(dataToSend);
 
         while (receivers.get(0).count != NO_OF_OBJECTS) {
-            logger.info("sleeping {}", kafkaReceiver.count);
-            Thread.sleep(1000 * 1);
+            // logger.info("sleeping {}", kafkaReceiver.count);
+            Thread.sleep(100 * 1);
         }
 
         final long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond();
@@ -584,7 +593,7 @@ class IntegrationWithKafka {
             }
         }
 
-        // printStatistics();
+        printStatistics();
     }
 
     private String newFileEvent(String fileName) {
@@ -601,16 +610,16 @@ class IntegrationWithKafka {
     @Test
     void testHistoricalData() throws Exception {
         // test
-        waitForKafkaListener();
         final String JOB_ID = "testHistoricalData";
 
         DataStore fileStore = dataStore();
 
+        fileStore.deleteBucket(DataStore.Bucket.FILES).block();
         fileStore.create(DataStore.Bucket.FILES).block();
         fileStore.create(DataStore.Bucket.LOCKS).block();
 
         fileStore.copyFileTo(Path.of("./src/test/resources/pm_report.json"),
-                "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json").block();
+                "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.json").block();
 
         fileStore.copyFileTo(Path.of("./src/test/resources/pm_report.json"), "OTHER_SOURCENAME/test.json").block();
 
@@ -624,7 +633,7 @@ class IntegrationWithKafka {
                 restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
 
-        await().untilAsserted(() -> assertThat(kafkaReceiver.count).isEqualTo(1));
+        await().untilAsserted(() -> assertThat(kafkaReceiver.count).isPositive());
     }
 
     @Test
diff --git a/src/test/java/org/oran/dmaapadapter/filter/JsltFilterTest.java b/src/test/java/org/oran/dmaapadapter/filter/JsltFilterTest.java
index a18034e..e3619ac 100644 (file)
@@ -27,12 +27,15 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 
 import org.junit.jupiter.api.Test;
+import org.oran.dmaapadapter.filter.Filter.FilteredData;
 import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
 
 class JsltFilterTest {
 
     private String filterReport(JsltFilter filter) throws Exception {
-        return filter.filter(new DataFromTopic("", loadReport())).value;
+        DataFromTopic data = new DataFromTopic(null, loadReport().getBytes());
+        FilteredData filtered = filter.filter(data);
+        return filtered.getValueAString();
     }
 
     @Test
diff --git a/src/test/java/org/oran/dmaapadapter/filter/JsonPathFilterTest.java b/src/test/java/org/oran/dmaapadapter/filter/JsonPathFilterTest.java
index ce00c4d..99d3591 100644 (file)
@@ -27,6 +27,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 
 import org.junit.jupiter.api.Test;
+import org.oran.dmaapadapter.filter.Filter.FilteredData;
 import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
 
 class JsonPathFilterTest {
@@ -35,7 +36,9 @@ class JsonPathFilterTest {
     void testJsonPath() throws Exception {
         String exp = ("$.event.perf3gppFields.measDataCollection.measInfoList[0].measTypes.sMeasTypesList[0]");
         JsonPathFilter filter = new JsonPathFilter(exp);
-        String res = filter.filter(new DataFromTopic("", loadReport())).value;
+        DataFromTopic data = new DataFromTopic(null, loadReport().getBytes());
+        FilteredData filtered = filter.filter(data);
+        String res = filtered.getValueAString();
         assertThat(res).isEqualTo("\"attTCHSeizures\"");
     }
 
diff --git a/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java b/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java
index cbdf9e7..3e465da 100644 (file)
@@ -27,12 +27,15 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 
 import org.junit.jupiter.api.Test;
+import org.oran.dmaapadapter.filter.Filter.FilteredData;
 import org.oran.dmaapadapter.tasks.TopicListener;
 
 class PmReportFilterTest {
 
     private String filterReport(PmReportFilter filter) throws Exception {
-        return filter.filter(new TopicListener.DataFromTopic("", loadReport())).value;
+        TopicListener.DataFromTopic data = new TopicListener.DataFromTopic(null, loadReport().getBytes());
+        FilteredData filtered = filter.filter(data);
+        return filtered.getValueAString();
     }
 
     @Test
@@ -120,11 +123,11 @@ class PmReportFilterTest {
         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
         PmReportFilter filter = new PmReportFilter(filterData);
 
-        String filtered = filter.filter(new TopicListener.DataFromTopic("", "junk")).value;
-        assertThat(filtered).isEmpty();
+        FilteredData filtered = filter.filter(new TopicListener.DataFromTopic(null, "junk".getBytes()));
+        assertThat(filtered.isEmpty()).isTrue();
 
-        filtered = filter.filter(new TopicListener.DataFromTopic("", reQuote("{'msg': 'test'}"))).value;
-        assertThat(filtered).isEmpty();
+        filtered = filter.filter(new TopicListener.DataFromTopic(null, reQuote("{'msg': 'test'}").getBytes()));
+        assertThat(filtered.isEmpty()).isTrue();
 
     }
 
diff --git a/src/test/resources/A20000626.2315+0200-2330+0200_HTTPS-6-73.json.gz b/src/test/resources/A20000626.2315+0200-2330+0200_HTTPS-6-73.json.gz
new file mode 100644 (file)
index 0000000..1ebd353
Binary files /dev/null and b/src/test/resources/A20000626.2315+0200-2330+0200_HTTPS-6-73.json.gz differ