private String dataType;
- @Getter
@Builder.Default
private boolean isJson = false;
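+ // PM data is always JSON; other types depend on the configured isJson flag.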
+ public boolean isJson() {
+ return this.isJson || getDataType() == DataType.PM_DATA;
+ }
+
public boolean isKafkaTopicDefined() {
return StringUtils.hasLength(kafkaInputTopic);
}
public Flux<String> listFiles(Bucket bucket, String prefix);
- public Mono<String> readFile(Bucket bucket, String fileName);
+ public Mono<byte[]> readFile(Bucket bucket, String fileName);
- public Mono<String> readFile(String bucket, String fileName);
+ public Mono<byte[]> readFile(String bucket, String fileName);
public Mono<Boolean> createLock(String name);
return fullName.substring(applicationConfig.getPmFilesPath().length());
}
- public Mono<String> readFile(String bucket, String fileName) {
+ public Mono<byte[]> readFile(String bucket, String fileName) {
return Mono.error(new ServiceException("readFile from bucket Not implemented", HttpStatus.CONFLICT));
}
@Override
- public Mono<String> readFile(Bucket bucket, String fileName) {
+ public Mono<byte[]> readFile(Bucket bucket, String fileName) {
try {
- String contents = Files.readString(path(fileName));
+ byte[] contents = Files.readAllBytes(path(fileName));
return Mono.just(contents);
} catch (Exception e) {
return Mono.error(e);
package org.oran.dmaapadapter.tasks;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
+import java.util.zip.GZIPInputStream;
import lombok.Getter;
if (ev.getObjectStoreBucket() != null) {
if (this.applConfig.isS3Enabled()) {
return fileStore.readFile(ev.getObjectStoreBucket(), ev.getFilename()) //
+ .map(bytes -> unzip(bytes, ev.getFilename())) //
.map(str -> new DataFromTopic(data.key, str));
} else {
logger.error("S3 is not configured in application.yaml, ignoring: {}", data);
}
}
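+ // Decompresses gzip'ed file content; files whose names do not end in ".gz" are returned unchanged.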
+ private byte[] unzip(byte[] bytes, String fileName) {
+ if (!fileName.endsWith(".gz")) {
+ return bytes;
+ }
+
+ try (final GZIPInputStream gzipInput = new GZIPInputStream(new ByteArrayInputStream(bytes))) {
+ return gzipInput.readAllBytes();
+ } catch (IOException e) {
+ logger.error("Error while decompressing file: {}, reason: {}", fileName, e.getMessage());
+ return new byte[0];
+ }
+ }
+
private String quoteNonJson(String str, Job job) {
return job.getType().isJson() ? str : quote(str);
}
package org.oran.dmaapadapter.tasks;
import java.net.URI;
-import java.nio.charset.Charset;
import java.nio.file.Path;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
}
@Override
- public Mono<String> readFile(Bucket bucket, String fileName) {
+ public Mono<byte[]> readFile(Bucket bucket, String fileName) {
return getDataFromS3Object(bucket(bucket), fileName);
}
@Override
- public Mono<String> readFile(String bucket, String fileName) {
+ public Mono<byte[]> readFile(String bucket, String fileName) {
return getDataFromS3Object(bucket, fileName);
}
;
}
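+ // Reads the whole S3 object into memory and returns its raw bytes.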
- private Mono<String> getDataFromS3Object(String bucket, String key) {
+ private Mono<byte[]> getDataFromS3Object(String bucket, String key) {
GetObjectRequest request = GetObjectRequest.builder() //
.bucket(bucket) //
s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes());
return Mono.fromFuture(future) //
- .map(b -> new String(b.asByteArray(), Charset.defaultCharset())) //
+ .map(b -> b.asByteArray()) //
.doOnError(t -> logger.error("Failed to get file from S3 {}", t.getMessage())) //
.doOnEach(n -> logger.debug("Read file from S3: {} {}", bucket, key)) //
.onErrorResume(t -> Mono.empty());
package org.oran.dmaapadapter.tasks;
+import java.nio.charset.StandardCharsets;
+
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
this.key = key;
this.value = value;
}
+
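+ // Creates an entry from raw bytes, assuming UTF-8 encoded text content.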
+ public DataFromTopic(String key, byte[] value) {
+ this.key = key;
+ this.value = new String(value, StandardCharsets.UTF_8);
+ }
+
}
public Flux<DataFromTopic> getFlux();
Instant startTime = Instant.now();
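+ // Use a gzip-compressed report so the new decompression path is exercised.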
KafkaTopicListener.NewFileEvent event = KafkaTopicListener.NewFileEvent.builder() //
- .filename("pm_report.json").objectStoreBucket(applicationConfig.getS3Bucket()) //
+ .filename("pm_report.json.gz").objectStoreBucket(applicationConfig.getS3Bucket()) //
.build();
S3ObjectStore fileStore = new S3ObjectStore(applicationConfig);
fileStore.createS3Bucket(DataStore.Bucket.FILES).block();
- fileStore.copyFileToS3(DataStore.Bucket.FILES, Path.of("./src/test/resources/pm_report.json"), "pm_report.json")
- .block();
+ fileStore.copyFileToS3(DataStore.Bucket.FILES, Path.of("./src/test/resources/pm_report.json.gz"),
+ "pm_report.json.gz").block();
String eventAsString = gson.toJson(event);