19ca5b07a48841f92e5574eab40191fab8a97693
[oam/nf-oam-adopter.git] / ves-nf-oam-adopter / ves-nf-oam-adopter-pm-manager / src / main / java / org / o / ran / oam / nf / oam / adopter / pm / rest / manager / mapper / PerformanceManagementFile2VesMapper.java
1 /*
2  *  ============LICENSE_START=======================================================
3  *  O-RAN-SC
4  *  ================================================================================
5  *  Copyright © 2021 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  */
19
20 package org.o.ran.oam.nf.oam.adopter.pm.rest.manager.mapper;
21
22 import com.fasterxml.jackson.core.JsonParser;
23 import com.fasterxml.jackson.dataformat.csv.CsvMapper;
24 import com.fasterxml.jackson.dataformat.csv.CsvSchema;
25 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
26 import io.reactivex.rxjava3.core.Single;
27 import java.io.IOException;
28 import java.util.ArrayList;
29 import java.util.Iterator;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.zip.ZipEntry;
33 import java.util.zip.ZipInputStream;
34 import org.o.ran.oam.nf.oam.adopter.api.CommonEventFormat302ONAP;
35 import org.o.ran.oam.nf.oam.adopter.api.Event;
36 import org.o.ran.oam.nf.oam.adopter.pm.rest.manager.PerformanceManagementMapperConfigProvider;
37 import org.o.ran.oam.nf.oam.adopter.pm.rest.manager.pojos.CsvConfiguration;
38 import org.o.ran.oam.nf.oam.adopter.pm.rest.manager.pojos.VesMappingConfiguration;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41 import org.springframework.beans.factory.annotation.Autowired;
42 import org.springframework.stereotype.Service;
43
44 @Service
45 public class PerformanceManagementFile2VesMapper {
46     private static final Logger LOG = LoggerFactory.getLogger(PerformanceManagementFile2VesMapper.class);
47
48     private static final String CSV_EXTENSION = ".csv";
49     private final PerformanceManagementMapperConfigProvider pmConfigProvider;
50     private static final int THRESHOLD_SIZE  = 1000000000; // 1 GB
51     private static final double THRESHOLD_RATIO = 40;
52     private static final int THRESHOLD_ENTRIES = 10000;
53
54     @Autowired
55     public PerformanceManagementFile2VesMapper(final PerformanceManagementMapperConfigProvider pmConfigProvider) {
56         this.pmConfigProvider = pmConfigProvider;
57     }
58
59     /**
60      * Translate CSV in ZipInputStream format to list of CommonEventFormat302ONAP events.
61      *
62      * @param zipInputStream csv
63      * @param hostIp source Ip Address
64      * @return CommonEventFormat302ONAP events
65      */
66     @SuppressFBWarnings("REC_CATCH_EXCEPTION")
67     public Single<List<CommonEventFormat302ONAP>> map(final ZipInputStream zipInputStream, final String hostIp) {
68         LOG.info("Converting ZIP files to VES Message started");
69         final List<CommonEventFormat302ONAP> listOfNotifications = new ArrayList<>();
70         final CsvSchema schema = CsvSchema.emptySchema().withHeader();
71         final var mapper = new CsvMapper();
72         mapper.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false);
73         try {
74             ZipEntry entry;
75             final var mappingConfiguration = pmConfigProvider.getVesMappingConfiguration();
76             var totalSizeEntry = 0;
77             var totalEntryArchive = 0;
78             while ((entry = zipInputStream.getNextEntry()) != null) {
79                 final var size = entry.getSize();
80                 totalEntryArchive++;
81                 totalSizeEntry += size;
82                 if (totalSizeEntry > THRESHOLD_SIZE || size == -1) {
83                     throw new IllegalStateException("File to be unzipped too big.");
84                 }
85
86                 final double compressionRatio = (double) totalSizeEntry / entry.getCompressedSize();
87                 if (compressionRatio > THRESHOLD_RATIO) {
88                     return Single.error(new Exception("Wrong file type, threshold to high " + compressionRatio));
89                 }
90
91                 if (totalEntryArchive > THRESHOLD_ENTRIES) {
92                     // too much entries in this archive, can lead to inodes exhaustion of the system
93                     return Single.error(new Exception("Too many files"));
94                 }
95
96                 final String entryName = entry.getName();
97                 if (!entryName.endsWith(CSV_EXTENSION)) {
98                     return Single.error(new Exception("Wrong file type :" + entryName));
99                 }
100
101                 final Iterator<Map<String, String>> iterator =
102                         mapper.readerFor(Map.class).with(schema).readValues(zipInputStream);
103                 final List<List<Event>> mappedEvents = toEvent(mappingConfiguration, hostIp, iterator);
104
105                 mappedEvents.forEach(mapped -> {
106                     final var eventFormat = new CommonEventFormat302ONAP();
107                     eventFormat.setEventList(mapped);
108                     listOfNotifications.add(eventFormat);
109                 });
110             }
111         } catch (final Exception e) {
112             return Single.error(new Exception("Failed to process file", e));
113         } finally {
114             try {
115                 zipInputStream.closeEntry();
116             } catch (final IOException e) {
117                 LOG.warn("Failed to close zip stream", e);
118             }
119         }
120         LOG.info("Converting ZIP files to VES Message finished");
121         return Single.just(listOfNotifications);
122     }
123
124     private static List<List<Event>> toEvent(final VesMappingConfiguration mappingConfiguration, final String hostIp,
125             final Iterator<Map<String, String>> iterator) {
126         final List<List<Event>> globalList = new ArrayList<>();
127         final int batchSize = mappingConfiguration.getBatchSize();
128         var sequence = 0;
129         List<Event> events = new ArrayList<>();
130         final CsvConfiguration csv = mappingConfiguration.getCsv();
131         while (iterator.hasNext()) {
132             final var event = new Event();
133             final Map<String, String> recordMap = iterator.next();
134             event.setCommonEventHeader(CommonEventHeaderHandler.toCommonEventHeader(mappingConfiguration, hostIp, csv,
135                 recordMap,  sequence));
136             event.setMeasurementFields(MeasurementFieldsHandler.toMeasurementFields(mappingConfiguration, recordMap));
137             events.add(event);
138             sequence++;
139             if (sequence % batchSize == 0) {
140                 globalList.add(events);
141                 events = new ArrayList<>();
142             }
143         }
144         if (!events.isEmpty()) {
145             globalList.add(events);
146         }
147         return globalList;
148     }
149 }