package org.o.ran.oam.nf.oam.adopter.pm.rest.manager.mapper;
-import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.reactivex.rxjava3.core.Single;
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import org.o.ran.oam.nf.oam.adopter.api.CommonEventFormat302ONAP;
import org.o.ran.oam.nf.oam.adopter.api.Event;
import org.o.ran.oam.nf.oam.adopter.pm.rest.manager.PerformanceManagementMapperConfigProvider;
+import org.o.ran.oam.nf.oam.adopter.pm.rest.manager.exceptions.PerformanceManagementException;
import org.o.ran.oam.nf.oam.adopter.pm.rest.manager.pojos.CsvConfiguration;
import org.o.ran.oam.nf.oam.adopter.pm.rest.manager.pojos.VesMappingConfiguration;
import org.slf4j.Logger;
private static final Logger LOG = LoggerFactory.getLogger(PerformanceManagementFile2VesMapper.class);
private static final String CSV_EXTENSION = ".csv";
+ private static final CsvSchema schema = CsvSchema.emptySchema().withHeader();
private final PerformanceManagementMapperConfigProvider pmConfigProvider;
private static final int THRESHOLD_SIZE = 1000000000; // 1 GB
private static final double THRESHOLD_RATIO = 40;
private static final int THRESHOLD_ENTRIES = 10000;
+ private static final int READ_BUFFER_SIZE = 2048;
+ private final CsvMapper mapper;
+ /**
+ * Default constructor.
+ */
@Autowired
public PerformanceManagementFile2VesMapper(final PerformanceManagementMapperConfigProvider pmConfigProvider) {
this.pmConfigProvider = pmConfigProvider;
+ this.mapper = new CsvMapper();
}
/**
public Single<List<CommonEventFormat302ONAP>> map(final ZipInputStream zipInputStream, final String hostIp) {
LOG.info("Converting ZIP files to VES Message started");
final List<CommonEventFormat302ONAP> listOfNotifications = new ArrayList<>();
- final CsvSchema schema = CsvSchema.emptySchema().withHeader();
- final var mapper = new CsvMapper();
- mapper.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false);
+
try {
- ZipEntry entry;
- final var mappingConfiguration = pmConfigProvider.getVesMappingConfiguration();
- var totalSizeEntry = 0;
var totalEntryArchive = 0;
- while ((entry = zipInputStream.getNextEntry()) != null) {
- final var size = entry.getSize();
- totalEntryArchive++;
- totalSizeEntry += size;
- if (totalSizeEntry > THRESHOLD_SIZE || size == -1) {
- throw new IllegalStateException("File to be unzipped too big.");
- }
+ final var totalSizeEntry = new AtomicInteger();
- final double compressionRatio = (double) totalSizeEntry / entry.getCompressedSize();
- if (compressionRatio > THRESHOLD_RATIO) {
- return Single.error(new Exception("Wrong file type, threshold to high " + compressionRatio));
+ ZipEntry entry;
+ final List<List<Event>> mappedEvents = new ArrayList<>();
+ while ((entry = zipInputStream.getNextEntry()) != null) {
+ final String entryName = entry.getName();
+ if (!entryName.endsWith(CSV_EXTENSION)) {
+ throw new PerformanceManagementException("Wrong file type :" + entryName);
}
+ totalEntryArchive++;
if (totalEntryArchive > THRESHOLD_ENTRIES) {
- // too much entries in this archive, can lead to inodes exhaustion of the system
- return Single.error(new Exception("Too many files"));
- }
-
- final String entryName = entry.getName();
- if (!entryName.endsWith(CSV_EXTENSION)) {
- return Single.error(new Exception("Wrong file type :" + entryName));
+ throw new PerformanceManagementException("Too many files: " + totalSizeEntry);
}
+ final BufferedReader reader = extract(zipInputStream, totalSizeEntry, entry.getCompressedSize());
final Iterator<Map<String, String>> iterator =
- mapper.readerFor(Map.class).with(schema).readValues(zipInputStream);
- final List<List<Event>> mappedEvents = toEvent(mappingConfiguration, hostIp, iterator);
-
- mappedEvents.forEach(mapped -> {
- final var eventFormat = new CommonEventFormat302ONAP();
- eventFormat.setEventList(mapped);
- listOfNotifications.add(eventFormat);
- });
+ mapper.readerFor(Map.class).with(schema).readValues(reader);
+ final var mappingConfiguration = pmConfigProvider.getVesMappingConfiguration();
+ mappedEvents.addAll(toEvent(mappingConfiguration, hostIp, iterator));
}
+
+ mappedEvents.forEach(mapped -> {
+ final var eventFormat = new CommonEventFormat302ONAP();
+ eventFormat.setEventList(mapped);
+ listOfNotifications.add(eventFormat);
+ });
} catch (final Exception e) {
return Single.error(new Exception("Failed to process file", e));
} finally {
return Single.just(listOfNotifications);
}
+ private BufferedReader extract(final ZipInputStream zis, final AtomicInteger totalSizeEntry,
+ final long compressedSize) throws PerformanceManagementException, IOException {
+ final var out = new ByteArrayOutputStream();
+ final var buffer = new byte[READ_BUFFER_SIZE];
+ int len;
+
+ while (zis.available() > 0) {
+ len = zis.read(buffer);
+ final int currentSize = totalSizeEntry.addAndGet(len);
+
+ if (currentSize > THRESHOLD_SIZE) {
+ throw new PerformanceManagementException("ZIP file too big.");
+ }
+
+ final double compressionRatio = (double) currentSize / compressedSize;
+ if (compressionRatio > THRESHOLD_RATIO) {
+ throw new PerformanceManagementException("Wrong file type, threshold to high " + compressionRatio);
+ }
+
+ if (len > 0) {
+ out.write(buffer, 0, len);
+ }
+ }
+
+ return new BufferedReader(
+ new InputStreamReader(new ByteArrayInputStream(out.toByteArray()), StandardCharsets.UTF_8));
+ }
+
private static List<List<Event>> toEvent(final VesMappingConfiguration mappingConfiguration, final String hostIp,
final Iterator<Map<String, String>> iterator) {
final List<List<Event>> globalList = new ArrayList<>();