Release oam-ves-adopter Contatiner
[oam/nf-oam-adopter.git] / ves-nf-oam-adopter / ves-nf-oam-adopter-pm-manager / src / main / java / org / o / ran / oam / nf / oam / adopter / pm / rest / manager / mapper / PerformanceManagementFile2VesMapper.java
1 /*
2  *  ============LICENSE_START=======================================================
3  *  O-RAN-SC
4  *  ================================================================================
5  *  Copyright © 2021 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  */
19
20 package org.o.ran.oam.nf.oam.adopter.pm.rest.manager.mapper;
21
22 import com.fasterxml.jackson.dataformat.csv.CsvMapper;
23 import com.fasterxml.jackson.dataformat.csv.CsvSchema;
24 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
25 import io.reactivex.rxjava3.core.Single;
26 import java.io.BufferedReader;
27 import java.io.ByteArrayInputStream;
28 import java.io.ByteArrayOutputStream;
29 import java.io.IOException;
30 import java.io.InputStreamReader;
31 import java.nio.charset.StandardCharsets;
32 import java.util.ArrayList;
33 import java.util.Iterator;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.concurrent.atomic.AtomicInteger;
37 import java.util.zip.ZipEntry;
38 import java.util.zip.ZipInputStream;
39 import org.o.ran.oam.nf.oam.adopter.api.CommonEventFormat302ONAP;
40 import org.o.ran.oam.nf.oam.adopter.api.Event;
41 import org.o.ran.oam.nf.oam.adopter.pm.rest.manager.PerformanceManagementMapperConfigProvider;
42 import org.o.ran.oam.nf.oam.adopter.pm.rest.manager.exceptions.PerformanceManagementException;
43 import org.o.ran.oam.nf.oam.adopter.pm.rest.manager.pojos.CsvConfiguration;
44 import org.o.ran.oam.nf.oam.adopter.pm.rest.manager.pojos.VesMappingConfiguration;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47 import org.springframework.beans.factory.annotation.Autowired;
48 import org.springframework.stereotype.Service;
49
50 @Service
51 public class PerformanceManagementFile2VesMapper {
52     private static final Logger LOG = LoggerFactory.getLogger(PerformanceManagementFile2VesMapper.class);
53
54     private static final String CSV_EXTENSION = ".csv";
55     private static final CsvSchema schema = CsvSchema.emptySchema().withHeader();
56     private final PerformanceManagementMapperConfigProvider pmConfigProvider;
57     private static final int THRESHOLD_SIZE  = 1000000000; // 1 GB
58     private static final double THRESHOLD_RATIO = 40;
59     private static final int THRESHOLD_ENTRIES = 10000;
60     private static final int READ_BUFFER_SIZE = 2048;
61     private final CsvMapper mapper;
62
63     /**
64      * Default constructor.
65      */
66     @Autowired
67     public PerformanceManagementFile2VesMapper(final PerformanceManagementMapperConfigProvider pmConfigProvider) {
68         this.pmConfigProvider = pmConfigProvider;
69         this.mapper = new CsvMapper();
70     }
71
72     /**
73      * Translate CSV in ZipInputStream format to list of CommonEventFormat302ONAP events.
74      *
75      * @param zipInputStream csv
76      * @param hostIp source Ip Address
77      * @return CommonEventFormat302ONAP events
78      */
79     @SuppressFBWarnings("REC_CATCH_EXCEPTION")
80     public Single<List<CommonEventFormat302ONAP>> map(final ZipInputStream zipInputStream, final String hostIp) {
81         LOG.info("Converting ZIP files to VES Message started");
82         final List<CommonEventFormat302ONAP> listOfNotifications = new ArrayList<>();
83
84         try {
85             var totalEntryArchive = 0;
86             final var totalSizeEntry = new AtomicInteger();
87
88             ZipEntry entry;
89             final List<List<Event>> mappedEvents = new ArrayList<>();
90             while ((entry = zipInputStream.getNextEntry()) != null) {
91                 final String entryName = entry.getName();
92                 if (!entryName.endsWith(CSV_EXTENSION)) {
93                     throw new PerformanceManagementException("Wrong file type :" + entryName);
94                 }
95
96                 totalEntryArchive++;
97                 if (totalEntryArchive > THRESHOLD_ENTRIES) {
98                     throw new PerformanceManagementException("Too many files: " + totalSizeEntry);
99                 }
100
101                 final BufferedReader reader = extract(zipInputStream, totalSizeEntry, entry.getCompressedSize());
102                 final Iterator<Map<String, String>> iterator =
103                         mapper.readerFor(Map.class).with(schema).readValues(reader);
104                 final var mappingConfiguration = pmConfigProvider.getVesMappingConfiguration();
105                 mappedEvents.addAll(toEvent(mappingConfiguration, hostIp, iterator));
106             }
107
108             mappedEvents.forEach(mapped -> {
109                 final var eventFormat = new CommonEventFormat302ONAP();
110                 eventFormat.setEventList(mapped);
111                 listOfNotifications.add(eventFormat);
112             });
113         } catch (final Exception e) {
114             return Single.error(new Exception("Failed to process file", e));
115         } finally {
116             try {
117                 zipInputStream.closeEntry();
118             } catch (final IOException e) {
119                 LOG.warn("Failed to close zip stream", e);
120             }
121         }
122         LOG.info("Converting ZIP files to VES Message finished");
123         return Single.just(listOfNotifications);
124     }
125
126     private BufferedReader extract(final ZipInputStream zis, final AtomicInteger totalSizeEntry,
127             final long compressedSize) throws PerformanceManagementException, IOException {
128         final var out = new ByteArrayOutputStream();
129         final var buffer = new byte[READ_BUFFER_SIZE];
130         int len;
131
132         while (zis.available() > 0) {
133             len = zis.read(buffer);
134             final int currentSize = totalSizeEntry.addAndGet(len);
135
136             if (currentSize > THRESHOLD_SIZE) {
137                 throw new PerformanceManagementException("ZIP file too big.");
138             }
139
140             final double compressionRatio = (double) currentSize / compressedSize;
141             if (compressionRatio > THRESHOLD_RATIO) {
142                 throw new PerformanceManagementException("Wrong file type, threshold to high " + compressionRatio);
143             }
144
145             if (len > 0) {
146                 out.write(buffer, 0, len);
147             }
148         }
149
150         return new BufferedReader(
151                 new InputStreamReader(new ByteArrayInputStream(out.toByteArray()), StandardCharsets.UTF_8));
152     }
153
154     private static List<List<Event>> toEvent(final VesMappingConfiguration mappingConfiguration, final String hostIp,
155             final Iterator<Map<String, String>> iterator) {
156         final List<List<Event>> globalList = new ArrayList<>();
157         final int batchSize = mappingConfiguration.getBatchSize();
158         var sequence = 0;
159         List<Event> events = new ArrayList<>();
160         final CsvConfiguration csv = mappingConfiguration.getCsv();
161         while (iterator.hasNext()) {
162             final var event = new Event();
163             final Map<String, String> recordMap = iterator.next();
164             event.setCommonEventHeader(CommonEventHeaderHandler.toCommonEventHeader(mappingConfiguration, hostIp, csv,
165                 recordMap,  sequence));
166             event.setMeasurementFields(MeasurementFieldsHandler.toMeasurementFields(mappingConfiguration, recordMap));
167             events.add(event);
168             sequence++;
169             if (sequence % batchSize == 0) {
170                 globalList.add(events);
171                 events = new ArrayList<>();
172             }
173         }
174         if (!events.isEmpty()) {
175             globalList.add(events);
176         }
177         return globalList;
178     }
179 }