/*-
 * ========================LICENSE_START=================================
 * Copyright (C) 2023 Nordix Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ========================LICENSE_END===================================
 */

package org.oran.pmproducer.tasks;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.util.HashMap;
import java.util.Map;
import java.util.zip.GZIPOutputStream;

import lombok.Getter;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.oran.pmproducer.configuration.ApplicationConfig;
import org.oran.pmproducer.datastore.DataStore;
import org.oran.pmproducer.filter.FilteredData;
import org.oran.pmproducer.filter.PmReportFilter;
import org.oran.pmproducer.repository.Job.Parameters.KafkaDeliveryInfo;
import org.oran.pmproducer.repository.Jobs.JobGroup;
import org.oran.pmproducer.tasks.TopicListener.DataFromTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;

/**
 * Streams PM measurement data from the input flux, applies the job group's
 * filter, and distributes the filtered result to the Kafka topic given in the
 * job's delivery information. If the job requests a reporting period in the
 * past, previously stored PM ROP files are also collected from the data store
 * and distributed.
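 *
 * <p>A minimal usage sketch (hypothetical wiring; in the application the
 * input flux is supplied by the {@link TopicListener} for the job's data
 * type):
 *
 * <pre>{@code
 * JobDataDistributor distributor = new JobDataDistributor(jobGroup, applConfig);
 * distributor.start(inputFlux); // Flux<TopicListener.DataFromTopic>
 * // ... later, when the job is deleted:
 * distributor.stop();
 * }</pre>
 */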
@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
public class JobDataDistributor {
    private static final Logger logger = LoggerFactory.getLogger(JobDataDistributor.class);

    @Getter
    private final JobGroup jobGroup;
    private Disposable subscription;
    private final ErrorStats errorStats = new ErrorStats();

    private final DataStore dataStore;
    private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
    private final ApplicationConfig applConfig;

    private KafkaSender<byte[], byte[]> sender;

    private class ErrorStats {

        private int consumerFaultCounter = 0;

        public void handleOkFromConsumer() {
            this.consumerFaultCounter = 0;
        }

        public void handleException(Throwable t) {
            ++this.consumerFaultCounter;
        }
    }

    public JobDataDistributor(JobGroup jobGroup, ApplicationConfig applConfig) {
        this.applConfig = applConfig;
        this.jobGroup = jobGroup;
        this.dataStore = DataStore.create(applConfig);
        this.dataStore.create(DataStore.Bucket.FILES).subscribe();
        this.dataStore.create(DataStore.Bucket.LOCKS).subscribe();

        SenderOptions<byte[], byte[]> senderOptions = senderOptions(applConfig, jobGroup.getDeliveryInfo());
        this.sender = KafkaSender.create(senderOptions);
    }
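
    /**
     * Starts distribution. Live data from the input flux is filtered and sent
     * to the output topic. If the job filter specifies a ROP start time, the
     * data store is additionally scanned for matching historical PM ROP files;
     * a lock file ensures that this scan is performed by one instance only.
     */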
    public void start(Flux<TopicListener.DataFromTopic> input) {
        logger.debug("Starting distribution, to topic: {}", jobGroup.getId());
        PmReportFilter filter = jobGroup.getFilter();
        if (filter == null || filter.getFilterData().getPmRopEndTime() == null) {
            this.subscription = filter(input, this.jobGroup) //
                    .flatMap(this::sendToClient) //
                    .onErrorResume(this::handleError) //
                    .subscribe(this::handleSentOk, //
                            this::handleExceptionInStream, //
                            () -> logger.warn("JobDataDistributor stopped jobId: {}", jobGroup.getId()));
        }

        if (filter != null && filter.getFilterData().getPmRopStartTime() != null) {
            this.dataStore.createLock(collectHistoricalDataLockName()) //
                    .doOnNext(isLockGranted -> {
                        if (isLockGranted.booleanValue()) {
                            logger.debug("Checking historical PM ROP files, jobId: {}", this.jobGroup.getId());
                        } else {
                            logger.debug("Skipping check of historical PM ROP files, already done. jobId: {}",
                                    this.jobGroup.getId());
                        }
                    }) //
                    .filter(isLockGranted -> isLockGranted) //
                    .flatMapMany(b -> Flux.fromIterable(filter.getFilterData().getSourceNames())) //
                    .doOnNext(sourceName -> logger.debug("Checking source name: {}, jobId: {}", sourceName,
                            this.jobGroup.getId())) //
                    .flatMap(sourceName -> dataStore.listObjects(DataStore.Bucket.FILES, sourceName), 1) //
                    .filter(this::isRopFile) //
                    .filter(fileName -> filterStartTime(filter.getFilterData(), fileName)) //
                    .filter(fileName -> filterEndTime(filter.getFilterData(), fileName)) //
                    .map(this::createFakeEvent) //
                    .flatMap(data -> TopicListener.getDataFromFileIfNewPmFileEvent(data, this.jobGroup.getType(),
                            dataStore), 100) //
                    .map(jobGroup::filter) //
                    .map(this::gzip) //
                    .flatMap(this::sendToClient, 1) //
                    .onErrorResume(this::handleCollectHistoricalDataError) //
                    .doFinally(sig -> sendLastStoredRecord()) //
                    .subscribe();
        }
    }
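
    /**
     * Builds the Kafka producer options for the output topic. The bootstrap
     * servers from the job's delivery information take precedence over the
     * application-wide configuration.
     */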
    private static SenderOptions<byte[], byte[]> senderOptions(ApplicationConfig config,
            KafkaDeliveryInfo deliveryInfo) {

        String bootstrapServers = deliveryInfo.getBootStrapServers();
        if (bootstrapServers == null || bootstrapServers.isEmpty()) {
            bootstrapServers = config.getKafkaBootStrapServers();
        }

        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

        config.addKafkaSecurityProps(props);

        return SenderOptions.create(props);
    }
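
    /**
     * Sends one final record after historical data collection has finished.
     * The empty JSON body serves as an end-of-data marker for the consumer.
     */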
    private void sendLastStoredRecord() {
        String data = "{}";
        FilteredData output = new FilteredData("", this.jobGroup.getType().getId(), null, data.getBytes());

        sendToClient(output).subscribe();
    }
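
    /**
     * GZIPs the payload if zipped output is enabled in the application
     * configuration; otherwise the data is returned unchanged. On failure the
     * unzipped data is returned so that distribution can continue.
     */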
    private FilteredData gzip(FilteredData data) {
        if (this.applConfig.isZipOutput()) {
            try {
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                GZIPOutputStream gzip = new GZIPOutputStream(out);
                gzip.write(data.value);
                gzip.flush();
                gzip.close();
                byte[] zipped = out.toByteArray();
                return new FilteredData(data.getSourceName(), data.infoTypeId, data.key, zipped, true);
            } catch (IOException e) {
                logger.error("Unexpected exception when zipping: {}", e.getMessage());
                return data;
            }
        } else {
            return data;
        }
    }

    private Mono<String> handleCollectHistoricalDataError(Throwable t) {
        logger.error("Exception: {} job: {}", t.getMessage(), jobGroup.getId());
        return tryDeleteLockFile() //
                .map(bool -> "OK");
    }

    private String collectHistoricalDataLockName() {
        return "collectHistoricalDataLock" + this.jobGroup.getId();
    }
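
    /**
     * Wraps a stored file name in a NewFileEvent, so that historical files can
     * be pushed through the same code path as file events received from Kafka.
     */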
    private DataFromTopic createFakeEvent(String fileName) {
        NewFileEvent ev = new NewFileEvent(fileName);
        return new DataFromTopic(this.jobGroup.getType().getId(), null, null, gson.toJson(ev).getBytes());
    }
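
    /**
     * Extracts the time part from a ROP file name, that is, everything after
     * the source-name directory and the leading "A", for example
     * "20000626.2315+0200-2330+0200_HTTPS-6-73.json".
     */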
    private static String fileTimePartFromRopFileName(String fileName) {
        // "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.json"
        return fileName.substring(fileName.lastIndexOf("/") + 2);
    }
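
    /**
     * Returns true if the file's start time (parsed from its name) is after
     * the pmRopStartTime given in the filter. Files with unparsable names are
     * excluded.
     */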
    private static boolean filterStartTime(PmReportFilter.FilterData filter, String fileName) {
        try {
            OffsetDateTime fileStartTime = getStartTimeFromFileName(fileName);
            OffsetDateTime startTime = OffsetDateTime.parse(filter.getPmRopStartTime());
            boolean isMatch = fileStartTime.isAfter(startTime);
            logger.debug("Checking file: {}, fileStartTime: {}, filterStartTime: {}, isAfter: {}", fileName,
                    fileStartTime, startTime, isMatch);
            return isMatch;
        } catch (Exception e) {
            logger.warn("Time parsing exception: {}", e.getMessage());
            return false;
        }
    }

    private boolean isRopFile(String fileName) {
        return fileName.endsWith(".json") || fileName.endsWith(".json.gz");
    }
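
    /**
     * Returns true if the file's end time (parsed from its name) is before the
     * pmRopEndTime given in the filter, or if no end time is configured. Files
     * with unparsable names are excluded.
     */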
    private static boolean filterEndTime(PmReportFilter.FilterData filter, String fileName) {
        if (filter.getPmRopEndTime() == null) {
            return true;
        }
        try {
            OffsetDateTime fileEndTime = getEndTimeFromFileName(fileName);
            OffsetDateTime endTime = OffsetDateTime.parse(filter.getPmRopEndTime());
            boolean isMatch = fileEndTime.isBefore(endTime);
            logger.debug("Checking file: {}, fileEndTime: {}, endTime: {}, isBefore: {}", fileName, fileEndTime,
                    endTime, isMatch);
            return isMatch;
        } catch (Exception e) {
            logger.warn("Time parsing exception: {}", e.getMessage());
            return false;
        }
    }

    private static OffsetDateTime getStartTimeFromFileName(String fileName) {
        String fileTimePart = fileTimePartFromRopFileName(fileName);
        // e.g. 20000626.2315+0200-2330+0200_HTTPS-6-73.json
        fileTimePart = fileTimePart.substring(0, 18);
        return parseFileDate(fileTimePart);
    }

    private static OffsetDateTime getEndTimeFromFileName(String fileName) {
        String fileTimePart = fileTimePartFromRopFileName(fileName);
        // e.g. 20000626.2315+0200-2330+0200_HTTPS-6-73.json
        fileTimePart = fileTimePart.substring(0, 9) + fileTimePart.substring(19, 28);
        return parseFileDate(fileTimePart);
    }
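
    /**
     * Parses a time string such as "20000626.2315+0200" using the pattern
     * yyyyMMdd.HHmmZ.
     */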
    private static OffsetDateTime parseFileDate(String timeStr) {
        DateTimeFormatter startTimeFormatter =
                new DateTimeFormatterBuilder().appendPattern("yyyyMMdd.HHmmZ").toFormatter();
        return OffsetDateTime.parse(timeStr, startTimeFormatter);
    }

    private void handleExceptionInStream(Throwable t) {
        logger.warn("JobDataDistributor exception: {}, jobId: {}", t.getMessage(), jobGroup.getId());
        stop();
    }
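
    /**
     * Sends one filtered record to the output topic. Send failures are logged
     * and swallowed so that a failed delivery does not terminate the stream.
     */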
    public Mono<String> sendToClient(FilteredData data) {
        SenderRecord<byte[], byte[], Integer> senderRecord = senderRecord(data, this.getJobGroup().getDeliveryInfo());

        logger.trace("Sending data '{}' to Kafka topic: {}", StringUtils.truncate(data.getValueAString(), 10),
                this.getJobGroup().getDeliveryInfo());

        return this.sender.send(Mono.just(senderRecord)) //
                .doOnNext(n -> logger.debug("Sent data to Kafka topic: {}", this.getJobGroup().getDeliveryInfo())) //
                .doOnError(t -> logger.warn("Failed to send to Kafka, job: {}, reason: {}", this.getJobGroup().getId(),
                        t.getMessage())) //
                .onErrorResume(t -> Mono.empty()) //
                .collectList() //
                .map(x -> data.getSourceName());
    }
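
    /**
     * Stops distribution: disposes the stream subscription, closes the Kafka
     * sender and removes the historical-data lock file, if any.
     */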
    public synchronized void stop() {
        if (this.subscription != null) {
            logger.debug("Stopped, job: {}", jobGroup.getId());
            this.subscription.dispose();
            this.subscription = null;
        }
        if (sender != null) {
            sender.close();
            sender = null;
        }

        tryDeleteLockFile().subscribe();
    }

    private Mono<Boolean> tryDeleteLockFile() {
        return dataStore.deleteLock(collectHistoricalDataLockName()) //
                .doOnNext(res -> logger.debug("Removed lockfile {} {}", collectHistoricalDataLockName(), res))
                .onErrorResume(t -> Mono.just(false));
    }

    public synchronized boolean isRunning() {
        return this.subscription != null;
    }
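
    /**
     * Wraps the filtered data in a Kafka ProducerRecord for the delivery
     * topic, keeping the original key and headers.
     */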
    private SenderRecord<byte[], byte[], Integer> senderRecord(FilteredData output, KafkaDeliveryInfo deliveryInfo) {
        int correlationMetadata = 2;
        var producerRecord =
                new ProducerRecord<>(deliveryInfo.getTopic(), null, null, output.key, output.value, output.headers());
        return SenderRecord.create(producerRecord, correlationMetadata);
    }
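
    /**
     * Applies the job group's filter to the input flux, drops empty results,
     * optionally gzips the payload, and updates per-job statistics.
     */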
    private Flux<FilteredData> filter(Flux<DataFromTopic> inputFlux, JobGroup jobGroup) {
        return inputFlux.doOnNext(data -> logger.trace("Received data, job {}", jobGroup.getId())) //
                .doOnNext(data -> jobGroup.getJobs().forEach(job -> job.getStatistics().received(data.value))) //
                .map(jobGroup::filter) //
                .filter(f -> !f.isEmpty()) //
                .map(this::gzip) //
                .doOnNext(f -> jobGroup.getJobs().forEach(job -> job.getStatistics().filtered(f.value))) //
                .doOnNext(data -> logger.trace("Filtered data, job {}", jobGroup.getId()));
    }

    private Mono<String> handleError(Throwable t) {
        logger.warn("Exception: {} job: {}", t.getMessage(), jobGroup.getId());
        this.errorStats.handleException(t);
        return Mono.empty(); // Ignore the error and continue the stream
    }

    private void handleSentOk(String data) {
        this.errorStats.handleOkFromConsumer();
    }

}