X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=ves-nf-oam-adopter%2Fves-nf-oam-adopter-pm-manager%2Fsrc%2Fmain%2Fjava%2Forg%2Fo%2Fran%2Foam%2Fnf%2Foam%2Fadopter%2Fpm%2Frest%2Fmanager%2Fmapper%2FPerformanceManagementFile2VesMapper.java;h=8c0c5bb8399f4c8840db2519a77056d83e7b0f49;hb=HEAD;hp=6036a93c7e9215f791088801c50e343bd087bc4d;hpb=478bd59ad277738b9788b73230168bdd183bcba7;p=oam%2Fnf-oam-adopter.git diff --git a/ves-nf-oam-adopter/ves-nf-oam-adopter-pm-manager/src/main/java/org/o/ran/oam/nf/oam/adopter/pm/rest/manager/mapper/PerformanceManagementFile2VesMapper.java b/ves-nf-oam-adopter/ves-nf-oam-adopter-pm-manager/src/main/java/org/o/ran/oam/nf/oam/adopter/pm/rest/manager/mapper/PerformanceManagementFile2VesMapper.java index 6036a93..8c0c5bb 100644 --- a/ves-nf-oam-adopter/ves-nf-oam-adopter-pm-manager/src/main/java/org/o/ran/oam/nf/oam/adopter/pm/rest/manager/mapper/PerformanceManagementFile2VesMapper.java +++ b/ves-nf-oam-adopter/ves-nf-oam-adopter-pm-manager/src/main/java/org/o/ran/oam/nf/oam/adopter/pm/rest/manager/mapper/PerformanceManagementFile2VesMapper.java @@ -19,21 +19,27 @@ package org.o.ran.oam.nf.oam.adopter.pm.rest.manager.mapper; -import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.dataformat.csv.CsvMapper; import com.fasterxml.jackson.dataformat.csv.CsvSchema; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.reactivex.rxjava3.core.Single; +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; import org.o.ran.oam.nf.oam.adopter.api.CommonEventFormat302ONAP; import org.o.ran.oam.nf.oam.adopter.api.Event; import org.o.ran.oam.nf.oam.adopter.pm.rest.manager.PerformanceManagementMapperConfigProvider; +import org.o.ran.oam.nf.oam.adopter.pm.rest.manager.exceptions.PerformanceManagementException; import org.o.ran.oam.nf.oam.adopter.pm.rest.manager.pojos.CsvConfiguration; import org.o.ran.oam.nf.oam.adopter.pm.rest.manager.pojos.VesMappingConfiguration; import org.slf4j.Logger; @@ -46,11 +52,21 @@ public class PerformanceManagementFile2VesMapper { private static final Logger LOG = LoggerFactory.getLogger(PerformanceManagementFile2VesMapper.class); private static final String CSV_EXTENSION = ".csv"; + private static final CsvSchema schema = CsvSchema.emptySchema().withHeader(); private final PerformanceManagementMapperConfigProvider pmConfigProvider; + private static final int THRESHOLD_SIZE = 1000000000; // 1 GB + private static final double THRESHOLD_RATIO = 40; + private static final int THRESHOLD_ENTRIES = 10000; + private static final int READ_BUFFER_SIZE = 2048; + private final CsvMapper mapper; + /** + * Default constructor. + */ @Autowired public PerformanceManagementFile2VesMapper(final PerformanceManagementMapperConfigProvider pmConfigProvider) { this.pmConfigProvider = pmConfigProvider; + this.mapper = new CsvMapper(); } /** @@ -64,28 +80,36 @@ public class PerformanceManagementFile2VesMapper { public Single> map(final ZipInputStream zipInputStream, final String hostIp) { LOG.info("Converting ZIP files to VES Message started"); final List listOfNotifications = new ArrayList<>(); - final CsvSchema schema = CsvSchema.emptySchema().withHeader(); - final CsvMapper mapper = new CsvMapper(); - mapper.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false); + try { + var totalEntryArchive = 0; + final var totalSizeEntry = new AtomicInteger(); + ZipEntry entry; - final VesMappingConfiguration mappingConfiguration = pmConfigProvider.getVesMappingConfiguration(); + final List> mappedEvents = new ArrayList<>(); while ((entry = zipInputStream.getNextEntry()) != null) { final String entryName = entry.getName(); if (!entryName.endsWith(CSV_EXTENSION)) { - return Single.error(new Exception("Wrong file type :" + entryName)); + throw new PerformanceManagementException("Wrong file type :" + entryName); } + totalEntryArchive++; + if (totalEntryArchive > THRESHOLD_ENTRIES) { + throw new PerformanceManagementException("Too many files: " + totalSizeEntry); + } + + final BufferedReader reader = extract(zipInputStream, totalSizeEntry, entry.getCompressedSize()); final Iterator> iterator = - mapper.readerFor(Map.class).with(schema).readValues(zipInputStream); - final List> mappedEvents = toEvent(mappingConfiguration, hostIp, iterator); - - mappedEvents.forEach(mapped -> { - final CommonEventFormat302ONAP eventFormat = new CommonEventFormat302ONAP(); - eventFormat.setEventList(mapped); - listOfNotifications.add(eventFormat); - }); + mapper.readerFor(Map.class).with(schema).readValues(reader); + final var mappingConfiguration = pmConfigProvider.getVesMappingConfiguration(); + mappedEvents.addAll(toEvent(mappingConfiguration, hostIp, iterator)); } + + mappedEvents.forEach(mapped -> { + final var eventFormat = new CommonEventFormat302ONAP(); + eventFormat.setEventList(mapped); + listOfNotifications.add(eventFormat); + }); } catch (final Exception e) { return Single.error(new Exception("Failed to process file", e)); } finally { @@ -99,19 +123,47 @@ public class PerformanceManagementFile2VesMapper { return Single.just(listOfNotifications); } + private BufferedReader extract(final ZipInputStream zis, final AtomicInteger totalSizeEntry, + final long compressedSize) throws PerformanceManagementException, IOException { + final var out = new ByteArrayOutputStream(); + final var buffer = new byte[READ_BUFFER_SIZE]; + int len; + + while (zis.available() > 0) { + len = zis.read(buffer); + final int currentSize = totalSizeEntry.addAndGet(len); + + if (currentSize > THRESHOLD_SIZE) { + throw new PerformanceManagementException("ZIP file too big."); + } + + final double compressionRatio = (double) currentSize / compressedSize; + if (compressionRatio > THRESHOLD_RATIO) { + throw new PerformanceManagementException("Wrong file type, threshold to high " + compressionRatio); + } + + if (len > 0) { + out.write(buffer, 0, len); + } + } + + return new BufferedReader( + new InputStreamReader(new ByteArrayInputStream(out.toByteArray()), StandardCharsets.UTF_8)); + } + private static List> toEvent(final VesMappingConfiguration mappingConfiguration, final String hostIp, final Iterator> iterator) { final List> globalList = new ArrayList<>(); final int batchSize = mappingConfiguration.getBatchSize(); - int sequence = 0; + var sequence = 0; List events = new ArrayList<>(); final CsvConfiguration csv = mappingConfiguration.getCsv(); while (iterator.hasNext()) { - final Event event = new Event(); - final Map record = iterator.next(); - event.setCommonEventHeader( - CommonEventHeaderHandler.toCommonEventHeader(mappingConfiguration, hostIp, csv, record, sequence)); - event.setMeasurementFields(MeasurementFieldsHandler.toMeasurementFields(mappingConfiguration, record)); + final var event = new Event(); + final Map recordMap = iterator.next(); + event.setCommonEventHeader(CommonEventHeaderHandler.toCommonEventHeader(mappingConfiguration, hostIp, csv, + recordMap, sequence)); + event.setMeasurementFields(MeasurementFieldsHandler.toMeasurementFields(mappingConfiguration, recordMap)); events.add(event); sequence++; if (sequence % batchSize == 0) {