/*-
 * ============LICENSE_START======================================================================
 * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2023 Nordix Foundation. All rights reserved.
 * ===============================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 * ============LICENSE_END========================================================================
 */
package org.oran.datafile.tasks;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.StringSerializer;
import org.oran.datafile.commons.Scheme;
import org.oran.datafile.configuration.AppConfig;
import org.oran.datafile.configuration.CertificateConfig;
import org.oran.datafile.datastore.DataStore;
import org.oran.datafile.datastore.DataStore.Bucket;
import org.oran.datafile.exceptions.DatafileTaskException;
import org.oran.datafile.http.HttpsClientConnectionManagerUtil;
import org.oran.datafile.model.Counters;
import org.oran.datafile.model.FileData;
import org.oran.datafile.model.FilePublishInformation;
import org.oran.datafile.model.FileReadyMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;
import reactor.util.retry.Retry;
/**
 * This implements the main flow of the data file collector. Fetch file ready
 * events from the message router, fetch new files from the PNF and publish
 * these in the data store.
 */
@Component
public class CollectAndReportFiles {

    private static Gson gson = new GsonBuilder() //
        .disableHtmlEscaping() //
        .create();

    private static final long FILE_TRANSFER_MAX_RETRIES = 2;
    private static final Duration FILE_TRANSFER_INITIAL_RETRY_TIMEOUT = Duration.ofSeconds(2);

    private static final Logger logger = LoggerFactory.getLogger(CollectAndReportFiles.class);

    private final AppConfig appConfig;

    private Counters counters = new Counters();

    private final KafkaSender<String, String> kafkaSender;

    private final DataStore dataStore;

    /**
     * Constructor for task registration in Datafile Workflow.
     *
     * @param applicationConfiguration - application configuration
     */
    public CollectAndReportFiles(AppConfig applicationConfiguration) {
        this.appConfig = applicationConfiguration;
        this.kafkaSender = KafkaSender.create(kafkaSenderOptions());
        initCerts();
        this.dataStore = DataStore.create(applicationConfiguration);
    }

    private void initCerts() {
        try {
            CertificateConfig certificateConfig = appConfig.getCertificateConfiguration();
            HttpsClientConnectionManagerUtil.setupOrUpdate(certificateConfig.keyCert, certificateConfig.keyPasswordPath,
                certificateConfig.trustedCa, certificateConfig.trustedCaPasswordPath, true);
        } catch (DatafileTaskException e) {
            logger.error("Could not setup HttpsClient certs, reason: {}", e.getMessage());
        }
    }

    /**
     * Main function for scheduling the file collection workflow.
     */
    public void start() {
        start(0);
    }

    private void start(int delayMillis) {
        try {
            logger.trace("Starting");
            if (appConfig.isS3Enabled()) {
                this.dataStore.create(Bucket.FILES).subscribe();
                this.dataStore.create(Bucket.LOCKS).subscribe();
            }
            Thread.sleep(delayMillis);
            createMainTask().subscribe(null, s -> start(2000), null);
        } catch (Exception e) {
            logger.error("Unexpected exception: {}", e.toString(), e);
            Thread.currentThread().interrupt();
        }
    }

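    /**
     * Builds the reactive pipeline that drives the collector: file ready events
     * are consumed from Kafka, fanned out over a parallel scheduler (one
     * FileReadyMessage per worker thread), filtered against already fetched
     * files, collected from the PNF and finally reported on the collected-file
     * topic. On error, the pipeline is restarted by {@link #start(int)}.
     */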
    Flux<FilePublishInformation> createMainTask() {
        final int noOfWorkerThreads = appConfig.getNoOfWorkerThreads();
        Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", noOfWorkerThreads);
        return fetchFromKafka() //
            .doOnNext(fileReadyMessage -> counters.threadPoolQueueSize.incrementAndGet()) //
            .doOnNext(fileReadyMessage -> counters.incNoOfReceivedEvents()) //
            .parallel(noOfWorkerThreads) // Each FileReadyMessage in a separate thread
            .runOn(scheduler) //
            .doOnNext(fileReadyMessage -> counters.threadPoolQueueSize.decrementAndGet()) //
            .flatMap(fileReadyMessage -> Flux.fromIterable(FileData.createFileData(fileReadyMessage)), true, 1) //
            .flatMap(this::filterNotFetched, false, 1, 1) //
            .flatMap(this::fetchFile, false, 1, 1) //
            .flatMap(data -> reportFetchedFile(data, appConfig.getCollectedFileTopic()), false, 1) //
            .sequential() //
            .doOnError(t -> logger.error("Received error: {}", t.toString())); //
    }

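    // Releases the per-file lock in the data store; mapping back to the FileData
    // keeps the element flowing through the reactive chain.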
    private Mono<FileData> deleteLock(FileData info) {
        return dataStore.deleteLock(lockName(info.name())).map(b -> info); //
    }

    private Mono<FilePublishInformation> moveFileToS3Bucket(FilePublishInformation info) {
        if (this.appConfig.isS3Enabled()) {
            return dataStore.copyFileTo(localFilePath(info), info.getName())
                .doOnError(t -> logger.warn("Failed to store file '{}' in S3 {}", info.getName(), t.getMessage())) //
                .retryWhen(Retry.backoff(4, Duration.ofMillis(1000))) //
                .map(f -> info) //
                .doOnError(t -> logger.error("Failed to store file '{}' in S3 after retries {}", info.getName(),
                    t.getMessage())) //
                .doOnNext(n -> logger.debug("Stored file in S3: {}", info.getName())) //
                .doOnNext(sig -> deleteLocalFile(info));
        } else {
            return Mono.just(info);
        }
    }

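    /**
     * Passes the file through only if it has not been fetched already, that is,
     * if it exists neither in the FILES bucket nor on the local file system;
     * otherwise an empty Mono is returned and the file is skipped.
     */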
    private Mono<FileData> filterNotFetched(FileData fileData) {
        Path localPath = fileData.getLocalFilePath(this.appConfig);

        return dataStore.fileExists(Bucket.FILES, fileData.name()) //
            .filter(exists -> !exists) //
            .filter(exists -> !localPath.toFile().exists()) //
            .map(f -> fileData); //
    }

    private String lockName(String fileName) {
        return fileName + ".lck";
    }

    private Path localFilePath(FilePublishInformation info) {
        return Paths.get(appConfig.getCollectedFilesPath(), info.getName());
    }

    private void deleteLocalFile(FilePublishInformation info) {
        Path path = localFilePath(info);
        try {
            Files.delete(path);
        } catch (Exception e) {
            logger.warn("Could not delete local file: {}, reason: {}", path, e.getMessage());
        }
    }

    private Flux<FilePublishInformation> reportFetchedFile(FilePublishInformation fileData, String topic) {
        String json = gson.toJson(fileData);
        return sendDataToStream(topic, fileData.getSourceName(), json) //
            .map(result -> fileData);
    }

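    /**
     * Publishes one value on the given Kafka topic. The source name is not used
     * as the record key; it is attached as a {@code SourceName} record header,
     * see {@link #kafkaHeaders(String)}. A minimal usage sketch (the topic name
     * here is only an illustration):
     *
     * <pre>{@code
     * collector.sendDataToStream("collected-file-topic", "pnf-1", json).subscribe();
     * }</pre>
     */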
    public Flux<SenderResult<Integer>> sendDataToStream(String topic, String sourceName, String value) {
        return sendDataToKafkaStream(Flux.just(senderRecord(topic, sourceName, value)));
    }

    private SenderRecord<String, String, Integer> senderRecord(String topic, String sourceName, String value) {
        int correlationMetadata = 2;
        String key = null; // no record key; the source name travels in a header instead
        var producerRecord = new ProducerRecord<>(topic, null, null, key, value, kafkaHeaders(sourceName));
        return SenderRecord.create(producerRecord, correlationMetadata);
    }

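    // Builds the record headers attached to every published message; currently
    // only the originating PNF source name.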
    private Iterable<Header> kafkaHeaders(String sourceName) {
        ArrayList<Header> result = new ArrayList<>();
        Header h = new RecordHeader("SourceName", sourceName.getBytes());
        result.add(h);
        return result;
    }

    private Flux<SenderResult<Integer>> sendDataToKafkaStream(Flux<SenderRecord<String, String, Integer>> dataToSend) {
        return kafkaSender.send(dataToSend) //
            .doOnError(e -> logger.error("Send to kafka failed", e));
    }

    private SenderOptions<String, String> kafkaSenderOptions() {
        String bootstrapServers = this.appConfig.getKafkaBootStrapServers();

        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        this.appConfig.addKafkaSecurityProps(props);
        return SenderOptions.create(props);
    }

    public Counters getCounters() {
        return this.counters;
    }

    protected FileCollector createFileCollector() {
        return new FileCollector(appConfig, counters);
    }

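    /**
     * Fetches one file: a lock is first taken in the data store so that only one
     * collector instance handles the file; the file is then collected from the
     * PNF, moved to the S3 bucket (when enabled) and the lock is released.
     * Failures are resumed via {@link #handleFetchFileFailure(FileData, Throwable)}.
     */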
    private Mono<FilePublishInformation> fetchFile(FileData fileData) {
        return this.dataStore.createLock(lockName(fileData.name())).filter(granted -> granted) //
            .map(granted -> createFileCollector()) //
            .flatMap(collector -> collector.collectFile(fileData, FILE_TRANSFER_MAX_RETRIES,
                FILE_TRANSFER_INITIAL_RETRY_TIMEOUT)) //
            .flatMap(this::moveFileToS3Bucket) //
            .doOnNext(b -> deleteLock(fileData).subscribe()) //
            .doOnError(b -> deleteLock(fileData).subscribe()) //
            .onErrorResume(exception -> handleFetchFileFailure(fileData, exception)); //
    }

    private Mono<FilePublishInformation> handleFetchFileFailure(FileData fileData, Throwable t) {
        Path localFilePath = fileData.getLocalFilePath(this.appConfig);
        logger.error("File fetching failed, path {}, reason: {}", fileData.remoteFilePath(), t.getMessage());
        deleteFile(localFilePath);
        if (Scheme.isFtpScheme(fileData.scheme())) {
            counters.incNoOfFailedFtp();
        } else {
            counters.incNoOfFailedHttp();
        }
        return Mono.empty();
    }

    /**
     * Fetch more messages from the message router. This is done in a
     * polling/blocking fashion.
     */
    private Flux<FileReadyMessage> fetchFromKafka() {
        KafkaTopicListener listener = new KafkaTopicListener(this.appConfig);
        return listener.getFlux() //
            .flatMap(this::parseReceivedFileReadyMessage, 1);
    }

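    // A message that cannot be parsed is logged and dropped (empty Mono), so one
    // malformed event does not terminate the Kafka consumer stream.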
    Mono<FileReadyMessage> parseReceivedFileReadyMessage(KafkaTopicListener.DataFromTopic data) {
        try {
            FileReadyMessage msg = gson.fromJson(data.value, FileReadyMessage.class);
            logger.debug("Received: {}", msg);
            return Mono.just(msg);
        } catch (Exception e) {
            logger.warn("Could not parse received: {}, reason: {}", data.value, e.getMessage());
            return Mono.empty();
        }
    }

    private static void deleteFile(Path localFile) {
        logger.trace("Deleting file: {}", localFile);
        try {
            Files.delete(localFile);
        } catch (Exception e) {
            logger.trace("Could not delete file: {}, reason: {}", localFile, e.getMessage());
        }
    }
}