Unzipping of files ending with .gz 61/9161/1
author PatrikBuhr <patrik.buhr@est.tech>
Mon, 3 Oct 2022 15:47:35 +0000 (17:47 +0200)
committer PatrikBuhr <patrik.buhr@est.tech>
Mon, 3 Oct 2022 15:47:35 +0000 (17:47 +0200)
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: I7de2f36aa1dde24a428485d50d03c78f832a8e97

src/main/java/org/oran/dmaapadapter/repository/InfoType.java
src/main/java/org/oran/dmaapadapter/tasks/DataStore.java
src/main/java/org/oran/dmaapadapter/tasks/FileStore.java
src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java
src/main/java/org/oran/dmaapadapter/tasks/S3ObjectStore.java
src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java
src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
src/test/resources/pm_report.json.gz [new file with mode: 0644]

index 4f0bc8c..7b868ba 100644 (file)
@@ -52,10 +52,13 @@ public class InfoType {
 
     private String dataType;
 
-    @Getter
     @Builder.Default
     private boolean isJson = false;
 
+    public boolean isJson() {
+        return this.isJson || getDataType() == DataType.PM_DATA;
+    }
+
     public boolean isKafkaTopicDefined() {
         return StringUtils.hasLength(kafkaInputTopic);
     }
index 75291f0..2bb2753 100644 (file)
@@ -30,9 +30,9 @@ public interface DataStore {
 
     public Flux<String> listFiles(Bucket bucket, String prefix);
 
-    public Mono<String> readFile(Bucket bucket, String fileName);
+    public Mono<byte[]> readFile(Bucket bucket, String fileName);
 
-    public Mono<String> readFile(String bucket, String fileName);
+    public Mono<byte[]> readFile(String bucket, String fileName);
 
     public Mono<Boolean> createLock(String name);
 
index 41bacd9..f11cf11 100644 (file)
@@ -73,14 +73,14 @@ public class FileStore implements DataStore {
         return fullName.substring(applicationConfig.getPmFilesPath().length());
     }
 
-    public Mono<String> readFile(String bucket, String fileName) {
+    public Mono<byte[]> readFile(String bucket, String fileName) {
         return Mono.error(new ServiceException("readFile from bucket Not implemented", HttpStatus.CONFLICT));
     }
 
     @Override
-    public Mono<String> readFile(Bucket bucket, String fileName) {
+    public Mono<byte[]> readFile(Bucket bucket, String fileName) {
         try {
-            String contents = Files.readString(path(fileName));
+            byte[] contents = Files.readAllBytes(path(fileName));
             return Mono.just(contents);
         } catch (Exception e) {
             return Mono.error(e);
index e6facbc..bd62d4f 100644 (file)
 
 package org.oran.dmaapadapter.tasks;
 
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.nio.charset.Charset;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.OffsetDateTime;
 import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeFormatterBuilder;
+import java.util.zip.GZIPInputStream;
 
 import lombok.Getter;
 
@@ -225,6 +228,7 @@ public abstract class JobDataDistributor {
             if (ev.getObjectStoreBucket() != null) {
                 if (this.applConfig.isS3Enabled()) {
                     return fileStore.readFile(ev.getObjectStoreBucket(), ev.getFilename()) //
+                            .map(str -> unzip(str, ev.getFilename())) //
                             .map(str -> new DataFromTopic(data.key, str));
                 } else {
                     logger.error("S3 is not configured in application.yaml, ignoring: {}", data);
@@ -245,6 +249,21 @@ public abstract class JobDataDistributor {
         }
     }
 
+    private byte[] unzip(byte[] bytes, String fileName) {
+        if (!fileName.endsWith(".gz")) {
+            return bytes;
+        }
+
+        try (final GZIPInputStream gzipInput = new GZIPInputStream(new ByteArrayInputStream(bytes))) {
+
+            return gzipInput.readAllBytes();
+        } catch (IOException e) {
+            logger.error("Error while decompression, file: {}, reason: {}", fileName, e.getMessage());
+            return new byte[0];
+        }
+
+    }
+
     private String quoteNonJson(String str, Job job) {
         return job.getType().isJson() ? str : quote(str);
     }
index 0a2cf25..74ba49d 100644 (file)
@@ -21,7 +21,6 @@
 package org.oran.dmaapadapter.tasks;
 
 import java.net.URI;
-import java.nio.charset.Charset;
 import java.nio.file.Path;
 import java.time.Instant;
 import java.util.concurrent.CompletableFuture;
@@ -130,12 +129,12 @@ public class S3ObjectStore implements DataStore {
     }
 
     @Override
-    public Mono<String> readFile(Bucket bucket, String fileName) {
+    public Mono<byte[]> readFile(Bucket bucket, String fileName) {
         return getDataFromS3Object(bucket(bucket), fileName);
     }
 
     @Override
-    public Mono<String> readFile(String bucket, String fileName) {
+    public Mono<byte[]> readFile(String bucket, String fileName) {
         return getDataFromS3Object(bucket, fileName);
     }
 
@@ -242,7 +241,7 @@ public class S3ObjectStore implements DataStore {
         ;
     }
 
-    private Mono<String> getDataFromS3Object(String bucket, String key) {
+    private Mono<byte[]> getDataFromS3Object(String bucket, String key) {
 
         GetObjectRequest request = GetObjectRequest.builder() //
                 .bucket(bucket) //
@@ -253,7 +252,7 @@ public class S3ObjectStore implements DataStore {
                 s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes());
 
         return Mono.fromFuture(future) //
-                .map(b -> new String(b.asByteArray(), Charset.defaultCharset())) //
+                .map(b -> b.asByteArray()) //
                 .doOnError(t -> logger.error("Failed to get file from S3 {}", t.getMessage())) //
                 .doOnEach(n -> logger.debug("Read file from S3: {} {}", bucket, key)) //
                 .onErrorResume(t -> Mono.empty());
index e26915b..373020c 100644 (file)
@@ -20,6 +20,8 @@
 
 package org.oran.dmaapadapter.tasks;
 
+import java.nio.charset.StandardCharsets;
+
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
@@ -42,6 +44,12 @@ public interface TopicListener {
             this.key = key;
             this.value = value;
         }
+
+        public DataFromTopic(String key, byte[] value) {
+            this.key = key;
+            this.value = new String(value, StandardCharsets.UTF_8);
+        }
+
     }
 
     public Flux<DataFromTopic> getFlux();
index 668a327..37c1b7d 100644 (file)
@@ -493,14 +493,14 @@ class IntegrationWithKafka {
         Instant startTime = Instant.now();
 
         KafkaTopicListener.NewFileEvent event = KafkaTopicListener.NewFileEvent.builder() //
-                .filename("pm_report.json").objectStoreBucket(applicationConfig.getS3Bucket()) //
+                .filename("pm_report.json.gz").objectStoreBucket(applicationConfig.getS3Bucket()) //
                 .build();
 
         S3ObjectStore fileStore = new S3ObjectStore(applicationConfig);
 
         fileStore.createS3Bucket(DataStore.Bucket.FILES).block();
-        fileStore.copyFileToS3(DataStore.Bucket.FILES, Path.of("./src/test/resources/pm_report.json"), "pm_report.json")
-                .block();
+        fileStore.copyFileToS3(DataStore.Bucket.FILES, Path.of("./src/test/resources/pm_report.json.gz"),
+                "pm_report.json.gz").block();
 
         String eventAsString = gson.toJson(event);
 
diff --git a/src/test/resources/pm_report.json.gz b/src/test/resources/pm_report.json.gz
new file mode 100644 (file)
index 0000000..2d191d1
Binary files /dev/null and b/src/test/resources/pm_report.json.gz differ