From: PatrikBuhr Date: Mon, 17 Oct 2022 13:47:37 +0000 (+0200) Subject: Added gzip of output data X-Git-Tag: 1.2.0~8 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F16%2F9316%2F2;p=nonrtric%2Fplt%2Fdmaapadapter.git Added gzip of output data Bugfix historical data was broken, fixed. Change-Id: I7ba95f962676e69ebd35b3ff467ac47cc1786b2d Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-773 --- diff --git a/pom.xml b/pom.xml index fb3135b..b92e55b 100644 --- a/pom.xml +++ b/pom.xml @@ -197,7 +197,7 @@ io.projectreactor.kafka reactor-kafka - 1.3.12 + 1.3.13 com.google.guava @@ -207,12 +207,12 @@ software.amazon.awssdk s3 - 2.13.73 + 2.17.292 com.amazonaws aws-java-sdk - 1.11.795 + 1.12.321 diff --git a/src/main/java/org/oran/dmaapadapter/filter/Filter.java b/src/main/java/org/oran/dmaapadapter/filter/Filter.java index bfd358b..6ec74d5 100644 --- a/src/main/java/org/oran/dmaapadapter/filter/Filter.java +++ b/src/main/java/org/oran/dmaapadapter/filter/Filter.java @@ -32,19 +32,23 @@ public interface Filter { @ToString public static class FilteredData { - public final String key; - public final String value; - private static final FilteredData emptyData = new FilteredData("", ""); + public final byte[] key; + public final byte[] value; + private static final FilteredData emptyData = new FilteredData(null, null); public boolean isEmpty() { - return value.isEmpty() && key.isEmpty(); + return (key == null || key.length == 0) && (value == null || value.length == 0); } - public FilteredData(String key, String value) { + public FilteredData(byte[] key, byte[] value) { this.key = key; this.value = value; } + public String getValueAString() { + return value == null ? "" : new String(this.value); + } + public static FilteredData empty() { return emptyData; } diff --git a/src/main/java/org/oran/dmaapadapter/filter/JsltFilter.java b/src/main/java/org/oran/dmaapadapter/filter/JsltFilter.java index 9336617..577f83d 100644 --- a/src/main/java/org/oran/dmaapadapter/filter/JsltFilter.java +++ b/src/main/java/org/oran/dmaapadapter/filter/JsltFilter.java @@ -60,7 +60,7 @@ class JsltFilter implements Filter { if (filteredNode == NullNode.instance) { return FilteredData.empty(); } - return new FilteredData(data.key, mapper.writeValueAsString(filteredNode)); + return new FilteredData(data.key, mapper.writeValueAsBytes(filteredNode)); } catch (Exception e) { return FilteredData.empty(); } diff --git a/src/main/java/org/oran/dmaapadapter/filter/JsonPathFilter.java b/src/main/java/org/oran/dmaapadapter/filter/JsonPathFilter.java index 36a2103..cded9a0 100644 --- a/src/main/java/org/oran/dmaapadapter/filter/JsonPathFilter.java +++ b/src/main/java/org/oran/dmaapadapter/filter/JsonPathFilter.java @@ -43,8 +43,10 @@ class JsonPathFilter implements Filter { @Override public FilteredData filter(DataFromTopic data) { try { - Object o = JsonPath.parse(data.value).read(this.expression, Object.class); - return o == null ? FilteredData.empty() : new FilteredData(data.key, gson.toJson(o)); + String str = new String(data.value); + Object o = JsonPath.parse(str).read(this.expression, Object.class); + String json = gson.toJson(o); + return o == null ? FilteredData.empty() : new FilteredData(data.key, json.getBytes()); } catch (Exception e) { return FilteredData.empty(); } diff --git a/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java b/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java index e33ae68..8f4976f 100644 --- a/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java +++ b/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java @@ -96,7 +96,7 @@ public class PmReportFilter implements Filter { if (!filter(report, this.filterData)) { return FilteredData.empty(); } - return new FilteredData(data.key, gson.toJson(report)); + return new FilteredData(data.key, gson.toJson(report).getBytes()); } catch (Exception e) { logger.warn("Could not parse PM data. {}, reason: {}", data, e.getMessage()); return FilteredData.empty(); @@ -107,7 +107,7 @@ public class PmReportFilter implements Filter { private PmReport createPmReport(DataFromTopic data) { synchronized (data) { if (data.getCachedPmReport() == null) { - data.setCachedPmReport(gsonParse.fromJson(data.value, PmReport.class)); + data.setCachedPmReport(gsonParse.fromJson(data.valueAsString(), PmReport.class)); } return data.getCachedPmReport(); } diff --git a/src/main/java/org/oran/dmaapadapter/filter/RegexpFilter.java b/src/main/java/org/oran/dmaapadapter/filter/RegexpFilter.java index b1000ed..4806604 100644 --- a/src/main/java/org/oran/dmaapadapter/filter/RegexpFilter.java +++ b/src/main/java/org/oran/dmaapadapter/filter/RegexpFilter.java @@ -44,7 +44,7 @@ class RegexpFilter implements Filter { if (regexp == null) { return new FilteredData(data.key, data.value); } - Matcher matcher = regexp.matcher(data.value); + Matcher matcher = regexp.matcher(data.valueAsString()); boolean match = matcher.find(); if (match) { return new FilteredData(data.key, data.value); diff --git a/src/main/java/org/oran/dmaapadapter/repository/Job.java b/src/main/java/org/oran/dmaapadapter/repository/Job.java index 9fd8e57..b5b7e52 100644 --- a/src/main/java/org/oran/dmaapadapter/repository/Job.java +++ b/src/main/java/org/oran/dmaapadapter/repository/Job.java @@ -84,19 +84,20 @@ public class Job { @Builder.Default int noOfSentBytes = 0; - public void received(String str) { - noOfReceivedBytes += str.length(); + public void received(byte[] bytes) { + noOfReceivedBytes += bytes.length; noOfReceivedObjects += 1; } - public void filtered(String str) { - noOfSentBytes += str.length(); + public void filtered(byte[] bytes) { + noOfSentBytes += bytes.length; noOfSentObjects += 1; } } + @Builder public static class Parameters { public static final String REGEXP_TYPE = "regexp"; public static final String PM_FILTER_TYPE = "pmdata"; @@ -104,9 +105,12 @@ public class Job { public static final String JSON_PATH_FILTER_TYPE = "json-path"; @Setter + @Builder.Default private String filterType = REGEXP_TYPE; + @Getter private Object filter; + @Getter private BufferTimeout bufferTimeout; @@ -115,19 +119,15 @@ public class Job { @Getter private String kafkaOutputTopic; - public Parameters() {} + @Getter + private Boolean gzip; - public Parameters(Object filter, String filterType, BufferTimeout bufferTimeout, Integer maxConcurrency, - String kafkaOutputTopic) { - this.filter = filter; - this.bufferTimeout = bufferTimeout; - this.maxConcurrency = maxConcurrency; - this.filterType = filterType; - this.kafkaOutputTopic = kafkaOutputTopic; + public int getMaxConcurrency() { + return maxConcurrency == null || maxConcurrency == 1 ? 1 : maxConcurrency; } - public int getMaxConcurrency() { - return maxConcurrency == null || maxConcurrency == 0 ? 1 : maxConcurrency; + public boolean isGzip() { + return gzip != null && gzip; } public Filter.Type getFilterType() { diff --git a/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java index 4799034..cd4c002 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java @@ -72,7 +72,7 @@ public class DmaapTopicListener implements TopicListener { .doOnNext(input -> logger.debug("Received from DMaap: {} :{}", this.type.getDmaapTopicUrl(), input)) // .doOnError(t -> logger.error("DmaapTopicListener error: {}", t.getMessage())) // .doFinally(sig -> logger.error("DmaapTopicListener stopped, reason: {}", sig)) // - .map(input -> new DataFromTopic("", input)) + .map(input -> new DataFromTopic(null, input.getBytes())) .flatMap(data -> KafkaTopicListener.getDataFromFileIfNewPmFileEvent(data, type, dataStore), 100) .publish() // .autoConnect(); diff --git a/src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java index f57c12d..ef10d3a 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java @@ -26,6 +26,8 @@ import org.oran.dmaapadapter.repository.Job; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.MediaType; + +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -36,8 +38,8 @@ import reactor.core.publisher.Mono; public class HttpJobDataDistributor extends JobDataDistributor { private static final Logger logger = LoggerFactory.getLogger(HttpJobDataDistributor.class); - public HttpJobDataDistributor(Job job, ApplicationConfig config) { - super(job, config); + public HttpJobDataDistributor(Job job, ApplicationConfig config, Flux input) { + super(job, config, input); } @Override @@ -45,7 +47,7 @@ public class HttpJobDataDistributor extends JobDataDistributor { Job job = this.getJob(); logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), output); MediaType contentType = job.isBuffered() || job.getType().isJson() ? MediaType.APPLICATION_JSON : null; - return job.getConsumerRestClient().post("", output.value, contentType); + return job.getConsumerRestClient().post("", output.getValueAString(), contentType); } } diff --git a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java index 05fbbc6..45a232d 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java @@ -20,9 +20,12 @@ package org.oran.dmaapadapter.tasks; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.time.OffsetDateTime; import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatterBuilder; +import java.util.zip.GZIPOutputStream; import lombok.Getter; @@ -84,16 +87,12 @@ public abstract class JobDataDistributor { } } - protected JobDataDistributor(Job job, ApplicationConfig applConfig) { + protected JobDataDistributor(Job job, ApplicationConfig applConfig, Flux input) { this.job = job; this.applConfig = applConfig; this.dataStore = DataStore.create(applConfig); this.dataStore.create(DataStore.Bucket.FILES).subscribe(); this.dataStore.create(DataStore.Bucket.LOCKS).subscribe(); - } - - public synchronized void start(Flux input) { - collectHistoricalData(); this.errorStats.resetIrrecoverableErrors(); this.subscription = filterAndBuffer(input, this.job) // @@ -110,7 +109,7 @@ public abstract class JobDataDistributor { } } - private void collectHistoricalData() { + public void collectHistoricalData() { PmReportFilter filter = job.getFilter() instanceof PmReportFilter ? (PmReportFilter) job.getFilter() : null; if (filter != null && filter.getFilterData().getPmRopStartTime() != null) { @@ -129,17 +128,34 @@ public abstract class JobDataDistributor { .flatMap(data -> KafkaTopicListener.getDataFromFileIfNewPmFileEvent(data, this.job.getType(), dataStore), 100) .map(job::filter) // + .map(this::gzip) // .flatMap(this::sendToClient, 1) // .onErrorResume(this::handleCollectHistoricalDataError) // .subscribe(); } } + private Filter.FilteredData gzip(Filter.FilteredData data) { + if (job.getParameters().isGzip()) { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try (GZIPOutputStream gzip = new GZIPOutputStream(out)) { + gzip.write(data.value); + return new Filter.FilteredData(data.key, out.toByteArray()); + } catch (IOException e) { + logger.error("Unexpected exception when zipping: {}", e.getMessage()); + return data; + } + } else { + return data; + } + } + private Mono handleCollectHistoricalDataError(Throwable t) { if (t instanceof LockedException) { logger.debug("Locked exception: {} job: {}", t.getMessage(), job.getId()); return Mono.empty(); // Ignore } else { + logger.error("Exception: {} job: {}", t.getMessage(), job.getId()); return tryDeleteLockFile() // .map(bool -> "OK"); } @@ -153,11 +169,11 @@ public abstract class JobDataDistributor { NewFileEvent ev = new NewFileEvent(fileName); - return new TopicListener.DataFromTopic("", gson.toJson(ev)); + return new TopicListener.DataFromTopic(null, gson.toJson(ev).getBytes()); } private boolean filterStartTime(String startTimeStr, String fileName) { - // A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json + // A20000626.2315+0200-2330+0200_HTTPS-6-73.json try { if (fileName.endsWith(".json") || fileName.endsWith(".json.gz")) { @@ -212,15 +228,16 @@ public abstract class JobDataDistributor { Flux filtered = // inputFlux.doOnNext(data -> job.getStatistics().received(data.value)) // .map(job::filter) // + .map(this::gzip) // .filter(f -> !f.isEmpty()) // .doOnNext(f -> job.getStatistics().filtered(f.value)); // if (job.isBuffered()) { - filtered = filtered.map(input -> quoteNonJson(input.value, job)) // + filtered = filtered.map(input -> quoteNonJson(input.getValueAString(), job)) // .bufferTimeout( // job.getParameters().getBufferTimeout().getMaxSize(), // job.getParameters().getBufferTimeout().getMaxTime()) // - .map(buffered -> new Filter.FilteredData("", buffered.toString())); + .map(buffered -> new Filter.FilteredData(null, buffered.toString().getBytes())); } return filtered; } diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java index 4a68603..5e09714 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java @@ -23,9 +23,10 @@ package org.oran.dmaapadapter.tasks; import java.util.HashMap; import java.util.Map; +import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.filter.Filter; import org.oran.dmaapadapter.repository.Job; @@ -46,20 +47,23 @@ import reactor.kafka.sender.SenderRecord; public class KafkaJobDataDistributor extends JobDataDistributor { private static final Logger logger = LoggerFactory.getLogger(KafkaJobDataDistributor.class); - private KafkaSender sender; + private KafkaSender sender; private final ApplicationConfig appConfig; - public KafkaJobDataDistributor(Job job, ApplicationConfig appConfig) { - super(job, appConfig); + public KafkaJobDataDistributor(Job job, ApplicationConfig appConfig, Flux input) { + super(job, appConfig, input); this.appConfig = appConfig; + SenderOptions senderOptions = senderOptions(appConfig); + this.sender = KafkaSender.create(senderOptions); } @Override protected Mono sendToClient(Filter.FilteredData data) { Job job = this.getJob(); - SenderRecord senderRecord = senderRecord(data, job); + SenderRecord senderRecord = senderRecord(data, job); - logger.trace("Sending data '{}' to Kafka topic: {}", data, job.getParameters().getKafkaOutputTopic()); + logger.trace("Sending data '{}' to Kafka topic: {}", StringUtils.truncate(data.getValueAString(), 10), + job.getParameters().getKafkaOutputTopic()); return this.sender.send(Mono.just(senderRecord)) // .doOnNext(n -> logger.debug("Sent data to Kafka topic: {}", job.getParameters().getKafkaOutputTopic())) // @@ -67,14 +71,8 @@ public class KafkaJobDataDistributor extends JobDataDistributor { t -> logger.warn("Failed to send to Kafka, job: {}, reason: {}", job.getId(), t.getMessage())) // .onErrorResume(t -> Mono.empty()) // .collectList() // - .map(x -> data.value); - } + .map(x -> "ok"); - @Override - public synchronized void start(Flux input) { - super.start(input); - SenderOptions senderOptions = senderOptions(appConfig); - this.sender = KafkaSender.create(senderOptions); } @Override @@ -86,18 +84,18 @@ public class KafkaJobDataDistributor extends JobDataDistributor { } } - private static SenderOptions senderOptions(ApplicationConfig config) { + private static SenderOptions senderOptions(ApplicationConfig config) { String bootstrapServers = config.getKafkaBootStrapServers(); Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.ACKS_CONFIG, "all"); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); return SenderOptions.create(props); } - private SenderRecord senderRecord(Filter.FilteredData output, Job infoJob) { + private SenderRecord senderRecord(Filter.FilteredData output, Job infoJob) { int correlationMetadata = 2; String topic = infoJob.getParameters().getKafkaOutputTopic(); return SenderRecord.create(new ProducerRecord<>(topic, output.key, output.value), correlationMetadata); diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java index b238b6e..960ecef 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java @@ -28,7 +28,7 @@ import java.util.Map; import java.util.zip.GZIPInputStream; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.datastore.DataStore; import org.oran.dmaapadapter.repository.InfoType; @@ -40,7 +40,6 @@ import reactor.core.publisher.Mono; import reactor.kafka.receiver.KafkaReceiver; import reactor.kafka.receiver.ReceiverOptions; - /** * The class streams incoming requests from a Kafka topic and sends them further * to a multi cast sink, which several other streams can connect to. @@ -74,42 +73,40 @@ public class KafkaTopicListener implements TopicListener { return KafkaReceiver.create(kafkaInputProperties(clientId)) // .receiveAutoAck() // .concatMap(consumerRecord -> consumerRecord) // - .doOnNext(input -> logger.trace("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(), - input.value())) // + .doOnNext(input -> logger.trace("Received from kafka topic: {}", this.type.getKafkaInputTopic())) // .doOnError(t -> logger.error("KafkaTopicReceiver error: {}", t.getMessage())) // .doFinally(sig -> logger.error("KafkaTopicReceiver stopped, reason: {}", sig)) // - .filter(t -> !t.value().isEmpty() || !t.key().isEmpty()) // + .filter(t -> t.value().length > 0 || t.key().length > 0) // .map(input -> new DataFromTopic(input.key(), input.value())) // - .flatMap(data -> getDataFromFileIfNewPmFileEvent(data, type, dataStore), 100) // - .publish() // + .flatMap(data -> getDataFromFileIfNewPmFileEvent(data, type, dataStore), 100).publish() // .autoConnect(1); } - private ReceiverOptions kafkaInputProperties(String clientId) { + private ReceiverOptions kafkaInputProperties(String clientId) { Map consumerProps = new HashMap<>(); if (this.applicationConfig.getKafkaBootStrapServers().isEmpty()) { logger.error("No kafka boostrap server is setup"); } consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers()); consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, this.type.getKafkaGroupId()); - consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId); consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, this.applicationConfig.getKafkaMaxPollRecords()); - return ReceiverOptions.create(consumerProps) + return ReceiverOptions.create(consumerProps) .subscription(Collections.singleton(this.type.getKafkaInputTopic())); } public static Mono getDataFromFileIfNewPmFileEvent(DataFromTopic data, InfoType type, DataStore fileStore) { - if (type.getDataType() != InfoType.DataType.PM_DATA || data.value.length() > 1000) { + if (type.getDataType() != InfoType.DataType.PM_DATA || data.value.length > 1000) { return Mono.just(data); } try { - NewFileEvent ev = gson.fromJson(data.value, NewFileEvent.class); + NewFileEvent ev = gson.fromJson(data.valueAsString(), NewFileEvent.class); if (ev.getFilename() == null) { logger.warn("Ignoring received message: {}", data); diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java index 3f06457..be230cf 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java @@ -20,7 +20,6 @@ package org.oran.dmaapadapter.tasks; -import java.nio.charset.StandardCharsets; import lombok.Getter; import lombok.Setter; @@ -33,22 +32,23 @@ public interface TopicListener { @ToString public static class DataFromTopic { - public final String key; - public final String value; + public final byte[] key; + public final byte[] value; + + private static byte[] noBytes = new byte[0]; @Getter @Setter @ToString.Exclude private PmReport cachedPmReport; - public DataFromTopic(String key, String value) { - this.key = key; - this.value = value; + public DataFromTopic(byte[] key, byte[] value) { + this.key = key == null ? noBytes : key; + this.value = value == null ? noBytes : value; } - public DataFromTopic(String key, byte[] value) { - this.key = key; - this.value = new String(value, StandardCharsets.UTF_8); + public String valueAsString() { + return new String(this.value); } } diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java index d5868e5..4d76cd1 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java @@ -93,16 +93,19 @@ public class TopicListeners { } } - private JobDataDistributor createConsumer(Job job) { - return !Strings.isEmpty(job.getParameters().getKafkaOutputTopic()) ? new KafkaJobDataDistributor(job, appConfig) - : new HttpJobDataDistributor(job, appConfig); + private JobDataDistributor createConsumer(Job job, TopicListener topicListener) { + return !Strings.isEmpty(job.getParameters().getKafkaOutputTopic()) + ? new KafkaJobDataDistributor(job, appConfig, topicListener.getFlux()) + : new HttpJobDataDistributor(job, appConfig, topicListener.getFlux()); } private void addConsumer(Job job, MultiMap distributors, Map topicListeners) { TopicListener topicListener = topicListeners.get(job.getType().getId()); - JobDataDistributor distributor = createConsumer(job); - distributor.start(topicListener.getFlux()); + JobDataDistributor distributor = createConsumer(job, topicListener); + + distributor.collectHistoricalData(); + distributors.put(job.getType().getId(), job.getId(), distributor); } diff --git a/src/main/resources/typeSchemaPmData.json b/src/main/resources/typeSchemaPmData.json index 7d5ab62..5b4ab89 100644 --- a/src/main/resources/typeSchemaPmData.json +++ b/src/main/resources/typeSchemaPmData.json @@ -75,6 +75,9 @@ "kafkaOutputTopic": { "type": "string" }, + "gzip": { + "type": "boolean" + }, "bufferTimeout": { "type": "object", "additionalProperties": false, diff --git a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java index 5d20541..139c1bb 100644 --- a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java +++ b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java @@ -63,10 +63,8 @@ import org.oran.dmaapadapter.r1.ProducerJobInfo; import org.oran.dmaapadapter.repository.InfoTypes; import org.oran.dmaapadapter.repository.Job; import org.oran.dmaapadapter.repository.Jobs; -import org.oran.dmaapadapter.tasks.JobDataDistributor; import org.oran.dmaapadapter.tasks.NewFileEvent; import org.oran.dmaapadapter.tasks.ProducerRegstrationTask; -import org.oran.dmaapadapter.tasks.TopicListener; import org.oran.dmaapadapter.tasks.TopicListeners; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,7 +82,6 @@ import org.springframework.http.ResponseEntity; import org.springframework.test.context.TestPropertySource; import org.springframework.web.reactive.function.client.WebClientResponseException; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -391,38 +388,6 @@ class ApplicationTest { "Could not find type"); } - @Test - void testReceiveAndPostDataFromKafka() throws Exception { - final String JOB_ID = "ID"; - final String TYPE_ID = "PmDataOverKafka"; - waitForRegistration(); - - // Create a job - Job.Parameters param = new Job.Parameters(null, null, new Job.BufferTimeout(123, 456), 1, null); - String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL; - ConsumerJobInfo kafkaJobInfo = new ConsumerJobInfo(TYPE_ID, toJson(gson.toJson(param)), "owner", targetUri, ""); - - this.icsSimulatorController.addJob(kafkaJobInfo, JOB_ID, restClient()); - await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); - - JobDataDistributor kafkaConsumer = this.topicListeners.getDataDistributors().get(TYPE_ID, JOB_ID); - - // Handle received data from Kafka, check that it has been posted to the - // consumer - kafkaConsumer.start(Flux.just(new TopicListener.DataFromTopic("key", "data"))); - - ConsumerController.TestResults consumer = this.consumerController.testResults; - await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1)); - assertThat(consumer.receivedBodies.get(0)).isEqualTo("[data]"); - assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "application/json"); - - // This only works in debugger. Removed for now. - assertThat(this.icsSimulatorController.testResults.createdJob).isNotNull(); - assertThat(this.icsSimulatorController.testResults.createdJob.infoTypeId) - .isEqualTo("xml-file-data-to-filestore"); - - } - @Test void testReceiveAndPostDataFromDmaapBuffering() throws Exception { final String JOB_ID = "testReceiveAndPostDataFromDmaap"; @@ -431,7 +396,8 @@ class ApplicationTest { waitForRegistration(); // Create a job - Job.Parameters param = new Job.Parameters(null, null, new Job.BufferTimeout(123, 456), 1, null); + Job.Parameters param = Job.Parameters.builder().bufferTimeout(new Job.BufferTimeout(123, 456)).build(); + ConsumerJobInfo jobInfo = consumerJobInfo("DmaapInformationType", JOB_ID, toJson(gson.toJson(param))); this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); @@ -457,7 +423,7 @@ class ApplicationTest { waitForRegistration(); // Create a job - Job.Parameters param = new Job.Parameters(null, null, null, 1, null); + Job.Parameters param = Job.Parameters.builder().build(); ConsumerJobInfo jobInfo = consumerJobInfo("DmaapInformationType", JOB_ID, toJson(gson.toJson(param))); this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); @@ -500,8 +466,8 @@ class ApplicationTest { filterData.getMeasObjInstIds().add("UtranCell=Gbg-997"); filterData.getSourceNames().add("O-DU-1122"); filterData.getMeasuredEntityDns().add("ManagedElement=RNC-Gbg-1"); - Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, - new Job.BufferTimeout(123, 456), null, null); + Job.Parameters param = Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE) + .bufferTimeout(new Job.BufferTimeout(123, 456)).build(); String paramJson = gson.toJson(param); ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverRest", "EI_PM_JOB_ID", toJson(paramJson)); @@ -540,14 +506,15 @@ class ApplicationTest { DataStore fileStore = DataStore.create(this.applicationConfig); fileStore.copyFileTo(Path.of("./src/test/resources/pm_report.json"), - "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json").block(); + "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.json").block(); PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); filterData.getSourceNames().add("O-DU-1122"); filterData.setPmRopStartTime("1999-12-27T10:50:44.000-08:00"); - Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, - new Job.BufferTimeout(123, 456), null, null); + Job.Parameters param = Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE) + .bufferTimeout(new Job.BufferTimeout(123, 456)).build(); + String paramJson = gson.toJson(param); ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverRest", "EI_PM_JOB_ID", toJson(paramJson)); @@ -569,7 +536,8 @@ class ApplicationTest { // Create a job with a PM filter String expresssion = "if(.event.commonEventHeader.sourceName == \"O-DU-1122\")" // + "."; - Job.Parameters param = new Job.Parameters(expresssion, Job.Parameters.JSLT_FILTER_TYPE, null, null, null); + Job.Parameters param = + Job.Parameters.builder().filter(expresssion).filterType(Job.Parameters.JSLT_FILTER_TYPE).build(); String paramJson = gson.toJson(param); ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverRest", JOB_ID, toJson(paramJson)); @@ -669,7 +637,9 @@ class ApplicationTest { // Create a job with a PM filter PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); filterData.getMeasTypes().add("succImmediateAssignProcs"); - Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, null, null, null); + Job.Parameters param = + Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE).build(); + String paramJson = gson.toJson(param); ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverKafka", "EI_PM_JOB_ID", toJson(paramJson)); diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java index 4ec51ff..5f7a886 100644 --- a/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java +++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java @@ -230,8 +230,8 @@ class IntegrationWithIcs { await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue()); final String TYPE_ID = "KafkaInformationType"; - Job.Parameters param = - new Job.Parameters("filter", Job.Parameters.REGEXP_TYPE, new Job.BufferTimeout(123, 456), 1, null); + Job.Parameters param = Job.Parameters.builder().filter("filter").filterType(Job.Parameters.REGEXP_TYPE) + .bufferTimeout(new Job.BufferTimeout(123, 456)).build(); ConsumerJobInfo jobInfo = new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", consumerUri(), ""); @@ -250,8 +250,8 @@ class IntegrationWithIcs { await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue()); final String TYPE_ID = "KafkaInformationType"; - Job.Parameters param = new Job.Parameters("filter", Job.Parameters.REGEXP_TYPE, - new Job.BufferTimeout(123, 170 * 1000), 1, null); + Job.Parameters param = Job.Parameters.builder().filter("filter").filterType(Job.Parameters.REGEXP_TYPE) + .bufferTimeout(new Job.BufferTimeout(123, 170 * 1000)).build(); ConsumerJobInfo jobInfo = new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", consumerUri(), ""); diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java index 711b0c1..4b360b3 100644 --- a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java +++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java @@ -36,7 +36,7 @@ import java.util.Map; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -194,16 +194,15 @@ class IntegrationWithKafka { } synchronized String lastKey() { - return this.receivedKafkaOutput.key; + return new String(this.receivedKafkaOutput.key); } synchronized String lastValue() { - return this.receivedKafkaOutput.value; + return new String(this.receivedKafkaOutput.value); } void reset() { - count = 0; - this.receivedKafkaOutput = new TopicListener.DataFromTopic("", ""); + this.receivedKafkaOutput = new TopicListener.DataFromTopic(null, null); } } @@ -272,7 +271,9 @@ class IntegrationWithKafka { private static Object jobParametersAsJsonObject(String filter, long maxTimeMiliseconds, int maxSize, int maxConcurrency) { Job.BufferTimeout buffer = maxSize > 0 ? new Job.BufferTimeout(maxSize, maxTimeMiliseconds) : null; - Job.Parameters param = new Job.Parameters(filter, Job.Parameters.REGEXP_TYPE, buffer, maxConcurrency, null); + Job.Parameters param = Job.Parameters.builder().filter(filter).filterType(Job.Parameters.REGEXP_TYPE) + .bufferTimeout(buffer).maxConcurrency(maxConcurrency).build(); + String str = gson.toJson(param); return jsonObject(str); } @@ -296,9 +297,10 @@ class IntegrationWithKafka { } } - ConsumerJobInfo consumerJobInfoKafka(String topic, PmReportFilter.FilterData filterData) { + ConsumerJobInfo consumerJobInfoKafka(String topic, PmReportFilter.FilterData filterData, boolean gzip) { try { - Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, null, 1, topic); + Job.Parameters param = Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE) + .kafkaOutputTopic(topic).gzip(gzip).build(); String str = gson.toJson(param); Object parametersObj = jsonObject(str); @@ -309,9 +311,13 @@ class IntegrationWithKafka { } } + ConsumerJobInfo consumerJobInfoKafka(String topic, PmReportFilter.FilterData filterData) { + return consumerJobInfoKafka(topic, filterData, false); + } + ConsumerJobInfo consumerJobInfoKafka(String topic) { try { - Job.Parameters param = new Job.Parameters(null, null, null, 1, topic); + Job.Parameters param = Job.Parameters.builder().kafkaOutputTopic(topic).build(); String str = gson.toJson(param); Object parametersObj = jsonObject(str); @@ -321,26 +327,27 @@ class IntegrationWithKafka { } } - private SenderOptions kafkaSenderOptions() { + private SenderOptions kafkaSenderOptions() { String bootstrapServers = this.applicationConfig.getKafkaBootStrapServers(); Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); // props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producerx"); props.put(ProducerConfig.ACKS_CONFIG, "all"); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); return SenderOptions.create(props); } - private SenderRecord kafkaSenderRecord(String data, String key, String typeId) { + private SenderRecord kafkaSenderRecord(String data, String key, String typeId) { final InfoType infoType = this.types.get(typeId); int correlationMetadata = 2; - return SenderRecord.create(new ProducerRecord<>(infoType.getKafkaInputTopic(), key, data), correlationMetadata); + return SenderRecord.create(new ProducerRecord<>(infoType.getKafkaInputTopic(), key.getBytes(), data.getBytes()), + correlationMetadata); } - private void sendDataToKafka(Flux> dataToSend) { - final KafkaSender sender = KafkaSender.create(kafkaSenderOptions()); + private void sendDataToKafka(Flux> dataToSend) { + final KafkaSender sender = KafkaSender.create(kafkaSenderOptions()); sender.send(dataToSend) // .doOnError(e -> logger.error("Send failed", e)) // @@ -440,7 +447,7 @@ class IntegrationWithKafka { private void printStatistics() { String targetUri = baseUrl() + ProducerCallbacksController.STATISTICS_URL; String stats = restClient().get(targetUri).block(); - logger.info("Stats : {}", stats); + logger.info("Stats : {}", org.apache.commons.lang3.StringUtils.truncate(stats, 1000)); } @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests. @@ -539,14 +546,15 @@ class IntegrationWithKafka { assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); - filterData.getMeasTypes().add("succImmediateAssignProcs"); - filterData.getMeasObjClass().add("UtranCell"); + filterData.getMeasTypes().add("pmAnrNcgiMeasFailUeCap"); + filterData.getMeasTypes().add("pmAnrNcgiMeasRcvDrx"); + filterData.getMeasObjInstIds().add("ManagedElement=seliitdus00487,GNBCUCPFunction=1,NRCellCU=32"); final int NO_OF_JOBS = 150; ArrayList receivers = new ArrayList<>(); for (int i = 0; i < NO_OF_JOBS; ++i) { final String outputTopic = "manyJobs_" + i; - this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData), outputTopic, + this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData, false), outputTopic, restClient()); KafkaReceiver receiver = new KafkaReceiver(this.applicationConfig, outputTopic, this.securityContext); receivers.add(receiver); @@ -555,14 +563,15 @@ class IntegrationWithKafka { await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(NO_OF_JOBS)); waitForKafkaListener(); - final int NO_OF_OBJECTS = 500; + final int NO_OF_OBJECTS = 1000; Instant startTime = Instant.now(); - final String FILE_NAME = "pm_report.json.gz"; + final String FILE_NAME = "A20000626.2315+0200-2330+0200_HTTPS-6-73.json.gz"; DataStore fileStore = dataStore(); + fileStore.deleteBucket(DataStore.Bucket.FILES).block(); fileStore.create(DataStore.Bucket.FILES).block(); fileStore.copyFileTo(Path.of("./src/test/resources/" + FILE_NAME), FILE_NAME).block(); @@ -571,8 +580,8 @@ class IntegrationWithKafka { sendDataToKafka(dataToSend); while (receivers.get(0).count != NO_OF_OBJECTS) { - logger.info("sleeping {}", kafkaReceiver.count); - Thread.sleep(1000 * 1); + // logger.info("sleeping {}", kafkaReceiver.count); + Thread.sleep(100 * 1); } final long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond(); @@ -584,7 +593,7 @@ class IntegrationWithKafka { } } - // printStatistics(); + printStatistics(); } private String newFileEvent(String fileName) { @@ -601,16 +610,16 @@ class IntegrationWithKafka { @Test void testHistoricalData() throws Exception { // test - waitForKafkaListener(); final String JOB_ID = "testHistoricalData"; DataStore fileStore = dataStore(); + fileStore.deleteBucket(DataStore.Bucket.FILES).block(); fileStore.create(DataStore.Bucket.FILES).block(); fileStore.create(DataStore.Bucket.LOCKS).block(); fileStore.copyFileTo(Path.of("./src/test/resources/pm_report.json"), - "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json").block(); + "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.json").block(); fileStore.copyFileTo(Path.of("./src/test/resources/pm_report.json"), "OTHER_SOURCENAME/test.json").block(); @@ -624,7 +633,7 @@ class IntegrationWithKafka { restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); - await().untilAsserted(() -> assertThat(kafkaReceiver.count).isEqualTo(1)); + await().untilAsserted(() -> assertThat(kafkaReceiver.count).isPositive()); } @Test diff --git a/src/test/java/org/oran/dmaapadapter/filter/JsltFilterTest.java b/src/test/java/org/oran/dmaapadapter/filter/JsltFilterTest.java index a18034e..e3619ac 100644 --- a/src/test/java/org/oran/dmaapadapter/filter/JsltFilterTest.java +++ b/src/test/java/org/oran/dmaapadapter/filter/JsltFilterTest.java @@ -27,12 +27,15 @@ import java.nio.file.Files; import java.nio.file.Path; import org.junit.jupiter.api.Test; +import org.oran.dmaapadapter.filter.Filter.FilteredData; import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic; class JsltFilterTest { private String filterReport(JsltFilter filter) throws Exception { - return filter.filter(new DataFromTopic("", loadReport())).value; + DataFromTopic data = new DataFromTopic(null, loadReport().getBytes()); + FilteredData filtered = filter.filter(data); + return filtered.getValueAString(); } @Test diff --git a/src/test/java/org/oran/dmaapadapter/filter/JsonPathFilterTest.java b/src/test/java/org/oran/dmaapadapter/filter/JsonPathFilterTest.java index ce00c4d..99d3591 100644 --- a/src/test/java/org/oran/dmaapadapter/filter/JsonPathFilterTest.java +++ b/src/test/java/org/oran/dmaapadapter/filter/JsonPathFilterTest.java @@ -27,6 +27,7 @@ import java.nio.file.Files; import java.nio.file.Path; import org.junit.jupiter.api.Test; +import org.oran.dmaapadapter.filter.Filter.FilteredData; import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic; class JsonPathFilterTest { @@ -35,7 +36,9 @@ class JsonPathFilterTest { void testJsonPath() throws Exception { String exp = ("$.event.perf3gppFields.measDataCollection.measInfoList[0].measTypes.sMeasTypesList[0]"); JsonPathFilter filter = new JsonPathFilter(exp); - String res = filter.filter(new DataFromTopic("", loadReport())).value; + DataFromTopic data = new DataFromTopic(null, loadReport().getBytes()); + FilteredData filtered = filter.filter(data); + String res = filtered.getValueAString(); assertThat(res).isEqualTo("\"attTCHSeizures\""); } diff --git a/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java b/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java index cbdf9e7..3e465da 100644 --- a/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java +++ b/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java @@ -27,12 +27,15 @@ import java.nio.file.Files; import java.nio.file.Path; import org.junit.jupiter.api.Test; +import org.oran.dmaapadapter.filter.Filter.FilteredData; import org.oran.dmaapadapter.tasks.TopicListener; class PmReportFilterTest { private String filterReport(PmReportFilter filter) throws Exception { - return filter.filter(new TopicListener.DataFromTopic("", loadReport())).value; + TopicListener.DataFromTopic data = new TopicListener.DataFromTopic(null, loadReport().getBytes()); + FilteredData filtered = filter.filter(data); + return filtered.getValueAString(); } @Test @@ -120,11 +123,11 @@ class PmReportFilterTest { PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); PmReportFilter filter = new PmReportFilter(filterData); - String filtered = filter.filter(new TopicListener.DataFromTopic("", "junk")).value; - assertThat(filtered).isEmpty(); + FilteredData filtered = filter.filter(new TopicListener.DataFromTopic(null, "junk".getBytes())); + assertThat(filtered.isEmpty()).isTrue(); - filtered = filter.filter(new TopicListener.DataFromTopic("", reQuote("{'msg': 'test'}"))).value; - assertThat(filtered).isEmpty(); + filtered = filter.filter(new TopicListener.DataFromTopic(null, reQuote("{'msg': 'test'}").getBytes())); + assertThat(filtered.isEmpty()).isTrue(); } diff --git a/src/test/resources/A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json b/src/test/resources/A20000626.2315+0200-2330+0200_HTTPS-6-73.json similarity index 100% rename from src/test/resources/A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json rename to src/test/resources/A20000626.2315+0200-2330+0200_HTTPS-6-73.json diff --git a/src/test/resources/A20000626.2315+0200-2330+0200_HTTPS-6-73.json.gz b/src/test/resources/A20000626.2315+0200-2330+0200_HTTPS-6-73.json.gz new file mode 100644 index 0000000..1ebd353 Binary files /dev/null and b/src/test/resources/A20000626.2315+0200-2330+0200_HTTPS-6-73.json.gz differ