nonrtric/plt/ranpm: pmproducer/src/main/java/org/oran/pmproducer/tasks/JobDataDistributor.java
/*-
 * ========================LICENSE_START=================================
 * O-RAN-SC
 * %%
 * Copyright (C) 2023 Nordix Foundation
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ========================LICENSE_END===================================
 */

package org.oran.pmproducer.tasks;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.util.HashMap;
import java.util.Map;
import java.util.zip.GZIPOutputStream;

import lombok.Getter;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.oran.pmproducer.configuration.ApplicationConfig;
import org.oran.pmproducer.datastore.DataStore;
import org.oran.pmproducer.filter.FilteredData;
import org.oran.pmproducer.filter.PmReportFilter;
import org.oran.pmproducer.repository.Job.Parameters.KafkaDeliveryInfo;
import org.oran.pmproducer.repository.Jobs.JobGroup;
import org.oran.pmproducer.tasks.TopicListener.DataFromTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;

/**
 * The class streams data from a multicast sink and distributes the data to the
 * job owner's Kafka topic.
 */
@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
public class JobDataDistributor {
    private static final Logger logger = LoggerFactory.getLogger(JobDataDistributor.class);

    @Getter
    private final JobGroup jobGroup;
    private Disposable subscription;
    private final ErrorStats errorStats = new ErrorStats();

    private final DataStore dataStore;
    private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
    private final ApplicationConfig applConfig;

    private KafkaSender<byte[], byte[]> sender;

    private class ErrorStats {
        @Getter
        private int consumerFaultCounter = 0;

        public void handleOkFromConsumer() {
            this.consumerFaultCounter = 0;
        }

        @SuppressWarnings("java:S1172")
        public void handleException(Throwable t) {
            ++this.consumerFaultCounter;
        }
    }

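    /**
     * Creates a distributor for one job group. Sets up the FILES and LOCKS
     * buckets in the data store and a Kafka sender configured for the job
     * group's delivery info.
     */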
    public JobDataDistributor(JobGroup jobGroup, ApplicationConfig applConfig) {
        this.applConfig = applConfig;
        this.jobGroup = jobGroup;
        this.dataStore = DataStore.create(applConfig);
        this.dataStore.create(DataStore.Bucket.FILES).subscribe();
        this.dataStore.create(DataStore.Bucket.LOCKS).subscribe();

        SenderOptions<byte[], byte[]> senderOptions = senderOptions(applConfig, jobGroup.getDeliveryInfo());
        this.sender = KafkaSender.create(senderOptions);
    }

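    /**
     * Starts distribution of the given input flux. Live data is filtered per job
     * group and forwarded to the job's Kafka topic. If the filter specifies a PM
     * ROP start time, previously stored ROP files are also collected from the
     * data store and distributed; a lock file guarantees that this historical
     * collection is done only once.
     */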
    public void start(Flux<TopicListener.DataFromTopic> input) {
        logger.debug("Starting distribution, to topic: {}", jobGroup.getId());
        PmReportFilter filter = jobGroup.getFilter();
        if (filter == null || filter.getFilterData().getPmRopEndTime() == null) {
            this.subscription = filter(input, this.jobGroup) //
                    .flatMap(this::sendToClient) //
                    .onErrorResume(this::handleError) //
                    .subscribe(this::handleSentOk, //
                            this::handleExceptionInStream, //
                            () -> logger.warn("JobDataDistributor stopped jobId: {}", jobGroup.getId()));
        }

        if (filter != null && filter.getFilterData().getPmRopStartTime() != null) {
            this.dataStore.createLock(collectHistoricalDataLockName()) //
                    .doOnNext(isLockGranted -> {
                        if (isLockGranted.booleanValue()) {
                            logger.debug("Checking historical PM ROP files, jobId: {}", this.jobGroup.getId());
                        } else {
                            logger.debug("Skipping check of historical PM ROP files, already done. jobId: {}",
                                    this.jobGroup.getId());
                        }
                    }) //
                    .filter(isLockGranted -> isLockGranted) //
                    .flatMapMany(b -> Flux.fromIterable(filter.getFilterData().getSourceNames())) //
                    .doOnNext(sourceName -> logger.debug("Checking source name: {}, jobId: {}", sourceName,
                            this.jobGroup.getId())) //
                    .flatMap(sourceName -> dataStore.listObjects(DataStore.Bucket.FILES, sourceName), 1) //
                    .filter(this::isRopFile) //
                    .filter(fileName -> filterStartTime(filter.getFilterData(), fileName)) //
                    .filter(fileName -> filterEndTime(filter.getFilterData(), fileName)) //
                    .map(this::createFakeEvent) //
                    .flatMap(data -> TopicListener.getDataFromFileIfNewPmFileEvent(data, this.jobGroup.getType(),
                            dataStore), 100)
                    .map(jobGroup::filter) //
                    .map(this::gzip) //
                    .flatMap(this::sendToClient, 1) //
                    .onErrorResume(this::handleCollectHistoricalDataError) //
                    .doFinally(sig -> sendLastStoredRecord()) //
                    .subscribe();
        }
    }

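    /**
     * Builds Kafka sender options for the job. The bootstrap servers from the
     * job's delivery info are used if present, otherwise the application-wide
     * configuration applies. Kafka security properties are added from the
     * application configuration.
     */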
    private static SenderOptions<byte[], byte[]> senderOptions(ApplicationConfig config,
            KafkaDeliveryInfo deliveryInfo) {

        String bootstrapServers = deliveryInfo.getBootStrapServers();
        if (bootstrapServers == null || bootstrapServers.isEmpty()) {
            bootstrapServers = config.getKafkaBootStrapServers();
        }

        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

        config.addKafkaSecurityProps(props);

        return SenderOptions.create(props);
    }

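    /**
     * Sends an empty JSON object to the job's topic when the collection of
     * historical data finishes, marking that the last stored record has been
     * delivered.
     */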
    private void sendLastStoredRecord() {
        String data = "{}";
        FilteredData output = new FilteredData("", this.jobGroup.getType().getId(), null, data.getBytes());

        sendToClient(output).subscribe();
    }

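    /**
     * Gzips the filtered data if zipped output is enabled in the application
     * configuration; otherwise the data is returned unchanged. On compression
     * failure the uncompressed data is returned.
     */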
    private FilteredData gzip(FilteredData data) {
        if (!this.applConfig.isZipOutput()) {
            return data;
        }
        try (ByteArrayOutputStream out = new ByteArrayOutputStream();
                GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(data.value);
            gzip.finish();
            return new FilteredData(data.getSourceName(), data.infoTypeId, data.key, out.toByteArray(), true);
        } catch (IOException e) {
            logger.error("Unexpected exception when zipping: {}", e.getMessage());
            return data;
        }
    }


    private Mono<String> handleCollectHistoricalDataError(Throwable t) {
        logger.error("Exception: {} job: {}", t.getMessage(), jobGroup.getId());
        return tryDeleteLockFile() //
                .map(bool -> "OK");
    }

    private String collectHistoricalDataLockName() {
        return "collectHistoricalDataLock" + this.jobGroup.getId();
    }

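    /**
     * Wraps a stored file name in a NewFileEvent so that it can be processed by
     * the same code path as new-file events received from the Kafka topic.
     */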
    private DataFromTopic createFakeEvent(String fileName) {
        NewFileEvent ev = new NewFileEvent(fileName);
        return new DataFromTopic(this.jobGroup.getType().getId(), null, null, gson.toJson(ev).getBytes());
    }

    private static String fileTimePartFromRopFileName(String fileName) {
        // "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.json"
        return fileName.substring(fileName.lastIndexOf("/") + 2);
    }

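    /**
     * Returns true if the ROP file starts after the start time given in the
     * filter. Files whose names cannot be parsed are excluded.
     */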
    private static boolean filterStartTime(PmReportFilter.FilterData filter, String fileName) {
        try {
            OffsetDateTime fileStartTime = getStartTimeFromFileName(fileName);
            OffsetDateTime startTime = OffsetDateTime.parse(filter.getPmRopStartTime());
            boolean isMatch = fileStartTime.isAfter(startTime);
            logger.debug("Checking file: {}, fileStartTime: {}, filterStartTime: {}, isAfter: {}", fileName,
                    fileStartTime, startTime, isMatch);
            return isMatch;
        } catch (Exception e) {
            logger.warn("Time parsing exception: {}", e.getMessage());
            return false;
        }
    }

    private boolean isRopFile(String fileName) {
        return fileName.endsWith(".json") || fileName.endsWith(".json.gz");
    }

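    /**
     * Returns true if the ROP file ends before the end time given in the filter,
     * or if no end time is configured. Files whose names cannot be parsed are
     * excluded.
     */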
    private static boolean filterEndTime(PmReportFilter.FilterData filter, String fileName) {
        if (filter.getPmRopEndTime() == null) {
            return true;
        }
        try {
            OffsetDateTime fileEndTime = getEndTimeFromFileName(fileName);
            OffsetDateTime endTime = OffsetDateTime.parse(filter.getPmRopEndTime());
            boolean isMatch = fileEndTime.isBefore(endTime);
            logger.debug("Checking file: {}, fileEndTime: {}, endTime: {}, isBefore: {}", fileName, fileEndTime,
                    endTime, isMatch);
            return isMatch;

        } catch (Exception e) {
            logger.warn("Time parsing exception: {}", e.getMessage());
            return false;
        }
    }

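    // The ROP file names embed the collection start and end times, for example
    // "A20000626.2315+0200-2330+0200_HTTPS-6-73.json". The two helpers below
    // extract these times from fixed character positions in the name.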
    private static OffsetDateTime getStartTimeFromFileName(String fileName) {
        String fileTimePart = fileTimePartFromRopFileName(fileName);
        // A20000626.2315+0200-2330+0200_HTTPS-6-73.json
        fileTimePart = fileTimePart.substring(0, 18);
        return parseFileDate(fileTimePart);
    }

    private static OffsetDateTime getEndTimeFromFileName(String fileName) {
        String fileTimePart = fileTimePartFromRopFileName(fileName);
        // A20000626.2315+0200-2330+0200_HTTPS-6-73.json
        fileTimePart = fileTimePart.substring(0, 9) + fileTimePart.substring(19, 28);
        return parseFileDate(fileTimePart);
    }

    private static OffsetDateTime parseFileDate(String timeStr) {
        DateTimeFormatter startTimeFormatter =
                new DateTimeFormatterBuilder().appendPattern("yyyyMMdd.HHmmZ").toFormatter();
        return OffsetDateTime.parse(timeStr, startTimeFormatter);
    }

    private void handleExceptionInStream(Throwable t) {
        logger.warn("JobDataDistributor exception: {}, jobId: {}", t.getMessage(), jobGroup.getId());
    }

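    /**
     * Sends the filtered data to the Kafka topic given by the job's delivery
     * info. Send failures are logged and otherwise ignored.
     */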
    public Mono<String> sendToClient(FilteredData data) {

        SenderRecord<byte[], byte[], Integer> senderRecord = senderRecord(data, this.getJobGroup().getDeliveryInfo());

        logger.trace("Sending data '{}' to Kafka topic: {}", StringUtils.truncate(data.getValueAString(), 10),
                this.getJobGroup().getDeliveryInfo());

        return this.sender.send(Mono.just(senderRecord)) //
                .doOnNext(n -> logger.debug("Sent data to Kafka topic: {}", this.getJobGroup().getDeliveryInfo())) //
                .doOnError(t -> logger.warn("Failed to send to Kafka, job: {}, reason: {}", this.getJobGroup().getId(),
                        t.getMessage())) //
                .onErrorResume(t -> Mono.empty()) //
                .collectList() //
                .map(x -> "ok");
    }

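    /**
     * Stops the distribution: disposes the stream subscription, closes the Kafka
     * sender and tries to remove the lock file used for historical data
     * collection.
     */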
    public synchronized void stop() {
        if (this.subscription != null) {
            logger.debug("Stopped, job: {}", jobGroup.getId());
            this.subscription.dispose();
            this.subscription = null;
        }
        if (sender != null) {
            sender.close();
            sender = null;
        }

        tryDeleteLockFile().subscribe();
    }

    private Mono<Boolean> tryDeleteLockFile() {
        return dataStore.deleteLock(collectHistoricalDataLockName()) //
                .doOnNext(res -> logger.debug("Removed lockfile {} {}", collectHistoricalDataLockName(), res))
                .onErrorResume(t -> Mono.just(false));
    }

    public synchronized boolean isRunning() {
        return this.subscription != null;
    }

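    /**
     * Creates a Kafka sender record for the delivery topic, preserving the key
     * and headers of the filtered data.
     */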
    private SenderRecord<byte[], byte[], Integer> senderRecord(FilteredData output, KafkaDeliveryInfo deliveryInfo) {
        int correlationMetadata = 2;
        var producerRecord =
                new ProducerRecord<>(deliveryInfo.getTopic(), null, null, output.key, output.value, output.headers());
        return SenderRecord.create(producerRecord, correlationMetadata);
    }

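    /**
     * Applies the job group's filter to the input stream, drops empty results,
     * gzips the output if configured and updates the per-job statistics.
     */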
    private Flux<FilteredData> filter(Flux<DataFromTopic> inputFlux, JobGroup jobGroup) {
        return inputFlux.doOnNext(data -> logger.trace("Received data, job {}", jobGroup.getId())) //
                .doOnNext(data -> jobGroup.getJobs().forEach(job -> job.getStatistics().received(data.value))) //
                .map(jobGroup::filter) //
                .filter(f -> !f.isEmpty()) //
                .map(this::gzip) //
                .doOnNext(f -> jobGroup.getJobs().forEach(job -> job.getStatistics().filtered(f.value))) //
                .doOnNext(data -> logger.trace("Filtered data, job {}", jobGroup.getId()));
    }

    private Mono<String> handleError(Throwable t) {
        logger.warn("exception: {} job: {}", t.getMessage(), jobGroup.getId());
        this.errorStats.handleException(t);
        return Mono.empty(); // Ignore
    }

    private void handleSentOk(String data) {
        this.errorStats.handleOkFromConsumer();
    }

}