From 636f3d1636a0783700ca1e1272f836883b262758 Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Mon, 17 Oct 2022 15:47:37 +0200 Subject: [PATCH] Added gzip of output data Bugfix historical data was broken, fixed. Change-Id: I7ba95f962676e69ebd35b3ff467ac47cc1786b2d Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-773 --- pom.xml | 6 +- .../java/org/oran/dmaapadapter/filter/Filter.java | 14 +++-- .../org/oran/dmaapadapter/filter/JsltFilter.java | 2 +- .../oran/dmaapadapter/filter/JsonPathFilter.java | 6 +- .../oran/dmaapadapter/filter/PmReportFilter.java | 4 +- .../org/oran/dmaapadapter/filter/RegexpFilter.java | 2 +- .../java/org/oran/dmaapadapter/repository/Job.java | 28 ++++----- .../dmaapadapter/tasks/DmaapTopicListener.java | 2 +- .../dmaapadapter/tasks/HttpJobDataDistributor.java | 8 ++- .../dmaapadapter/tasks/JobDataDistributor.java | 37 ++++++++---- .../tasks/KafkaJobDataDistributor.java | 32 +++++----- .../dmaapadapter/tasks/KafkaTopicListener.java | 23 ++++---- .../org/oran/dmaapadapter/tasks/TopicListener.java | 18 +++--- .../oran/dmaapadapter/tasks/TopicListeners.java | 13 +++-- src/main/resources/typeSchemaPmData.json | 3 + .../org/oran/dmaapadapter/ApplicationTest.java | 58 +++++------------- .../org/oran/dmaapadapter/IntegrationWithIcs.java | 8 +-- .../oran/dmaapadapter/IntegrationWithKafka.java | 65 ++++++++++++--------- .../oran/dmaapadapter/filter/JsltFilterTest.java | 5 +- .../dmaapadapter/filter/JsonPathFilterTest.java | 5 +- .../dmaapadapter/filter/PmReportFilterTest.java | 13 +++-- ... A20000626.2315+0200-2330+0200_HTTPS-6-73.json} | 0 ...20000626.2315+0200-2330+0200_HTTPS-6-73.json.gz | Bin 0 -> 9832 bytes 23 files changed, 183 insertions(+), 169 deletions(-) rename src/test/resources/{A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json => A20000626.2315+0200-2330+0200_HTTPS-6-73.json} (100%) create mode 100644 src/test/resources/A20000626.2315+0200-2330+0200_HTTPS-6-73.json.gz diff --git a/pom.xml b/pom.xml index fb3135b..b92e55b 100644 --- a/pom.xml +++ b/pom.xml @@ -197,7 +197,7 @@ io.projectreactor.kafka reactor-kafka - 1.3.12 + 1.3.13 com.google.guava @@ -207,12 +207,12 @@ software.amazon.awssdk s3 - 2.13.73 + 2.17.292 com.amazonaws aws-java-sdk - 1.11.795 + 1.12.321 diff --git a/src/main/java/org/oran/dmaapadapter/filter/Filter.java b/src/main/java/org/oran/dmaapadapter/filter/Filter.java index bfd358b..6ec74d5 100644 --- a/src/main/java/org/oran/dmaapadapter/filter/Filter.java +++ b/src/main/java/org/oran/dmaapadapter/filter/Filter.java @@ -32,19 +32,23 @@ public interface Filter { @ToString public static class FilteredData { - public final String key; - public final String value; - private static final FilteredData emptyData = new FilteredData("", ""); + public final byte[] key; + public final byte[] value; + private static final FilteredData emptyData = new FilteredData(null, null); public boolean isEmpty() { - return value.isEmpty() && key.isEmpty(); + return (key == null || key.length == 0) && (value == null || value.length == 0); } - public FilteredData(String key, String value) { + public FilteredData(byte[] key, byte[] value) { this.key = key; this.value = value; } + public String getValueAString() { + return value == null ? "" : new String(this.value); + } + public static FilteredData empty() { return emptyData; } diff --git a/src/main/java/org/oran/dmaapadapter/filter/JsltFilter.java b/src/main/java/org/oran/dmaapadapter/filter/JsltFilter.java index 9336617..577f83d 100644 --- a/src/main/java/org/oran/dmaapadapter/filter/JsltFilter.java +++ b/src/main/java/org/oran/dmaapadapter/filter/JsltFilter.java @@ -60,7 +60,7 @@ class JsltFilter implements Filter { if (filteredNode == NullNode.instance) { return FilteredData.empty(); } - return new FilteredData(data.key, mapper.writeValueAsString(filteredNode)); + return new FilteredData(data.key, mapper.writeValueAsBytes(filteredNode)); } catch (Exception e) { return FilteredData.empty(); } diff --git a/src/main/java/org/oran/dmaapadapter/filter/JsonPathFilter.java b/src/main/java/org/oran/dmaapadapter/filter/JsonPathFilter.java index 36a2103..cded9a0 100644 --- a/src/main/java/org/oran/dmaapadapter/filter/JsonPathFilter.java +++ b/src/main/java/org/oran/dmaapadapter/filter/JsonPathFilter.java @@ -43,8 +43,10 @@ class JsonPathFilter implements Filter { @Override public FilteredData filter(DataFromTopic data) { try { - Object o = JsonPath.parse(data.value).read(this.expression, Object.class); - return o == null ? FilteredData.empty() : new FilteredData(data.key, gson.toJson(o)); + String str = new String(data.value); + Object o = JsonPath.parse(str).read(this.expression, Object.class); + String json = gson.toJson(o); + return o == null ? FilteredData.empty() : new FilteredData(data.key, json.getBytes()); } catch (Exception e) { return FilteredData.empty(); } diff --git a/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java b/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java index e33ae68..8f4976f 100644 --- a/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java +++ b/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java @@ -96,7 +96,7 @@ public class PmReportFilter implements Filter { if (!filter(report, this.filterData)) { return FilteredData.empty(); } - return new FilteredData(data.key, gson.toJson(report)); + return new FilteredData(data.key, gson.toJson(report).getBytes()); } catch (Exception e) { logger.warn("Could not parse PM data. {}, reason: {}", data, e.getMessage()); return FilteredData.empty(); @@ -107,7 +107,7 @@ public class PmReportFilter implements Filter { private PmReport createPmReport(DataFromTopic data) { synchronized (data) { if (data.getCachedPmReport() == null) { - data.setCachedPmReport(gsonParse.fromJson(data.value, PmReport.class)); + data.setCachedPmReport(gsonParse.fromJson(data.valueAsString(), PmReport.class)); } return data.getCachedPmReport(); } diff --git a/src/main/java/org/oran/dmaapadapter/filter/RegexpFilter.java b/src/main/java/org/oran/dmaapadapter/filter/RegexpFilter.java index b1000ed..4806604 100644 --- a/src/main/java/org/oran/dmaapadapter/filter/RegexpFilter.java +++ b/src/main/java/org/oran/dmaapadapter/filter/RegexpFilter.java @@ -44,7 +44,7 @@ class RegexpFilter implements Filter { if (regexp == null) { return new FilteredData(data.key, data.value); } - Matcher matcher = regexp.matcher(data.value); + Matcher matcher = regexp.matcher(data.valueAsString()); boolean match = matcher.find(); if (match) { return new FilteredData(data.key, data.value); diff --git a/src/main/java/org/oran/dmaapadapter/repository/Job.java b/src/main/java/org/oran/dmaapadapter/repository/Job.java index 9fd8e57..b5b7e52 100644 --- a/src/main/java/org/oran/dmaapadapter/repository/Job.java +++ b/src/main/java/org/oran/dmaapadapter/repository/Job.java @@ -84,19 +84,20 @@ public class Job { @Builder.Default int noOfSentBytes = 0; - public void received(String str) { - noOfReceivedBytes += str.length(); + public void received(byte[] bytes) { + noOfReceivedBytes += bytes.length; noOfReceivedObjects += 1; } - public void filtered(String str) { - noOfSentBytes += str.length(); + public void filtered(byte[] bytes) { + noOfSentBytes += bytes.length; noOfSentObjects += 1; } } + @Builder public static class Parameters { public static final String REGEXP_TYPE = "regexp"; public static final String PM_FILTER_TYPE = "pmdata"; @@ -104,9 +105,12 @@ public class Job { public static final String JSON_PATH_FILTER_TYPE = "json-path"; @Setter + @Builder.Default private String filterType = REGEXP_TYPE; + @Getter private Object filter; + @Getter private BufferTimeout bufferTimeout; @@ -115,19 +119,15 @@ public class Job { @Getter private String kafkaOutputTopic; - public Parameters() {} + @Getter + private Boolean gzip; - public Parameters(Object filter, String filterType, BufferTimeout bufferTimeout, Integer maxConcurrency, - String kafkaOutputTopic) { - this.filter = filter; - this.bufferTimeout = bufferTimeout; - this.maxConcurrency = maxConcurrency; - this.filterType = filterType; - this.kafkaOutputTopic = kafkaOutputTopic; + public int getMaxConcurrency() { + return maxConcurrency == null || maxConcurrency == 1 ? 1 : maxConcurrency; } - public int getMaxConcurrency() { - return maxConcurrency == null || maxConcurrency == 0 ? 1 : maxConcurrency; + public boolean isGzip() { + return gzip != null && gzip; } public Filter.Type getFilterType() { diff --git a/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java index 4799034..cd4c002 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java @@ -72,7 +72,7 @@ public class DmaapTopicListener implements TopicListener { .doOnNext(input -> logger.debug("Received from DMaap: {} :{}", this.type.getDmaapTopicUrl(), input)) // .doOnError(t -> logger.error("DmaapTopicListener error: {}", t.getMessage())) // .doFinally(sig -> logger.error("DmaapTopicListener stopped, reason: {}", sig)) // - .map(input -> new DataFromTopic("", input)) + .map(input -> new DataFromTopic(null, input.getBytes())) .flatMap(data -> KafkaTopicListener.getDataFromFileIfNewPmFileEvent(data, type, dataStore), 100) .publish() // .autoConnect(); diff --git a/src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java index f57c12d..ef10d3a 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java @@ -26,6 +26,8 @@ import org.oran.dmaapadapter.repository.Job; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.MediaType; + +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -36,8 +38,8 @@ import reactor.core.publisher.Mono; public class HttpJobDataDistributor extends JobDataDistributor { private static final Logger logger = LoggerFactory.getLogger(HttpJobDataDistributor.class); - public HttpJobDataDistributor(Job job, ApplicationConfig config) { - super(job, config); + public HttpJobDataDistributor(Job job, ApplicationConfig config, Flux input) { + super(job, config, input); } @Override @@ -45,7 +47,7 @@ public class HttpJobDataDistributor extends JobDataDistributor { Job job = this.getJob(); logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), output); MediaType contentType = job.isBuffered() || job.getType().isJson() ? MediaType.APPLICATION_JSON : null; - return job.getConsumerRestClient().post("", output.value, contentType); + return job.getConsumerRestClient().post("", output.getValueAString(), contentType); } } diff --git a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java index 05fbbc6..45a232d 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java @@ -20,9 +20,12 @@ package org.oran.dmaapadapter.tasks; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.time.OffsetDateTime; import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatterBuilder; +import java.util.zip.GZIPOutputStream; import lombok.Getter; @@ -84,16 +87,12 @@ public abstract class JobDataDistributor { } } - protected JobDataDistributor(Job job, ApplicationConfig applConfig) { + protected JobDataDistributor(Job job, ApplicationConfig applConfig, Flux input) { this.job = job; this.applConfig = applConfig; this.dataStore = DataStore.create(applConfig); this.dataStore.create(DataStore.Bucket.FILES).subscribe(); this.dataStore.create(DataStore.Bucket.LOCKS).subscribe(); - } - - public synchronized void start(Flux input) { - collectHistoricalData(); this.errorStats.resetIrrecoverableErrors(); this.subscription = filterAndBuffer(input, this.job) // @@ -110,7 +109,7 @@ public abstract class JobDataDistributor { } } - private void collectHistoricalData() { + public void collectHistoricalData() { PmReportFilter filter = job.getFilter() instanceof PmReportFilter ? (PmReportFilter) job.getFilter() : null; if (filter != null && filter.getFilterData().getPmRopStartTime() != null) { @@ -129,17 +128,34 @@ public abstract class JobDataDistributor { .flatMap(data -> KafkaTopicListener.getDataFromFileIfNewPmFileEvent(data, this.job.getType(), dataStore), 100) .map(job::filter) // + .map(this::gzip) // .flatMap(this::sendToClient, 1) // .onErrorResume(this::handleCollectHistoricalDataError) // .subscribe(); } } + private Filter.FilteredData gzip(Filter.FilteredData data) { + if (job.getParameters().isGzip()) { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try (GZIPOutputStream gzip = new GZIPOutputStream(out)) { + gzip.write(data.value); + return new Filter.FilteredData(data.key, out.toByteArray()); + } catch (IOException e) { + logger.error("Unexpected exception when zipping: {}", e.getMessage()); + return data; + } + } else { + return data; + } + } + private Mono handleCollectHistoricalDataError(Throwable t) { if (t instanceof LockedException) { logger.debug("Locked exception: {} job: {}", t.getMessage(), job.getId()); return Mono.empty(); // Ignore } else { + logger.error("Exception: {} job: {}", t.getMessage(), job.getId()); return tryDeleteLockFile() // .map(bool -> "OK"); } @@ -153,11 +169,11 @@ public abstract class JobDataDistributor { NewFileEvent ev = new NewFileEvent(fileName); - return new TopicListener.DataFromTopic("", gson.toJson(ev)); + return new TopicListener.DataFromTopic(null, gson.toJson(ev).getBytes()); } private boolean filterStartTime(String startTimeStr, String fileName) { - // A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json + // A20000626.2315+0200-2330+0200_HTTPS-6-73.json try { if (fileName.endsWith(".json") || fileName.endsWith(".json.gz")) { @@ -212,15 +228,16 @@ public abstract class JobDataDistributor { Flux filtered = // inputFlux.doOnNext(data -> job.getStatistics().received(data.value)) // .map(job::filter) // + .map(this::gzip) // .filter(f -> !f.isEmpty()) // .doOnNext(f -> job.getStatistics().filtered(f.value)); // if (job.isBuffered()) { - filtered = filtered.map(input -> quoteNonJson(input.value, job)) // + filtered = filtered.map(input -> quoteNonJson(input.getValueAString(), job)) // .bufferTimeout( // job.getParameters().getBufferTimeout().getMaxSize(), // job.getParameters().getBufferTimeout().getMaxTime()) // - .map(buffered -> new Filter.FilteredData("", buffered.toString())); + .map(buffered -> new Filter.FilteredData(null, buffered.toString().getBytes())); } return filtered; } diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java index 4a68603..5e09714 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java @@ -23,9 +23,10 @@ package org.oran.dmaapadapter.tasks; import java.util.HashMap; import java.util.Map; +import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.filter.Filter; import org.oran.dmaapadapter.repository.Job; @@ -46,20 +47,23 @@ import reactor.kafka.sender.SenderRecord; public class KafkaJobDataDistributor extends JobDataDistributor { private static final Logger logger = LoggerFactory.getLogger(KafkaJobDataDistributor.class); - private KafkaSender sender; + private KafkaSender sender; private final ApplicationConfig appConfig; - public KafkaJobDataDistributor(Job job, ApplicationConfig appConfig) { - super(job, appConfig); + public KafkaJobDataDistributor(Job job, ApplicationConfig appConfig, Flux input) { + super(job, appConfig, input); this.appConfig = appConfig; + SenderOptions senderOptions = senderOptions(appConfig); + this.sender = KafkaSender.create(senderOptions); } @Override protected Mono sendToClient(Filter.FilteredData data) { Job job = this.getJob(); - SenderRecord senderRecord = senderRecord(data, job); + SenderRecord senderRecord = senderRecord(data, job); - logger.trace("Sending data '{}' to Kafka topic: {}", data, job.getParameters().getKafkaOutputTopic()); + logger.trace("Sending data '{}' to Kafka topic: {}", StringUtils.truncate(data.getValueAString(), 10), + job.getParameters().getKafkaOutputTopic()); return this.sender.send(Mono.just(senderRecord)) // .doOnNext(n -> logger.debug("Sent data to Kafka topic: {}", job.getParameters().getKafkaOutputTopic())) // @@ -67,14 +71,8 @@ public class KafkaJobDataDistributor extends JobDataDistributor { t -> logger.warn("Failed to send to Kafka, job: {}, reason: {}", job.getId(), t.getMessage())) // .onErrorResume(t -> Mono.empty()) // .collectList() // - .map(x -> data.value); - } + .map(x -> "ok"); - @Override - public synchronized void start(Flux input) { - super.start(input); - SenderOptions senderOptions = senderOptions(appConfig); - this.sender = KafkaSender.create(senderOptions); } @Override @@ -86,18 +84,18 @@ public class KafkaJobDataDistributor extends JobDataDistributor { } } - private static SenderOptions senderOptions(ApplicationConfig config) { + private static SenderOptions senderOptions(ApplicationConfig config) { String bootstrapServers = config.getKafkaBootStrapServers(); Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.ACKS_CONFIG, "all"); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); return SenderOptions.create(props); } - private SenderRecord senderRecord(Filter.FilteredData output, Job infoJob) { + private SenderRecord senderRecord(Filter.FilteredData output, Job infoJob) { int correlationMetadata = 2; String topic = infoJob.getParameters().getKafkaOutputTopic(); return SenderRecord.create(new ProducerRecord<>(topic, output.key, output.value), correlationMetadata); diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java index b238b6e..960ecef 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java @@ -28,7 +28,7 @@ import java.util.Map; import java.util.zip.GZIPInputStream; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.datastore.DataStore; import org.oran.dmaapadapter.repository.InfoType; @@ -40,7 +40,6 @@ import reactor.core.publisher.Mono; import reactor.kafka.receiver.KafkaReceiver; import reactor.kafka.receiver.ReceiverOptions; - /** * The class streams incoming requests from a Kafka topic and sends them further * to a multi cast sink, which several other streams can connect to. @@ -74,42 +73,40 @@ public class KafkaTopicListener implements TopicListener { return KafkaReceiver.create(kafkaInputProperties(clientId)) // .receiveAutoAck() // .concatMap(consumerRecord -> consumerRecord) // - .doOnNext(input -> logger.trace("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(), - input.value())) // + .doOnNext(input -> logger.trace("Received from kafka topic: {}", this.type.getKafkaInputTopic())) // .doOnError(t -> logger.error("KafkaTopicReceiver error: {}", t.getMessage())) // .doFinally(sig -> logger.error("KafkaTopicReceiver stopped, reason: {}", sig)) // - .filter(t -> !t.value().isEmpty() || !t.key().isEmpty()) // + .filter(t -> t.value().length > 0 || t.key().length > 0) // .map(input -> new DataFromTopic(input.key(), input.value())) // - .flatMap(data -> getDataFromFileIfNewPmFileEvent(data, type, dataStore), 100) // - .publish() // + .flatMap(data -> getDataFromFileIfNewPmFileEvent(data, type, dataStore), 100).publish() // .autoConnect(1); } - private ReceiverOptions kafkaInputProperties(String clientId) { + private ReceiverOptions kafkaInputProperties(String clientId) { Map consumerProps = new HashMap<>(); if (this.applicationConfig.getKafkaBootStrapServers().isEmpty()) { logger.error("No kafka boostrap server is setup"); } consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers()); consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, this.type.getKafkaGroupId()); - consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId); consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, this.applicationConfig.getKafkaMaxPollRecords()); - return ReceiverOptions.create(consumerProps) + return ReceiverOptions.create(consumerProps) .subscription(Collections.singleton(this.type.getKafkaInputTopic())); } public static Mono getDataFromFileIfNewPmFileEvent(DataFromTopic data, InfoType type, DataStore fileStore) { - if (type.getDataType() != InfoType.DataType.PM_DATA || data.value.length() > 1000) { + if (type.getDataType() != InfoType.DataType.PM_DATA || data.value.length > 1000) { return Mono.just(data); } try { - NewFileEvent ev = gson.fromJson(data.value, NewFileEvent.class); + NewFileEvent ev = gson.fromJson(data.valueAsString(), NewFileEvent.class); if (ev.getFilename() == null) { logger.warn("Ignoring received message: {}", data); diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java index 3f06457..be230cf 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java @@ -20,7 +20,6 @@ package org.oran.dmaapadapter.tasks; -import java.nio.charset.StandardCharsets; import lombok.Getter; import lombok.Setter; @@ -33,22 +32,23 @@ public interface TopicListener { @ToString public static class DataFromTopic { - public final String key; - public final String value; + public final byte[] key; + public final byte[] value; + + private static byte[] noBytes = new byte[0]; @Getter @Setter @ToString.Exclude private PmReport cachedPmReport; - public DataFromTopic(String key, String value) { - this.key = key; - this.value = value; + public DataFromTopic(byte[] key, byte[] value) { + this.key = key == null ? noBytes : key; + this.value = value == null ? noBytes : value; } - public DataFromTopic(String key, byte[] value) { - this.key = key; - this.value = new String(value, StandardCharsets.UTF_8); + public String valueAsString() { + return new String(this.value); } } diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java index d5868e5..4d76cd1 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java @@ -93,16 +93,19 @@ public class TopicListeners { } } - private JobDataDistributor createConsumer(Job job) { - return !Strings.isEmpty(job.getParameters().getKafkaOutputTopic()) ? new KafkaJobDataDistributor(job, appConfig) - : new HttpJobDataDistributor(job, appConfig); + private JobDataDistributor createConsumer(Job job, TopicListener topicListener) { + return !Strings.isEmpty(job.getParameters().getKafkaOutputTopic()) + ? new KafkaJobDataDistributor(job, appConfig, topicListener.getFlux()) + : new HttpJobDataDistributor(job, appConfig, topicListener.getFlux()); } private void addConsumer(Job job, MultiMap distributors, Map topicListeners) { TopicListener topicListener = topicListeners.get(job.getType().getId()); - JobDataDistributor distributor = createConsumer(job); - distributor.start(topicListener.getFlux()); + JobDataDistributor distributor = createConsumer(job, topicListener); + + distributor.collectHistoricalData(); + distributors.put(job.getType().getId(), job.getId(), distributor); } diff --git a/src/main/resources/typeSchemaPmData.json b/src/main/resources/typeSchemaPmData.json index 7d5ab62..5b4ab89 100644 --- a/src/main/resources/typeSchemaPmData.json +++ b/src/main/resources/typeSchemaPmData.json @@ -75,6 +75,9 @@ "kafkaOutputTopic": { "type": "string" }, + "gzip": { + "type": "boolean" + }, "bufferTimeout": { "type": "object", "additionalProperties": false, diff --git a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java index 5d20541..139c1bb 100644 --- a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java +++ b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java @@ -63,10 +63,8 @@ import org.oran.dmaapadapter.r1.ProducerJobInfo; import org.oran.dmaapadapter.repository.InfoTypes; import org.oran.dmaapadapter.repository.Job; import org.oran.dmaapadapter.repository.Jobs; -import org.oran.dmaapadapter.tasks.JobDataDistributor; import org.oran.dmaapadapter.tasks.NewFileEvent; import org.oran.dmaapadapter.tasks.ProducerRegstrationTask; -import org.oran.dmaapadapter.tasks.TopicListener; import org.oran.dmaapadapter.tasks.TopicListeners; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,7 +82,6 @@ import org.springframework.http.ResponseEntity; import org.springframework.test.context.TestPropertySource; import org.springframework.web.reactive.function.client.WebClientResponseException; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -391,38 +388,6 @@ class ApplicationTest { "Could not find type"); } - @Test - void testReceiveAndPostDataFromKafka() throws Exception { - final String JOB_ID = "ID"; - final String TYPE_ID = "PmDataOverKafka"; - waitForRegistration(); - - // Create a job - Job.Parameters param = new Job.Parameters(null, null, new Job.BufferTimeout(123, 456), 1, null); - String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL; - ConsumerJobInfo kafkaJobInfo = new ConsumerJobInfo(TYPE_ID, toJson(gson.toJson(param)), "owner", targetUri, ""); - - this.icsSimulatorController.addJob(kafkaJobInfo, JOB_ID, restClient()); - await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); - - JobDataDistributor kafkaConsumer = this.topicListeners.getDataDistributors().get(TYPE_ID, JOB_ID); - - // Handle received data from Kafka, check that it has been posted to the - // consumer - kafkaConsumer.start(Flux.just(new TopicListener.DataFromTopic("key", "data"))); - - ConsumerController.TestResults consumer = this.consumerController.testResults; - await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1)); - assertThat(consumer.receivedBodies.get(0)).isEqualTo("[data]"); - assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "application/json"); - - // This only works in debugger. Removed for now. - assertThat(this.icsSimulatorController.testResults.createdJob).isNotNull(); - assertThat(this.icsSimulatorController.testResults.createdJob.infoTypeId) - .isEqualTo("xml-file-data-to-filestore"); - - } - @Test void testReceiveAndPostDataFromDmaapBuffering() throws Exception { final String JOB_ID = "testReceiveAndPostDataFromDmaap"; @@ -431,7 +396,8 @@ class ApplicationTest { waitForRegistration(); // Create a job - Job.Parameters param = new Job.Parameters(null, null, new Job.BufferTimeout(123, 456), 1, null); + Job.Parameters param = Job.Parameters.builder().bufferTimeout(new Job.BufferTimeout(123, 456)).build(); + ConsumerJobInfo jobInfo = consumerJobInfo("DmaapInformationType", JOB_ID, toJson(gson.toJson(param))); this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); @@ -457,7 +423,7 @@ class ApplicationTest { waitForRegistration(); // Create a job - Job.Parameters param = new Job.Parameters(null, null, null, 1, null); + Job.Parameters param = Job.Parameters.builder().build(); ConsumerJobInfo jobInfo = consumerJobInfo("DmaapInformationType", JOB_ID, toJson(gson.toJson(param))); this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); @@ -500,8 +466,8 @@ class ApplicationTest { filterData.getMeasObjInstIds().add("UtranCell=Gbg-997"); filterData.getSourceNames().add("O-DU-1122"); filterData.getMeasuredEntityDns().add("ManagedElement=RNC-Gbg-1"); - Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, - new Job.BufferTimeout(123, 456), null, null); + Job.Parameters param = Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE) + .bufferTimeout(new Job.BufferTimeout(123, 456)).build(); String paramJson = gson.toJson(param); ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverRest", "EI_PM_JOB_ID", toJson(paramJson)); @@ -540,14 +506,15 @@ class ApplicationTest { DataStore fileStore = DataStore.create(this.applicationConfig); fileStore.copyFileTo(Path.of("./src/test/resources/pm_report.json"), - "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json").block(); + "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.json").block(); PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); filterData.getSourceNames().add("O-DU-1122"); filterData.setPmRopStartTime("1999-12-27T10:50:44.000-08:00"); - Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, - new Job.BufferTimeout(123, 456), null, null); + Job.Parameters param = Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE) + .bufferTimeout(new Job.BufferTimeout(123, 456)).build(); + String paramJson = gson.toJson(param); ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverRest", "EI_PM_JOB_ID", toJson(paramJson)); @@ -569,7 +536,8 @@ class ApplicationTest { // Create a job with a PM filter String expresssion = "if(.event.commonEventHeader.sourceName == \"O-DU-1122\")" // + "."; - Job.Parameters param = new Job.Parameters(expresssion, Job.Parameters.JSLT_FILTER_TYPE, null, null, null); + Job.Parameters param = + Job.Parameters.builder().filter(expresssion).filterType(Job.Parameters.JSLT_FILTER_TYPE).build(); String paramJson = gson.toJson(param); ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverRest", JOB_ID, toJson(paramJson)); @@ -669,7 +637,9 @@ class ApplicationTest { // Create a job with a PM filter PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); filterData.getMeasTypes().add("succImmediateAssignProcs"); - Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, null, null, null); + Job.Parameters param = + Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE).build(); + String paramJson = gson.toJson(param); ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverKafka", "EI_PM_JOB_ID", toJson(paramJson)); diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java index 4ec51ff..5f7a886 100644 --- a/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java +++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java @@ -230,8 +230,8 @@ class IntegrationWithIcs { await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue()); final String TYPE_ID = "KafkaInformationType"; - Job.Parameters param = - new Job.Parameters("filter", Job.Parameters.REGEXP_TYPE, new Job.BufferTimeout(123, 456), 1, null); + Job.Parameters param = Job.Parameters.builder().filter("filter").filterType(Job.Parameters.REGEXP_TYPE) + .bufferTimeout(new Job.BufferTimeout(123, 456)).build(); ConsumerJobInfo jobInfo = new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", consumerUri(), ""); @@ -250,8 +250,8 @@ class IntegrationWithIcs { await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue()); final String TYPE_ID = "KafkaInformationType"; - Job.Parameters param = new Job.Parameters("filter", Job.Parameters.REGEXP_TYPE, - new Job.BufferTimeout(123, 170 * 1000), 1, null); + Job.Parameters param = Job.Parameters.builder().filter("filter").filterType(Job.Parameters.REGEXP_TYPE) + .bufferTimeout(new Job.BufferTimeout(123, 170 * 1000)).build(); ConsumerJobInfo jobInfo = new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", consumerUri(), ""); diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java index 711b0c1..4b360b3 100644 --- a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java +++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java @@ -36,7 +36,7 @@ import java.util.Map; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -194,16 +194,15 @@ class IntegrationWithKafka { } synchronized String lastKey() { - return this.receivedKafkaOutput.key; + return new String(this.receivedKafkaOutput.key); } synchronized String lastValue() { - return this.receivedKafkaOutput.value; + return new String(this.receivedKafkaOutput.value); } void reset() { - count = 0; - this.receivedKafkaOutput = new TopicListener.DataFromTopic("", ""); + this.receivedKafkaOutput = new TopicListener.DataFromTopic(null, null); } } @@ -272,7 +271,9 @@ class IntegrationWithKafka { private static Object jobParametersAsJsonObject(String filter, long maxTimeMiliseconds, int maxSize, int maxConcurrency) { Job.BufferTimeout buffer = maxSize > 0 ? new Job.BufferTimeout(maxSize, maxTimeMiliseconds) : null; - Job.Parameters param = new Job.Parameters(filter, Job.Parameters.REGEXP_TYPE, buffer, maxConcurrency, null); + Job.Parameters param = Job.Parameters.builder().filter(filter).filterType(Job.Parameters.REGEXP_TYPE) + .bufferTimeout(buffer).maxConcurrency(maxConcurrency).build(); + String str = gson.toJson(param); return jsonObject(str); } @@ -296,9 +297,10 @@ class IntegrationWithKafka { } } - ConsumerJobInfo consumerJobInfoKafka(String topic, PmReportFilter.FilterData filterData) { + ConsumerJobInfo consumerJobInfoKafka(String topic, PmReportFilter.FilterData filterData, boolean gzip) { try { - Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, null, 1, topic); + Job.Parameters param = Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE) + .kafkaOutputTopic(topic).gzip(gzip).build(); String str = gson.toJson(param); Object parametersObj = jsonObject(str); @@ -309,9 +311,13 @@ class IntegrationWithKafka { } } + ConsumerJobInfo consumerJobInfoKafka(String topic, PmReportFilter.FilterData filterData) { + return consumerJobInfoKafka(topic, filterData, false); + } + ConsumerJobInfo consumerJobInfoKafka(String topic) { try { - Job.Parameters param = new Job.Parameters(null, null, null, 1, topic); + Job.Parameters param = Job.Parameters.builder().kafkaOutputTopic(topic).build(); String str = gson.toJson(param); Object parametersObj = jsonObject(str); @@ -321,26 +327,27 @@ class IntegrationWithKafka { } } - private SenderOptions kafkaSenderOptions() { + private SenderOptions kafkaSenderOptions() { String bootstrapServers = this.applicationConfig.getKafkaBootStrapServers(); Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); // props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producerx"); props.put(ProducerConfig.ACKS_CONFIG, "all"); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); return SenderOptions.create(props); } - private SenderRecord kafkaSenderRecord(String data, String key, String typeId) { + private SenderRecord kafkaSenderRecord(String data, String key, String typeId) { final InfoType infoType = this.types.get(typeId); int correlationMetadata = 2; - return SenderRecord.create(new ProducerRecord<>(infoType.getKafkaInputTopic(), key, data), correlationMetadata); + return SenderRecord.create(new ProducerRecord<>(infoType.getKafkaInputTopic(), key.getBytes(), data.getBytes()), + correlationMetadata); } - private void sendDataToKafka(Flux> dataToSend) { - final KafkaSender sender = KafkaSender.create(kafkaSenderOptions()); + private void sendDataToKafka(Flux> dataToSend) { + final KafkaSender sender = KafkaSender.create(kafkaSenderOptions()); sender.send(dataToSend) // .doOnError(e -> logger.error("Send failed", e)) // @@ -440,7 +447,7 @@ class IntegrationWithKafka { private void printStatistics() { String targetUri = baseUrl() + ProducerCallbacksController.STATISTICS_URL; String stats = restClient().get(targetUri).block(); - logger.info("Stats : {}", stats); + logger.info("Stats : {}", org.apache.commons.lang3.StringUtils.truncate(stats, 1000)); } @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests. @@ -539,14 +546,15 @@ class IntegrationWithKafka { assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); - filterData.getMeasTypes().add("succImmediateAssignProcs"); - filterData.getMeasObjClass().add("UtranCell"); + filterData.getMeasTypes().add("pmAnrNcgiMeasFailUeCap"); + filterData.getMeasTypes().add("pmAnrNcgiMeasRcvDrx"); + filterData.getMeasObjInstIds().add("ManagedElement=seliitdus00487,GNBCUCPFunction=1,NRCellCU=32"); final int NO_OF_JOBS = 150; ArrayList receivers = new ArrayList<>(); for (int i = 0; i < NO_OF_JOBS; ++i) { final String outputTopic = "manyJobs_" + i; - this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData), outputTopic, + this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData, false), outputTopic, restClient()); KafkaReceiver receiver = new KafkaReceiver(this.applicationConfig, outputTopic, this.securityContext); receivers.add(receiver); @@ -555,14 +563,15 @@ class IntegrationWithKafka { await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(NO_OF_JOBS)); waitForKafkaListener(); - final int NO_OF_OBJECTS = 500; + final int NO_OF_OBJECTS = 1000; Instant startTime = Instant.now(); - final String FILE_NAME = "pm_report.json.gz"; + final String FILE_NAME = "A20000626.2315+0200-2330+0200_HTTPS-6-73.json.gz"; DataStore fileStore = dataStore(); + fileStore.deleteBucket(DataStore.Bucket.FILES).block(); fileStore.create(DataStore.Bucket.FILES).block(); fileStore.copyFileTo(Path.of("./src/test/resources/" + FILE_NAME), FILE_NAME).block(); @@ -571,8 +580,8 @@ class IntegrationWithKafka { sendDataToKafka(dataToSend); while (receivers.get(0).count != NO_OF_OBJECTS) { - logger.info("sleeping {}", kafkaReceiver.count); - Thread.sleep(1000 * 1); + // logger.info("sleeping {}", kafkaReceiver.count); + Thread.sleep(100 * 1); } final long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond(); @@ -584,7 +593,7 @@ class IntegrationWithKafka { } } - // printStatistics(); + printStatistics(); } private String newFileEvent(String fileName) { @@ -601,16 +610,16 @@ class IntegrationWithKafka { @Test void testHistoricalData() throws Exception { // test - waitForKafkaListener(); final String JOB_ID = "testHistoricalData"; DataStore fileStore = dataStore(); + fileStore.deleteBucket(DataStore.Bucket.FILES).block(); fileStore.create(DataStore.Bucket.FILES).block(); fileStore.create(DataStore.Bucket.LOCKS).block(); fileStore.copyFileTo(Path.of("./src/test/resources/pm_report.json"), - "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json").block(); + "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.json").block(); fileStore.copyFileTo(Path.of("./src/test/resources/pm_report.json"), "OTHER_SOURCENAME/test.json").block(); @@ -624,7 +633,7 @@ class IntegrationWithKafka { restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); - await().untilAsserted(() -> assertThat(kafkaReceiver.count).isEqualTo(1)); + await().untilAsserted(() -> assertThat(kafkaReceiver.count).isPositive()); } @Test diff --git a/src/test/java/org/oran/dmaapadapter/filter/JsltFilterTest.java b/src/test/java/org/oran/dmaapadapter/filter/JsltFilterTest.java index a18034e..e3619ac 100644 --- a/src/test/java/org/oran/dmaapadapter/filter/JsltFilterTest.java +++ b/src/test/java/org/oran/dmaapadapter/filter/JsltFilterTest.java @@ -27,12 +27,15 @@ import java.nio.file.Files; import java.nio.file.Path; import org.junit.jupiter.api.Test; +import org.oran.dmaapadapter.filter.Filter.FilteredData; import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic; class JsltFilterTest { private String filterReport(JsltFilter filter) throws Exception { - return filter.filter(new DataFromTopic("", loadReport())).value; + DataFromTopic data = new DataFromTopic(null, loadReport().getBytes()); + FilteredData filtered = filter.filter(data); + return filtered.getValueAString(); } @Test diff --git a/src/test/java/org/oran/dmaapadapter/filter/JsonPathFilterTest.java b/src/test/java/org/oran/dmaapadapter/filter/JsonPathFilterTest.java index ce00c4d..99d3591 100644 --- a/src/test/java/org/oran/dmaapadapter/filter/JsonPathFilterTest.java +++ b/src/test/java/org/oran/dmaapadapter/filter/JsonPathFilterTest.java @@ -27,6 +27,7 @@ import java.nio.file.Files; import java.nio.file.Path; import org.junit.jupiter.api.Test; +import org.oran.dmaapadapter.filter.Filter.FilteredData; import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic; class JsonPathFilterTest { @@ -35,7 +36,9 @@ class JsonPathFilterTest { void testJsonPath() throws Exception { String exp = ("$.event.perf3gppFields.measDataCollection.measInfoList[0].measTypes.sMeasTypesList[0]"); JsonPathFilter filter = new JsonPathFilter(exp); - String res = filter.filter(new DataFromTopic("", loadReport())).value; + DataFromTopic data = new DataFromTopic(null, loadReport().getBytes()); + FilteredData filtered = filter.filter(data); + String res = filtered.getValueAString(); assertThat(res).isEqualTo("\"attTCHSeizures\""); } diff --git a/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java b/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java index cbdf9e7..3e465da 100644 --- a/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java +++ b/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java @@ -27,12 +27,15 @@ import java.nio.file.Files; import java.nio.file.Path; import org.junit.jupiter.api.Test; +import org.oran.dmaapadapter.filter.Filter.FilteredData; import org.oran.dmaapadapter.tasks.TopicListener; class PmReportFilterTest { private String filterReport(PmReportFilter filter) throws Exception { - return filter.filter(new TopicListener.DataFromTopic("", loadReport())).value; + TopicListener.DataFromTopic data = new TopicListener.DataFromTopic(null, loadReport().getBytes()); + FilteredData filtered = filter.filter(data); + return filtered.getValueAString(); } @Test @@ -120,11 +123,11 @@ class PmReportFilterTest { PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); PmReportFilter filter = new PmReportFilter(filterData); - String filtered = filter.filter(new TopicListener.DataFromTopic("", "junk")).value; - assertThat(filtered).isEmpty(); + FilteredData filtered = filter.filter(new TopicListener.DataFromTopic(null, "junk".getBytes())); + assertThat(filtered.isEmpty()).isTrue(); - filtered = filter.filter(new TopicListener.DataFromTopic("", reQuote("{'msg': 'test'}"))).value; - assertThat(filtered).isEmpty(); + filtered = filter.filter(new TopicListener.DataFromTopic(null, reQuote("{'msg': 'test'}").getBytes())); + assertThat(filtered.isEmpty()).isTrue(); } diff --git a/src/test/resources/A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json b/src/test/resources/A20000626.2315+0200-2330+0200_HTTPS-6-73.json similarity index 100% rename from src/test/resources/A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json rename to src/test/resources/A20000626.2315+0200-2330+0200_HTTPS-6-73.json diff --git a/src/test/resources/A20000626.2315+0200-2330+0200_HTTPS-6-73.json.gz b/src/test/resources/A20000626.2315+0200-2330+0200_HTTPS-6-73.json.gz new file mode 100644 index 0000000000000000000000000000000000000000..1ebd3530d664f053752f39ab5378964f6dd5bc57 GIT binary patch literal 9832 zcmZ9PWmFwducmQ#EfjZmE$;5_?yiM{yK8ZG4(<-cDelGHIk-Da@3-#E+?l<0lIP9N z%HL!Y$H2k08{wLPL5i@k{5xE1Tuf~2teo^L|0E+DJ3GsN$VggUU0IEhi;vx`AZAtr86^(LQW9o1U3G`9iQ2ABaPFkAtnNxaUtH&y`7f!56WYJQF0@S zEagjNuHt;??8%ANFh-1&MuW6YfYFZBxfE`|yPpYc9_}-_r%%Wn1||#>Br5OcFflG} zI{a~)g)8PBG3%p)?@dHwseEEYTQdn2*VfkdrdW3$e9(8Pe!dSk9 z>4DdCFC_7HjV5W?xRLfGhg&8u*{`*K9UW^g|R*a7kR|32+T(JacR)*b-Ns zbo`d}(Rpb7-nlSeoPV)2RQO(vKu+uiy{ZeU+wU5ES^IklUoU)++5w+_=<$ecxC=M9 z53V3TN|EjtwoK&uGnDGy--@*`OUC26k=S{?_Rblphd=dpalhAH{fg@G{C0@XTN?&z zJ@}(jI%73K>=vouSD!l$h$4eLTcC52m3kTwC)2>-x%3o+=lM2q7^*&t_t<(8_?Xn| zy7WPVaT!q?)vbUR-ILVY9HCCDN|_0iWieadf)KM$&r2c0TS#P(bgyQnk;}J-ifmcs z7SdIcL4$|VZ10;?H+%-aNnC%@^y8~64+^dOENVq7h7Xk?s|tkvz9w89Cq5+po`*h; zY&g^yF5TxE69w$!e{>427#kT8V%fq%jKnHbhCsbc>K$Q!zj6|EFb)Q$X(7JiJ`Qf? z>`yrarg8e3Yy`-&&d~O8{bcQUG7E76}DtY zfQ2o0Nt~+E^oA(0qvZLK>L4}t zQJb3z2@by9D;5e`9Kd?}^P3krL27=bF2ltQ#~(? zXAW!KIIdmWB-x3Icf442IcpmB+u90R9npJuJSdJxal0JVj> z{T61%IOqkJ_CnUvmY5~=C4>&rEy)mfs1Zl3TPsIvGzD*O1)A?M!Ms2bV?+oKI1Z9} zBS%1JpxYtbqoVg>;@4IQ{@;qs(wBa}zEqzacch_++W{9}>O4T8vy*`2Xo z7e{bn@;5q)wnyEunS9F(1Y`Wb7BG2O7RbIeuqE_*kv>IGgR*_3 z!9AOMc7@sfy8xWsXqxsjOcJqJo0Aam;%YQx?p# zt;oV3kwAwR;Z`e^rq4Jk8sQ(NV~_UHadR|QuO(UZlTTC<>)`PGM~M6r-*q%C%yq6u z8$nWO8tjeL;s2wC!P$JXM$<7cEdb_nma_>LpJd>Km!895?ePPf{XM=nk4fz9bGj9s zFA;x$fpru^4d<8ho>uVt364atK(0Y1v*Ccn_CBQ*mNmP&T%%}rPU4*H;R?#*MLI%~ zR-*Tqe?SjaTS9Foteg*bLTk3f_S8JIHZc0*aDU|n;?YpCr8HXKa`M@6PL6#4H$U=B z%o7^sa@4M$Ri_At`zqXin+K9F(&NRbx44SHGJ#L_@0iFxt0qi#%Y)TDM3$Ta)lu%f z4f==+ePm+RNRov3IvYhSrbTw}Vag?<6NVU-d5HUK{VD4W*{6G+f)@Yh60O*=I_1(JfyKB@f$QE&9wc^0mT4-=_-Odeo}x$ zwuw9%D$(6dBHFWapmHw?w;5{RUD{+K*)~>9ooT@7UTK&~3OpLk{fMM_R5~roAeSdm z+Y@`UdoX=0{}H}fFl`c{SODHo=qyjB3jPb#bH%usATPSZ?|T7wFsme<@JKiDC7vuO zm9Hd0IK~YKT>55sO*tpUF3M*8AU1so($KFvmsIIXsY%D{TVy18BzeofjyQ?VDU|#R zVUnT09jLtPoX4TiF|J3>)`gxYu}{^oHoHW=;iS{R2jM-S6ZSCMFNL;*o&~N98y+3- z+NpZjM09xL61*w!a z7t6N9)0GmdR;mIL{Axlh#y^-lz9^r14?dS8R=<8M4)`VdvDIYf7oHI9eD1_Gk;(j#FGZgFX%l7BwNB&5>350BhMVTey5%$>u zy9oM^Fq|?Wdge%gAU;;vx#LIhEszRQx~FkIDekiZVDSJ|c7T#WE!BKYPK6C^Z70&JD12=gHu)cHz zHxL?E9D3-#BXCYu6nKb)qB$}3xG{oVK`b111&6xsVL{~tth%% z7dq%41%J1Hi|&3dBsfP^`p@n;85K&^Ape99J5KOllulZ%^FAE3{JAE5K{x5h0nbYl zM^R_=HXboW&#yo=v_c-J6-PKRmE#g|HH!CijmzscmFc;XJg5VMt;h~^&Jzj4M5fEJ zQQlP#K{&=&=UFXCYeN3hqo?u<_yty{dCq5UQS=+Fo>)Vc3beq=i1`3d_do2Fo&pr7 zS)Plo_^vqNZ#ypT2bmzhcnp+ZOgG5{rboSs8Z%=);IgS~Gssba4j<0+U+R;h0Ny

L@^mL9gTpWBh&%DU%Cgz{v&b|7j<0GYkJ3&QDPIu_yc|T%7ZYCI1GS z!1}=X9HV`;ZR^d3@&iw+co8Md%t~Lbb2}V7Dv(!cX_p_Lu)Crs=3=8o9blt1k{O>E zypzI8mU%Iy%}j|<&P-AI|7^RkSbm)QdbGqhe!RpNFHOcfGiAmUkeQM$^MbF#M$24h zbUYpr_pk3+O|9?BXbC@DKvv0nA@L)710l3|Ae;*e@6MaQmeL1{`;z~S87H|DryrTt zs6U)$)ozT?;BNPGKXj0P$ms3TO)GIYhV)OtkFGLp0m?V5s7Oh#1!by7;p=H;U#Q@K zxWZdj=PIXFi5g{hB7E?FbXqpEC62u>(I9m(Ode-UaV6HSj=@u-U!>`D9Zz3{>G!pYm6LV7_hwYYi513Sy$ z6?}4+R4vb25MIg}tFdg%OHyjYCsGKe{7v_bu8`2^1`~2m`-dqv^k^cXiA8)pegXDM z-6%NM9!*_iahqXRU~r;G&H71+|Pj zct2v7QyhU;ao@@bz86z5w_g)cja`vBG{ja#(h(5YNq6GYztnn$RyOur^Uj&bj>kHV z$FeDKtX4jcq%x{m`hUljE1Bc3KYm1vuKh&~eM)b#tvp+0=TZ>xt4Ud!_V8VcFq9)s zWq(JD$}c0*GG3z7Hs7ovpQ0dg3`| zsYd}h3T{lmn#ULeKpxUK#Vn#R{#bZI1mJ8H2{gtpquOg0_Iz}z6yMSD(0o!&$D~)t z*v}tv{e}4t9(y!)E656+q*yMhdV!Sc2nsxu!`j1TR8IRo2X(9t@RtlzExp#pB*3&F zPW`4xMkyJXnyj#e=mZzcFc1{x88n-0srEb;p5-3RmbUyL+mP*K!+-r@zFqEFV>LNf z#=1`RI@*!$LBaFNtq=H#eJpsbqeW1SMtoTeI;|6rsE)%##{$KQ;)RNA6$WP-tze7kxyNw^Mm9b8rPoUEy_|@eJ^9c>{W8O+U zHEtBnka_toC#f6HnDAsI>1c5HT{dBK-d%iiW;nb~$mTJnUnOupN;I1uG^RRChG1=P z$XH!**P%#8zQ4RFFno>#$l`l(&a?*Af%L~fYQ0ohJ>gPD;JjwT{4o#iT^@m}TnK;T zTDU&jNut{lxj)*)$Lj^;*%!jqCk*WH$%ZRYLs-Nrmr1-ic`uj)wO#zAOe}q@rbL!T zs-;GWP7z0KtX-!FmL>`fN1iT?NyW`K@;4j_U%45=WUIU3wqB~)&{%VOyTe#xN zqpo4u`uew(3};Q=ub3VLdslyMS+B&IDZRvD9qyCqNA+qb$ipp$JlK-O@_@t=Zt5bi zzT`ssc4VUofnj#3T(lo`!-oR|KZB4lR!&ti=J6ACA!>fe<4ewvnaHx-;xM5|WaQ%) zyz~C(-9G5i6>k0DBKPzMUD)OI^`!4pr3R(8W+=`vdKDI zK8`;xNmH>1Ig#fH?99r{5aqm=T%HwS;=kYq$XvFUfD;rtkAbYD37S*VmXY#Rt7``3#(cm991rfyrFBdnU~@f8Jm9?4A1UDg;-ig9DDfx>4zn-4 zK}n`|;Q5)nd?6M9Eqig#_Kv+B1fXUPkhb#nw6X!xJ%llWmK4t!{%Za?v61Na@scb4 zJ4W>I!znXcdQPu3wp(ysPuA(Bb*pjMkMgbDIY8$KyW7I>BLA52E|VEBQSCUd_d6XI z!RVM_W|xZnS{rt?>gD2yjH#`tiU`e+mZ(vlzi0iIBD2)Y;ic=n6LWFB zx%b|`le?{E&*rSU=P1J~K4F~2ZiE@-?GP+KUyau-R{6S~Iu-*#)yO;i_S%T^d*?fS z)GnG7cjWuG^#z(Nq^o2XvU#c?BdIcl`1G9ephv46$@a->CF;)Y@vC)IeSb)S8i=7} zfr#6SA|)Nn3sbfylReo|DeZ5P-WRYfACTVI>g>YpHDV_uD~hdT&wiMKkvl8^7$QlP z6BDd|VXkrm9-_sVg(7n|&>BVy+r;`M>O4jIW($I|sWr9N(9P-T>o+*B6 z0C2=dD0BoHlHJZ7SVYi=y-at-!FLi?R-JPXYlQa zYezt^l`eXxdCj%G2J>h99zC5uej5bX*)I=H_D|wTJY_udWWksdi|tf9!fd~v*8oo; z1N0b0{xpv$LiaS0>$kVaBK>rMk|}1Ph#v^%2QVYNA0@!SOS=yGcXM?3kavko?Ho>; zD7;tzXj?C$_JzeRJe&3>B~VM-S{$pK5xO#3)qUPz!zv4(bd`;PkFl+V<|Rc!pk|I| zN^KTV{)I(4&w+#P+*wd_3eh6GZrkPs-j7B2P7|QdPf_M8>D`O;+Iib5Wz>`AWq$xF z%YNEp-(PQ^z&KTBnW1UMw!1mta_oGrK;ID_53(*mN$r+)r6$wLhcRV(6aM4QQ_>y~7a>+1=-HgB#w`YUr>;U>0 zsRKEWEszTkDxI>Kjjqd6i`nCh>Ir~Nv%Ws$6ltz>|FKh|{$nQ)D5uu#`dpsXP1e4I z%%jaIyzXi#V2JK|hQC4z{d{q6?En0CX99NP={wxeVoCG^BK?U^T^cxedt82Q-=lY<-se3OrJfn|PM zhV)gBQ2>RH?jB9e3$=S)*8S_T`%GUpzgode!4IVZC70Cq1f{Z>onNPSct;6|sorjW zD0(~@Rmg9`*$JCHIH-DCtL)pf2Xf(z909*wm1)S}7rvJrR&ebb<8?hx+_Np8Kg(x4 zeq9*ox$W`be9~9=S(Y;x)ql*Q!c*if!eoJOuruq?n(^DT_FKlubRI9)S@`~X9|Jc# zNqpE*R|v?bI3tjelH#72EYHZ`-K~W}6y`eB!cesQ<#rPu7g;VnB)nJAD9x|HYw~Ms z_)Uww)at-3TK?1q;s(jJa()6fxFeBoxeHw;pks_aHdC-U@SqqGu9C(E$coY3Eh!$S z!s}z;Ag!9)2d`^VUU;_FLGPfJEZZ)xP*`$&Dg+=dQE1HlR1ga*tE`_D&;1;$q}H-V zj`1lU-ArEAZ9eCgf}&(m@?OEiyrApkoEo=K4;6 zC~+ZN$3VEj1(Y=CZfUhOk3~=_NIc*o-*3DDhihl3DofCgDO9=RxyKv*YdYx){K82J z3;YSpi(X6Er|KTL`B|0Q@*XN-tcvQ(FDi4C-=zJOm5nxoT>`7Kj`2d5>JDtR)YVe( z*S~}|jsgIP5rmWdI)NvLTR&$49wdW^UkSKc@|!Rr{!5Y!^OQbTChsorU6u9C?Jq&yOIXTiNhZ}sk)rz;zi~q6))Y50 zo0JUp)M&4j-;m9`*g8@V8|5>-?FMQwu1CIPlmD|F#@cSU1FD4k`H#!J;X~)LPIhPg zI8DSf@9Wy*JCeE^Uc>4+UO)mQ>PJH_vH?mij2`pvd`bs9-eIn)k!+cq?41R<>-$5% ztVv415C}&F85&tA8_m#yafVao5hIrH*2hV|r2g`lF zm}fC>A1};o4#~yM8Ga${)ro@%K#!B&72id+NaAn8waB_iy69vc%*$>%M|{Z?>7!-y zZywzyI~(MDc(-6%@m_%q!u>q4Cl!@lIAGOjR&+x1M6#zoC1+`E?Qw+6s$EOmJ5|>EP!UtazL~lW5I@gnTHvgR&O;O-W_42eSXxP6v-4WGxhUT-Y;N`l2>NykHRE7P zi6H}$e1Ji*zHRVR;FN7#lK`Y>hyh0jkqi$69a`xDehuIGze;vnMXE1*xQOZ-e(F`> zovgXER$U}3t`6CEZv$}63A?i5w5fjJXV@>xSfE8ijkKH8J9rAu+64G~+A?5LK;y^V zPpk9MdTlvy+#o23%)Z{S!5HeRB4hhwi>Rb1zWw>CxFSKt{fKy#P`GTKp%kLt_F9Yt zr|`-zi|TbN1m!+<9Kh=0mo5mVzvuYo+pwp>bn0fA4nWsq>RR4TbcL%4UP=70wpmns z)4z4QHG_`R#>ac*C@bpAMG9ZF$(Qrj&6kW)xMZFY`{m8I`}tt|eL>Olvypg%`nHPQ zgSQu7PuFt-+-IBK#meVb%h_~x{t@Fr$!Dp_(8-I`KbKCm_ti01mB1) zaFBo$^L1NL(>D6@pRfg^$Mc64Ovj5&Y-ifV1PEv(Q-=+20Im)C1QVG>-v!+t$$q46 z^9a|gC}DiU;>mT)SU(?xBd2n>n%Xg;n#Oib8GXb^J5m=$l3M8Dmc(`(tYwHjU1|Lo z+@Bmjmx`}$V8T9cx!y-YaG<4HF*Sacv4-oC1>0)S4_NM`d%dUDcgD7C9{G1Nbne5ByesdUsR8$e)3RtYdu};_ zPG$)o1FAd2kbRu^GY5TnoKEPp3?@y-P=5AY)^zo?3)gz+U|o`vzTUeJXF`TR}Pmd$|si_Q5jp2#LI%nd+O5t|qPoEUo9SpLDt>cZ6nzeGat6q<-Y&`q@Rhmwk@k}N^{tty~4*TtZhXJ61k&{?J+e{glWUAoc z<;OblrN=kHa7SO$eZ@W~u0OLemC#A9(Q%vz8!bB;NE-2!0KAFPo7aB(R|f?J-GG;_ zrpN|8r%U+_=dL`S860fH0uD+X7}A)22p`Wb_zNAZ6M&%7BSY%!nQ>gy)=^z%Mn!B| z)bOvTPu#0jBZ@SAc|^4tQ8=TZ$O!kGp^(z2p|+$2^P=A(eH3^Fc>%mb$2w@p%H4S8 z43o35DTY!p=qb7|>8^RKl+em$)j?Q{R2-h^r>ogCEB8M4tUczt>&WMqTAH$4O$sst=91il4aW_|2yY1_fX1 z5@X$_$h~iGgl_FMyk*JXczJ0AuR2!SKjzA)QTGH^$9yoFN$}WDh&v|0ikDg=g)V>q{Ihub9K+(`LgDwd;?bz@l!lsVimH#MR~L1wNog;K5^ z&iY#;8WZ@RukpNO03*r0RK|lc79UGJQFoF~BcmCec6}2^LB8HPDDU2!^jore$S~B*k35!P3aRl=G62ug2#MZV$|l1UQB-* zF|y1%L(5&sM6r%N!*1SMNNSqf(MbU^x}z4NDf>Yre-5M7`UkO~#+lUf*@CbqGz-~a zHRRx{^~Q}4sba4sw6R&qC%r`a-5(d9OeeBeIw4Z-nbX!0140j}`t~zDe?oa$$?Es} zjq!knACKs>-@ip@s25p;zjybOAiT9}Gm8G)+FqORx{?x?lt~{{IL0iG7Pk2cGJ6ji zfFw-8Hc-92K#c&1sYpFep$;^^gE2KhJrJffc*}KTqW8Edd2uPGYNCpq$66IhrED;_ z|KJcfdiQ){E zniq4UZ{zEx>D8weKgWjbVk-=7)ee)Jb6Od6U( z`7d8!hy}#O21$egR0<8%HwFq1=u-5<97LcUNca2LOh-g`Icdsa4i^h@@M*qcA*=lH zU@evkCh-}R{_RONX`*r1!dziYi$=JhI>d+Nu!j=8!VacO^n#;KWjBKrjL!P-6J=3C zS)dra2TdW0UNDNqg3C`>i-kh_A1PCTjd(k3wXq=>_}C78fv)(v5=b;PDsz{}G)>?Z zT!oXCVUtuTtJR6Cp={aMY}v||O`?{Vd|VJSFkM_2j}8>)8`3LXNpx9sN%iUEL5hsj zH$*V-d>j~Q8G*Mvix2_m+36IVPf^sQv zCzPo83G}zzlJT2KNWw45Jnkc^tj$03(+bUQm=_!3eB3Ic>m5;84#!lU*8ha~KjG;Z zoG>@7aK7W|D2i2{n=QTT=~#r+_Mdv34+1w#s~--N#7aG@FxmCpdYq=0C-LO+AGF&& z(-V`k#FLueJ3nAty5e|=FDC7{ltYVId02RB4^V|hL??DF(T5Q>y<(gI*43fR{6uX~ zyNrVGAy$05LvaXzo!9}EZN{NwvF)mZQ(PMZd0h=!J*N`;!j#OoO#wcF`w(KWfZGMP z=&pPfAon2B$H#p1phh@=FP9=rR`r{Ae(djDnfR3ps;YS!%%Wsmt3*U{Bo#tp>jYBc zk~TJt+(NR9)jwxn!85SpBuKtjo|UHTgHWgxPGo+Cwm& zSa?>Ma*nuQ!De6CVoxOdG{20w9zLY_ygXwLtOc9WjDG(?Cfun&23yn$XkZ#Rm5CAL z4^W{iem91Z3WPx;PCuzxdb$H6>^@ngq~Ub~|CS>BK0fDqC}SEONvk7IYbuMamX!fA zt)#$K{YwF_FV0xK+|EsBLm>t)ZV^d`ra@XM(yS*c*P%es-o4rM2f$}a{1#{PZeQfs zYwRTdVYwjk1}*lBl4YYy4{>5P#_(c%_GepOYGI33HQPOItxS~Ny|-os8 zuw?b_Um#LgW3JU-8Zlx}*XalS%+M<#C5`4QX#6ZoY9M{2iM0R-(k zeM+K!D9d_8-(TNf`!~;g<&VQ@-@1B5uwaF}zM_OiN2YeplOxMR<&a`Vj9S_kI}y5c971V1~S38(-gb~=7tSc9Xrb#wB@x_rT2V*C_hkA45WayjzzQ_RXNY0MII~lqic|L8U9}HC)LP3N m>vg?64Hi1fgZSf;K=m(9Tecs3pAugmsQHui_9yG$VE+sF<|JGI literal 0 HcmV?d00001 -- 2.16.6