From 4078a1da9063d7e1307f66586fca361416b9d613 Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Mon, 3 Oct 2022 17:47:35 +0200 Subject: [PATCH] Unzipping of files with ending .gz Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-773 Change-Id: I7de2f36aa1dde24a428485d50d03c78f832a8e97 --- .../org/oran/dmaapadapter/repository/InfoType.java | 5 ++++- .../java/org/oran/dmaapadapter/tasks/DataStore.java | 4 ++-- .../java/org/oran/dmaapadapter/tasks/FileStore.java | 6 +++--- .../oran/dmaapadapter/tasks/JobDataDistributor.java | 19 +++++++++++++++++++ .../org/oran/dmaapadapter/tasks/S3ObjectStore.java | 9 ++++----- .../org/oran/dmaapadapter/tasks/TopicListener.java | 8 ++++++++ .../org/oran/dmaapadapter/IntegrationWithKafka.java | 6 +++--- src/test/resources/pm_report.json.gz | Bin 0 -> 937 bytes 8 files changed, 43 insertions(+), 14 deletions(-) create mode 100644 src/test/resources/pm_report.json.gz diff --git a/src/main/java/org/oran/dmaapadapter/repository/InfoType.java b/src/main/java/org/oran/dmaapadapter/repository/InfoType.java index 4f0bc8c..7b868ba 100644 --- a/src/main/java/org/oran/dmaapadapter/repository/InfoType.java +++ b/src/main/java/org/oran/dmaapadapter/repository/InfoType.java @@ -52,10 +52,13 @@ public class InfoType { private String dataType; - @Getter @Builder.Default private boolean isJson = false; + public boolean isJson() { + return this.isJson || getDataType() == DataType.PM_DATA; + } + public boolean isKafkaTopicDefined() { return StringUtils.hasLength(kafkaInputTopic); } diff --git a/src/main/java/org/oran/dmaapadapter/tasks/DataStore.java b/src/main/java/org/oran/dmaapadapter/tasks/DataStore.java index 75291f0..2bb2753 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/DataStore.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/DataStore.java @@ -30,9 +30,9 @@ public interface DataStore { public Flux listFiles(Bucket bucket, String prefix); - public Mono readFile(Bucket bucket, String fileName); + public Mono readFile(Bucket bucket, String fileName); - public Mono readFile(String bucket, String fileName); + public Mono readFile(String bucket, String fileName); public Mono createLock(String name); diff --git a/src/main/java/org/oran/dmaapadapter/tasks/FileStore.java b/src/main/java/org/oran/dmaapadapter/tasks/FileStore.java index 41bacd9..f11cf11 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/FileStore.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/FileStore.java @@ -73,14 +73,14 @@ public class FileStore implements DataStore { return fullName.substring(applicationConfig.getPmFilesPath().length()); } - public Mono readFile(String bucket, String fileName) { + public Mono readFile(String bucket, String fileName) { return Mono.error(new ServiceException("readFile from bucket Not implemented", HttpStatus.CONFLICT)); } @Override - public Mono readFile(Bucket bucket, String fileName) { + public Mono readFile(Bucket bucket, String fileName) { try { - String contents = Files.readString(path(fileName)); + byte[] contents = Files.readAllBytes(path(fileName)); return Mono.just(contents); } catch (Exception e) { return Mono.error(e); diff --git a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java index e6facbc..bd62d4f 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java @@ -20,12 +20,15 @@ package org.oran.dmaapadapter.tasks; +import java.io.ByteArrayInputStream; +import java.io.IOException; import java.nio.charset.Charset; import java.nio.file.Files; import java.nio.file.Path; import java.time.OffsetDateTime; import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatterBuilder; +import java.util.zip.GZIPInputStream; import lombok.Getter; @@ -225,6 +228,7 @@ public abstract class JobDataDistributor { if (ev.getObjectStoreBucket() != null) { if (this.applConfig.isS3Enabled()) { return fileStore.readFile(ev.getObjectStoreBucket(), ev.getFilename()) // + .map(str -> unzip(str, ev.getFilename())) // .map(str -> new DataFromTopic(data.key, str)); } else { logger.error("S3 is not configured in application.yaml, ignoring: {}", data); @@ -245,6 +249,21 @@ public abstract class JobDataDistributor { } } + private byte[] unzip(byte[] bytes, String fileName) { + if (!fileName.endsWith(".gz")) { + return bytes; + } + + try (final GZIPInputStream gzipInput = new GZIPInputStream(new ByteArrayInputStream(bytes))) { + + return gzipInput.readAllBytes(); + } catch (IOException e) { + logger.error("Error while decompression, file: {}, reason: {}", fileName, e.getMessage()); + return new byte[0]; + } + + } + private String quoteNonJson(String str, Job job) { return job.getType().isJson() ? str : quote(str); } diff --git a/src/main/java/org/oran/dmaapadapter/tasks/S3ObjectStore.java b/src/main/java/org/oran/dmaapadapter/tasks/S3ObjectStore.java index 0a2cf25..74ba49d 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/S3ObjectStore.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/S3ObjectStore.java @@ -21,7 +21,6 @@ package org.oran.dmaapadapter.tasks; import java.net.URI; -import java.nio.charset.Charset; import java.nio.file.Path; import java.time.Instant; import java.util.concurrent.CompletableFuture; @@ -130,12 +129,12 @@ public class S3ObjectStore implements DataStore { } @Override - public Mono readFile(Bucket bucket, String fileName) { + public Mono readFile(Bucket bucket, String fileName) { return getDataFromS3Object(bucket(bucket), fileName); } @Override - public Mono readFile(String bucket, String fileName) { + public Mono readFile(String bucket, String fileName) { return getDataFromS3Object(bucket, fileName); } @@ -242,7 +241,7 @@ public class S3ObjectStore implements DataStore { ; } - private Mono getDataFromS3Object(String bucket, String key) { + private Mono getDataFromS3Object(String bucket, String key) { GetObjectRequest request = GetObjectRequest.builder() // .bucket(bucket) // @@ -253,7 +252,7 @@ public class S3ObjectStore implements DataStore { s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes()); return Mono.fromFuture(future) // - .map(b -> new String(b.asByteArray(), Charset.defaultCharset())) // + .map(b -> b.asByteArray()) // .doOnError(t -> logger.error("Failed to get file from S3 {}", t.getMessage())) // .doOnEach(n -> logger.debug("Read file from S3: {} {}", bucket, key)) // .onErrorResume(t -> Mono.empty()); diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java index e26915b..373020c 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java @@ -20,6 +20,8 @@ package org.oran.dmaapadapter.tasks; +import java.nio.charset.StandardCharsets; + import lombok.Getter; import lombok.Setter; import lombok.ToString; @@ -42,6 +44,12 @@ public interface TopicListener { this.key = key; this.value = value; } + + public DataFromTopic(String key, byte[] value) { + this.key = key; + this.value = new String(value, StandardCharsets.UTF_8); + } + } public Flux getFlux(); diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java index 668a327..37c1b7d 100644 --- a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java +++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java @@ -493,14 +493,14 @@ class IntegrationWithKafka { Instant startTime = Instant.now(); KafkaTopicListener.NewFileEvent event = KafkaTopicListener.NewFileEvent.builder() // - .filename("pm_report.json").objectStoreBucket(applicationConfig.getS3Bucket()) // + .filename("pm_report.json.gz").objectStoreBucket(applicationConfig.getS3Bucket()) // .build(); S3ObjectStore fileStore = new S3ObjectStore(applicationConfig); fileStore.createS3Bucket(DataStore.Bucket.FILES).block(); - fileStore.copyFileToS3(DataStore.Bucket.FILES, Path.of("./src/test/resources/pm_report.json"), "pm_report.json") - .block(); + fileStore.copyFileToS3(DataStore.Bucket.FILES, Path.of("./src/test/resources/pm_report.json.gz"), + "pm_report.json.gz").block(); String eventAsString = gson.toJson(event); diff --git a/src/test/resources/pm_report.json.gz b/src/test/resources/pm_report.json.gz new file mode 100644 index 0000000000000000000000000000000000000000..2d191d15fb29f08f39fec20c3e67ab065393d641 GIT binary patch literal 937 zcmV;a16KSWiwFp4oS|X>18{9$a%FIDa&#_gb8l_{?O021+c*%u=T{iIRw_ivdKK`Y z>p0s0wNvELLyMxI<&mqdM5-i}HXG!>FC|BtI=)ot5^tWno=?=mfrwg0jmiM`HH4ECUQ~BfJOE?%QU?vF72JJ3V8q#Miw%S z$nqUy>^PATgf2F0>^i{`MacK9hAxM1Igk*fvbke(fm5h^{k$eiW5(i;b4osE=^QXW z2c9RQX_MwGgi86$IKDCvvYLJwWR!`RtY<{T;-&&OFd2&}lfpu|n2f@tBtR9=|lTrihnidK+^A1lapuQ12ybKt-Iw z2TI^9iZ~SI)?S$AE7R2OF?SXCXL?U!NLGBS_Dx~UAKD5nmBl#50#9j@Kqxk-nxz7> zW|-s&E|U5VWOj>pCTMKFvXnDeZAx>+fmP)>Ura}rAkEYCI+kvLLWGSjAT zN+MbmP0Bwur1H7lUZ^XTylO)fAU?07G_3`8r*~Puyvcy~881Oyf&ao{AugwHE+GC} zW`=8RJ}awv82(GBG|@{ZB4*Ltf?yUM3Q344!|ye}CY$*fGAEx(=; ze%~z5NqCZzk}8w)6Zr;S3CWKsB*~=?e#ijc)g=Gh3B2pbr#CA*=}GC+ z87+GLtA1~{^j+QebW3-2&(j0>>FBntTL+JENDCbwd=wkfLHh>7r`AAKX_?M|mgzpA zW%eB8+goPWfA{~F76UhNZ)<3;wCsaVPeZz;JyxnQI<8X3FwgIqyp=qLJ)%lPilro1p3jwqQbV}o_8iK8_mzdE5>-6Er;!|H;y&sSSAz2U?&$6x L-;tlE&>sK*VDiiR literal 0 HcmV?d00001 -- 2.16.6