From: PatrikBuhr Date: Mon, 3 Oct 2022 15:47:35 +0000 (+0200) Subject: Unzipping of files with ending .gz X-Git-Tag: 1.2.0~13 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F61%2F9161%2F1;p=nonrtric%2Fplt%2Fdmaapadapter.git Unzipping of files with ending .gz Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-773 Change-Id: I7de2f36aa1dde24a428485d50d03c78f832a8e97 --- diff --git a/src/main/java/org/oran/dmaapadapter/repository/InfoType.java b/src/main/java/org/oran/dmaapadapter/repository/InfoType.java index 4f0bc8c..7b868ba 100644 --- a/src/main/java/org/oran/dmaapadapter/repository/InfoType.java +++ b/src/main/java/org/oran/dmaapadapter/repository/InfoType.java @@ -52,10 +52,13 @@ public class InfoType { private String dataType; - @Getter @Builder.Default private boolean isJson = false; + public boolean isJson() { + return this.isJson || getDataType() == DataType.PM_DATA; + } + public boolean isKafkaTopicDefined() { return StringUtils.hasLength(kafkaInputTopic); } diff --git a/src/main/java/org/oran/dmaapadapter/tasks/DataStore.java b/src/main/java/org/oran/dmaapadapter/tasks/DataStore.java index 75291f0..2bb2753 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/DataStore.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/DataStore.java @@ -30,9 +30,9 @@ public interface DataStore { public Flux listFiles(Bucket bucket, String prefix); - public Mono readFile(Bucket bucket, String fileName); + public Mono readFile(Bucket bucket, String fileName); - public Mono readFile(String bucket, String fileName); + public Mono readFile(String bucket, String fileName); public Mono createLock(String name); diff --git a/src/main/java/org/oran/dmaapadapter/tasks/FileStore.java b/src/main/java/org/oran/dmaapadapter/tasks/FileStore.java index 41bacd9..f11cf11 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/FileStore.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/FileStore.java @@ -73,14 +73,14 @@ public class FileStore implements DataStore { return fullName.substring(applicationConfig.getPmFilesPath().length()); } - public Mono readFile(String bucket, String fileName) { + public Mono readFile(String bucket, String fileName) { return Mono.error(new ServiceException("readFile from bucket Not implemented", HttpStatus.CONFLICT)); } @Override - public Mono readFile(Bucket bucket, String fileName) { + public Mono readFile(Bucket bucket, String fileName) { try { - String contents = Files.readString(path(fileName)); + byte[] contents = Files.readAllBytes(path(fileName)); return Mono.just(contents); } catch (Exception e) { return Mono.error(e); diff --git a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java index e6facbc..bd62d4f 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java @@ -20,12 +20,15 @@ package org.oran.dmaapadapter.tasks; +import java.io.ByteArrayInputStream; +import java.io.IOException; import java.nio.charset.Charset; import java.nio.file.Files; import java.nio.file.Path; import java.time.OffsetDateTime; import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatterBuilder; +import java.util.zip.GZIPInputStream; import lombok.Getter; @@ -225,6 +228,7 @@ public abstract class JobDataDistributor { if (ev.getObjectStoreBucket() != null) { if (this.applConfig.isS3Enabled()) { return fileStore.readFile(ev.getObjectStoreBucket(), ev.getFilename()) // + .map(str -> unzip(str, ev.getFilename())) // .map(str -> new DataFromTopic(data.key, str)); } else { logger.error("S3 is not configured in application.yaml, ignoring: {}", data); @@ -245,6 +249,21 @@ public abstract class JobDataDistributor { } } + private byte[] unzip(byte[] bytes, String fileName) { + if (!fileName.endsWith(".gz")) { + return bytes; + } + + try (final GZIPInputStream gzipInput = new GZIPInputStream(new ByteArrayInputStream(bytes))) { + + return gzipInput.readAllBytes(); + } catch (IOException e) { + logger.error("Error while decompression, file: {}, reason: {}", fileName, e.getMessage()); + return new byte[0]; + } + + } + private String quoteNonJson(String str, Job job) { return job.getType().isJson() ? str : quote(str); } diff --git a/src/main/java/org/oran/dmaapadapter/tasks/S3ObjectStore.java b/src/main/java/org/oran/dmaapadapter/tasks/S3ObjectStore.java index 0a2cf25..74ba49d 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/S3ObjectStore.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/S3ObjectStore.java @@ -21,7 +21,6 @@ package org.oran.dmaapadapter.tasks; import java.net.URI; -import java.nio.charset.Charset; import java.nio.file.Path; import java.time.Instant; import java.util.concurrent.CompletableFuture; @@ -130,12 +129,12 @@ public class S3ObjectStore implements DataStore { } @Override - public Mono readFile(Bucket bucket, String fileName) { + public Mono readFile(Bucket bucket, String fileName) { return getDataFromS3Object(bucket(bucket), fileName); } @Override - public Mono readFile(String bucket, String fileName) { + public Mono readFile(String bucket, String fileName) { return getDataFromS3Object(bucket, fileName); } @@ -242,7 +241,7 @@ public class S3ObjectStore implements DataStore { ; } - private Mono getDataFromS3Object(String bucket, String key) { + private Mono getDataFromS3Object(String bucket, String key) { GetObjectRequest request = GetObjectRequest.builder() // .bucket(bucket) // @@ -253,7 +252,7 @@ public class S3ObjectStore implements DataStore { s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes()); return Mono.fromFuture(future) // - .map(b -> new String(b.asByteArray(), Charset.defaultCharset())) // + .map(b -> b.asByteArray()) // .doOnError(t -> logger.error("Failed to get file from S3 {}", t.getMessage())) // .doOnEach(n -> logger.debug("Read file from S3: {} {}", bucket, key)) // .onErrorResume(t -> Mono.empty()); diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java index e26915b..373020c 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java @@ -20,6 +20,8 @@ package org.oran.dmaapadapter.tasks; +import java.nio.charset.StandardCharsets; + import lombok.Getter; import lombok.Setter; import lombok.ToString; @@ -42,6 +44,12 @@ public interface TopicListener { this.key = key; this.value = value; } + + public DataFromTopic(String key, byte[] value) { + this.key = key; + this.value = new String(value, StandardCharsets.UTF_8); + } + } public Flux getFlux(); diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java index 668a327..37c1b7d 100644 --- a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java +++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java @@ -493,14 +493,14 @@ class IntegrationWithKafka { Instant startTime = Instant.now(); KafkaTopicListener.NewFileEvent event = KafkaTopicListener.NewFileEvent.builder() // - .filename("pm_report.json").objectStoreBucket(applicationConfig.getS3Bucket()) // + .filename("pm_report.json.gz").objectStoreBucket(applicationConfig.getS3Bucket()) // .build(); S3ObjectStore fileStore = new S3ObjectStore(applicationConfig); fileStore.createS3Bucket(DataStore.Bucket.FILES).block(); - fileStore.copyFileToS3(DataStore.Bucket.FILES, Path.of("./src/test/resources/pm_report.json"), "pm_report.json") - .block(); + fileStore.copyFileToS3(DataStore.Bucket.FILES, Path.of("./src/test/resources/pm_report.json.gz"), + "pm_report.json.gz").block(); String eventAsString = gson.toJson(event); diff --git a/src/test/resources/pm_report.json.gz b/src/test/resources/pm_report.json.gz new file mode 100644 index 0000000..2d191d1 Binary files /dev/null and b/src/test/resources/pm_report.json.gz differ