c12794865c464031bee0f9bafecf2aa90e4b22d7
[nonrtric/plt/ranpm.git] / datafilecollector / src / main / java / org / onap / dcaegen2 / collectors / datafile / tasks / CollectAndReportFiles.java
1 /*-
2  * ============LICENSE_START======================================================================
3  * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
4  * ===============================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6  * in compliance with the License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License
11  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12  * or implied. See the License for the specific language governing permissions and limitations under
13  * the License.
14  * ============LICENSE_END========================================================================
15  */
16
17 package org.onap.dcaegen2.collectors.datafile.tasks;
18
19 import com.google.gson.Gson;
20 import com.google.gson.GsonBuilder;
21
22 import java.nio.file.Files;
23 import java.nio.file.Path;
24 import java.nio.file.Paths;
25 import java.time.Duration;
26 import java.util.ArrayList;
27 import java.util.HashMap;
28 import java.util.Map;
29
30 import org.apache.kafka.clients.producer.ProducerConfig;
31 import org.apache.kafka.clients.producer.ProducerRecord;
32 import org.apache.kafka.common.header.Header;
33 import org.apache.kafka.common.header.internals.RecordHeader;
34 import org.apache.kafka.common.serialization.StringSerializer;
35 import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
36 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
37 import org.onap.dcaegen2.collectors.datafile.configuration.CertificateConfig;
38 import org.onap.dcaegen2.collectors.datafile.datastore.DataStore;
39 import org.onap.dcaegen2.collectors.datafile.datastore.DataStore.Bucket;
40 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
41 import org.onap.dcaegen2.collectors.datafile.http.HttpsClientConnectionManagerUtil;
42 import org.onap.dcaegen2.collectors.datafile.model.Counters;
43 import org.onap.dcaegen2.collectors.datafile.model.FileData;
44 import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
45 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48 import org.springframework.beans.factory.annotation.Autowired;
49 import org.springframework.stereotype.Component;
50
51 import reactor.core.publisher.Flux;
52 import reactor.core.publisher.Mono;
53 import reactor.core.scheduler.Scheduler;
54 import reactor.core.scheduler.Schedulers;
55 import reactor.kafka.sender.KafkaSender;
56 import reactor.kafka.sender.SenderOptions;
57 import reactor.kafka.sender.SenderRecord;
58 import reactor.kafka.sender.SenderResult;
59 import reactor.util.retry.Retry;
60
61 /**
62  * This implements the main flow of the data file collector. Fetch file ready
63  * events from the
64  * message router, fetch new files from the PNF publish these in the data
65  * router.
66  */
67 @Component
68 public class CollectAndReportFiles {
69
70     private static Gson gson = new GsonBuilder() //
71         .disableHtmlEscaping() //
72         .create(); //
73
74     private static final int NUMBER_OF_WORKER_THREADS = 200;
75     private static final long FILE_TRANSFER_MAX_RETRIES = 2;
76     private static final Duration FILE_TRANSFER_INITIAL_RETRY_TIMEOUT = Duration.ofSeconds(2);
77
78     private static final Logger logger = LoggerFactory.getLogger(CollectAndReportFiles.class);
79
80     private final AppConfig appConfig;
81
82     private Counters counters = new Counters();
83
84     private final KafkaSender<String, String> kafkaSender;
85
86     private final DataStore dataStore;
87
88     /**
89      * Constructor for task registration in Datafile Workflow.
90      *
91      * @param applicationConfiguration - application configuration
92      */
93     @Autowired
94     public CollectAndReportFiles(AppConfig applicationConfiguration) {
95         this.appConfig = applicationConfiguration;
96         this.kafkaSender = KafkaSender.create(kafkaSenderOptions());
97         initCerts();
98
99         this.dataStore = DataStore.create(applicationConfiguration);
100
101         start();
102     }
103
104     private void initCerts() {
105         try {
106             CertificateConfig certificateConfig = appConfig.getCertificateConfiguration();
107             HttpsClientConnectionManagerUtil.setupOrUpdate(certificateConfig.keyCert, certificateConfig.keyPasswordPath,
108                 certificateConfig.trustedCa, certificateConfig.trustedCaPasswordPath, true);
109         } catch (DatafileTaskException e) {
110             logger.error("Could not setup HttpsClient certs, reason: {}", e.getMessage());
111         }
112     }
113
114     /**
115      * Main function for scheduling for the file collection Workflow.
116      */
117     public void start() {
118         start(0);
119     }
120
121     private void start(int delayMillis) {
122         try {
123             logger.trace("Starting");
124             if (appConfig.isS3Enabled()) {
125                 this.dataStore.create(Bucket.FILES).subscribe();
126                 this.dataStore.create(Bucket.LOCKS).subscribe();
127             }
128             Thread.sleep(delayMillis);
129             createMainTask().subscribe(null, s -> start(2000), null);
130         } catch (Exception e) {
131             logger.error("Unexpected exception: {}", e.toString(), e);
132             Thread.currentThread().interrupt();
133         }
134     }
135
136     Flux<FilePublishInformation> createMainTask() {
137         Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", NUMBER_OF_WORKER_THREADS);
138         return fetchFromKafka() //
139             .doOnNext(fileReadyMessage -> counters.threadPoolQueueSize.incrementAndGet()) //
140             .doOnNext(fileReadyMessage -> counters.incNoOfReceivedEvents()) //
141             .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread
142             .runOn(scheduler) //
143             .doOnNext(fileReadyMessage -> counters.threadPoolQueueSize.decrementAndGet()) //
144             .flatMap(fileReadyMessage -> Flux.fromIterable(FileData.createFileData(fileReadyMessage)), true, 1) //
145             .flatMap(this::filterNotFetched, false, 1, 1) //
146             .flatMap(this::fetchFile, false, 1, 1) //
147             .flatMap(data -> reportFetchedFile(data, this.appConfig.collectedFileTopic), false, 1) //
148             .sequential() //
149             .doOnError(t -> logger.error("Received error: {}", t.toString())); //
150     }
151
152     private Mono<FileData> deleteLock(FileData info) {
153         return dataStore.deleteLock(lockName(info.name())).map(b -> info); //
154     }
155
156     private Mono<FilePublishInformation> moveFileToS3Bucket(FilePublishInformation info) {
157         if (this.appConfig.isS3Enabled()) {
158             return dataStore.copyFileTo(locaFilePath(info), info.getName())
159                 .doOnError(t -> logger.warn("Failed to store file '{}' in S3 {}", info.getName(), t.getMessage())) //
160                 .retryWhen(Retry.fixedDelay(10, Duration.ofMillis(1000))) //
161                 .map(f -> info) //
162                 .doOnError(t -> logger.error("Failed to store file '{}' in S3 after retries {}", info.getName(),
163                     t.getMessage())) //
164                 .doOnNext(n -> logger.debug("Stored file in S3: {}", info.getName())) //
165                 .doOnNext(sig -> deleteLocalFile(info));
166         } else {
167             return Mono.just(info);
168         }
169     }
170
171     private Mono<FileData> filterNotFetched(FileData fileData) {
172         Path localPath = fileData.getLocalFilePath(this.appConfig);
173
174         return dataStore.fileExists(Bucket.FILES, fileData.name()) //
175             .filter(exists -> !exists) //
176             .filter(exists -> !localPath.toFile().exists()) //
177             .map(f -> fileData); //
178
179     }
180
181     private String lockName(String fileName) {
182         return fileName + ".lck";
183     }
184
185     private Path locaFilePath(FilePublishInformation info) {
186         return Paths.get(this.appConfig.collectedFilesPath, info.getName());
187     }
188
189     private void deleteLocalFile(FilePublishInformation info) {
190         Path path = locaFilePath(info);
191         try {
192             Files.delete(path);
193         } catch (Exception e) {
194             logger.warn("Could not delete local file: {}, reason:{}", path, e.getMessage());
195         }
196     }
197
198     private Flux<FilePublishInformation> reportFetchedFile(FilePublishInformation fileData, String topic) {
199         String json = gson.toJson(fileData);
200         return sendDataToStream(topic, fileData.getSourceName(), json) //
201             .map(result -> fileData);
202     }
203
204     public Flux<SenderResult<Integer>> sendDataToStream(String topic, String sourceName, String value) {
205         return sendDataToKafkaStream(Flux.just(senderRecord(topic, sourceName, value)));
206     }
207
208     private SenderRecord<String, String, Integer> senderRecord(String topic, String sourceName, String value) {
209         int correlationMetadata = 2;
210         String key = null;
211         var producerRecord = new ProducerRecord<>(topic, null, null, key, value, kafkaHeaders(sourceName));
212         return SenderRecord.create(producerRecord, correlationMetadata);
213     }
214
215     private Iterable<Header> kafkaHeaders(String sourceName) {
216         ArrayList<Header> result = new ArrayList<>();
217         Header h = new RecordHeader("SourceName", sourceName.getBytes());
218         result.add(h);
219         return result;
220     }
221
222     private Flux<SenderResult<Integer>> sendDataToKafkaStream(Flux<SenderRecord<String, String, Integer>> dataToSend) {
223
224         return kafkaSender.send(dataToSend) //
225             .doOnError(e -> logger.error("Send to kafka failed", e));
226     }
227
228     private SenderOptions<String, String> kafkaSenderOptions() {
229         String bootstrapServers = this.appConfig.getKafkaBootStrapServers();
230
231         Map<String, Object> props = new HashMap<>();
232         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
233         props.put(ProducerConfig.ACKS_CONFIG, "all");
234         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
235         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
236         return SenderOptions.create(props);
237     }
238
239     public Counters getCounters() {
240         return this.counters;
241     }
242
243     protected FileCollector createFileCollector() {
244         return new FileCollector(appConfig, counters);
245     }
246
247     private Mono<FilePublishInformation> fetchFile(FileData fileData) {
248         return this.dataStore.createLock(lockName(fileData.name())).filter(granted -> granted) //
249             .map(granted -> createFileCollector()) //
250             .flatMap(collector -> collector.collectFile(fileData, FILE_TRANSFER_MAX_RETRIES,
251                 FILE_TRANSFER_INITIAL_RETRY_TIMEOUT)) //
252             .flatMap(this::moveFileToS3Bucket) //
253             .doOnNext(b -> deleteLock(fileData).subscribe()) //
254             .doOnError(b -> deleteLock(fileData).subscribe()) //
255             .onErrorResume(exception -> handleFetchFileFailure(fileData, exception)); //
256     }
257
258     private Mono<FilePublishInformation> handleFetchFileFailure(FileData fileData, Throwable t) {
259         Path localFilePath = fileData.getLocalFilePath(this.appConfig);
260         logger.error("File fetching failed, path {}, reason: {}", fileData.remoteFilePath(), t.getMessage());
261         deleteFile(localFilePath);
262         if (Scheme.isFtpScheme(fileData.scheme())) {
263             counters.incNoOfFailedFtp();
264         } else {
265             counters.incNoOfFailedHttp();
266         }
267         return Mono.empty();
268     }
269
270     /**
271      * Fetch more messages from the message router. This is done in a
272      * polling/blocking fashion.
273      */
274     private Flux<FileReadyMessage> fetchFromKafka() {
275         KafkaTopicListener listener = new KafkaTopicListener(this.appConfig.getKafkaBootStrapServers(),
276             this.appConfig.kafkaClientId, this.appConfig.fileReadyEventTopic);
277         return listener.getFlux() //
278             .flatMap(this::parseReceivedFileReadyMessage, 1);
279
280     }
281
282     Mono<FileReadyMessage> parseReceivedFileReadyMessage(KafkaTopicListener.DataFromTopic data) {
283         try {
284             FileReadyMessage msg = gson.fromJson(data.value, FileReadyMessage.class);
285             logger.debug("Received: {}", msg);
286             return Mono.just(msg);
287         } catch (Exception e) {
288             logger.warn("Could not parse received: {}, reason: {}", data.value, e.getMessage());
289             return Mono.empty();
290         }
291     }
292
293     private static void deleteFile(Path localFile) {
294         logger.trace("Deleting file: {}", localFile);
295         try {
296             Files.delete(localFile);
297         } catch (Exception e) {
298             logger.trace("Could not delete file: {}, reason: {}", localFile, e.getMessage());
299         }
300     }
301 }