Some tidying up
[nonrtric/plt/ranpm.git] / datafilecollector / src / main / java / org / onap / dcaegen2 / collectors / datafile / tasks / CollectAndReportFiles.java
1 /*-
2  * ============LICENSE_START======================================================================
3  * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
4  * ===============================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6  * in compliance with the License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License
11  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12  * or implied. See the License for the specific language governing permissions and limitations under
13  * the License.
14  * ============LICENSE_END========================================================================
15  */
16
17 package org.onap.dcaegen2.collectors.datafile.tasks;
18
19 import com.google.gson.Gson;
20 import com.google.gson.GsonBuilder;
21
22 import java.nio.file.Files;
23 import java.nio.file.Path;
24 import java.nio.file.Paths;
25 import java.time.Duration;
26 import java.util.ArrayList;
27 import java.util.HashMap;
28 import java.util.Map;
29
30 import org.apache.kafka.clients.producer.ProducerConfig;
31 import org.apache.kafka.clients.producer.ProducerRecord;
32 import org.apache.kafka.common.header.Header;
33 import org.apache.kafka.common.header.internals.RecordHeader;
34 import org.apache.kafka.common.serialization.StringSerializer;
35 import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
36 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
37 import org.onap.dcaegen2.collectors.datafile.configuration.CertificateConfig;
38 import org.onap.dcaegen2.collectors.datafile.datastore.DataStore;
39 import org.onap.dcaegen2.collectors.datafile.datastore.DataStore.Bucket;
40 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
41 import org.onap.dcaegen2.collectors.datafile.http.HttpsClientConnectionManagerUtil;
42 import org.onap.dcaegen2.collectors.datafile.model.Counters;
43 import org.onap.dcaegen2.collectors.datafile.model.FileData;
44 import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
45 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48 import org.springframework.stereotype.Component;
49
50 import reactor.core.publisher.Flux;
51 import reactor.core.publisher.Mono;
52 import reactor.core.scheduler.Scheduler;
53 import reactor.core.scheduler.Schedulers;
54 import reactor.kafka.sender.KafkaSender;
55 import reactor.kafka.sender.SenderOptions;
56 import reactor.kafka.sender.SenderRecord;
57 import reactor.kafka.sender.SenderResult;
58 import reactor.util.retry.Retry;
59
60 /**
61  * This implements the main flow of the data file collector. Fetch file ready
62  * events from the
63  * message router, fetch new files from the PNF publish these in the data
64  * router.
65  */
66 @Component
67 public class CollectAndReportFiles {
68
69     private static Gson gson = new GsonBuilder() //
70         .disableHtmlEscaping() //
71         .create(); //
72
73     private static final int NUMBER_OF_WORKER_THREADS = 200;
74     private static final long FILE_TRANSFER_MAX_RETRIES = 2;
75     private static final Duration FILE_TRANSFER_INITIAL_RETRY_TIMEOUT = Duration.ofSeconds(2);
76
77     private static final Logger logger = LoggerFactory.getLogger(CollectAndReportFiles.class);
78
79     private final AppConfig appConfig;
80
81     private Counters counters = new Counters();
82
83     private final KafkaSender<String, String> kafkaSender;
84
85     private final DataStore dataStore;
86
87     /**
88      * Constructor for task registration in Datafile Workflow.
89      *
90      * @param applicationConfiguration - application configuration
91      */
92     public CollectAndReportFiles(AppConfig applicationConfiguration) {
93         this.appConfig = applicationConfiguration;
94         this.kafkaSender = KafkaSender.create(kafkaSenderOptions());
95         initCerts();
96
97         this.dataStore = DataStore.create(applicationConfiguration);
98
99         start();
100     }
101
102     private void initCerts() {
103         try {
104             CertificateConfig certificateConfig = appConfig.getCertificateConfiguration();
105             HttpsClientConnectionManagerUtil.setupOrUpdate(certificateConfig.keyCert, certificateConfig.keyPasswordPath,
106                 certificateConfig.trustedCa, certificateConfig.trustedCaPasswordPath, true);
107         } catch (DatafileTaskException e) {
108             logger.error("Could not setup HttpsClient certs, reason: {}", e.getMessage());
109         }
110     }
111
112     /**
113      * Main function for scheduling for the file collection Workflow.
114      */
115     public void start() {
116         start(0);
117     }
118
119     private void start(int delayMillis) {
120         try {
121             logger.trace("Starting");
122             if (appConfig.isS3Enabled()) {
123                 this.dataStore.create(Bucket.FILES).subscribe();
124                 this.dataStore.create(Bucket.LOCKS).subscribe();
125             }
126             Thread.sleep(delayMillis);
127             createMainTask().subscribe(null, s -> start(2000), null);
128         } catch (Exception e) {
129             logger.error("Unexpected exception: {}", e.toString(), e);
130             Thread.currentThread().interrupt();
131         }
132     }
133
134     Flux<FilePublishInformation> createMainTask() {
135         Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", NUMBER_OF_WORKER_THREADS);
136         return fetchFromKafka() //
137             .doOnNext(fileReadyMessage -> counters.threadPoolQueueSize.incrementAndGet()) //
138             .doOnNext(fileReadyMessage -> counters.incNoOfReceivedEvents()) //
139             .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread
140             .runOn(scheduler) //
141             .doOnNext(fileReadyMessage -> counters.threadPoolQueueSize.decrementAndGet()) //
142             .flatMap(fileReadyMessage -> Flux.fromIterable(FileData.createFileData(fileReadyMessage)), true, 1) //
143             .flatMap(this::filterNotFetched, false, 1, 1) //
144             .flatMap(this::fetchFile, false, 1, 1) //
145             .flatMap(data -> reportFetchedFile(data, this.appConfig.collectedFileTopic), false, 1) //
146             .sequential() //
147             .doOnError(t -> logger.error("Received error: {}", t.toString())); //
148     }
149
150     private Mono<FileData> deleteLock(FileData info) {
151         return dataStore.deleteLock(lockName(info.name())).map(b -> info); //
152     }
153
154     private Mono<FilePublishInformation> moveFileToS3Bucket(FilePublishInformation info) {
155         if (this.appConfig.isS3Enabled()) {
156             return dataStore.copyFileTo(locaFilePath(info), info.getName())
157                 .doOnError(t -> logger.warn("Failed to store file '{}' in S3 {}", info.getName(), t.getMessage())) //
158                 .retryWhen(Retry.fixedDelay(10, Duration.ofMillis(1000))) //
159                 .map(f -> info) //
160                 .doOnError(t -> logger.error("Failed to store file '{}' in S3 after retries {}", info.getName(),
161                     t.getMessage())) //
162                 .doOnNext(n -> logger.debug("Stored file in S3: {}", info.getName())) //
163                 .doOnNext(sig -> deleteLocalFile(info));
164         } else {
165             return Mono.just(info);
166         }
167     }
168
169     private Mono<FileData> filterNotFetched(FileData fileData) {
170         Path localPath = fileData.getLocalFilePath(this.appConfig);
171
172         return dataStore.fileExists(Bucket.FILES, fileData.name()) //
173             .filter(exists -> !exists) //
174             .filter(exists -> !localPath.toFile().exists()) //
175             .map(f -> fileData); //
176
177     }
178
179     private String lockName(String fileName) {
180         return fileName + ".lck";
181     }
182
183     private Path locaFilePath(FilePublishInformation info) {
184         return Paths.get(this.appConfig.collectedFilesPath, info.getName());
185     }
186
187     private void deleteLocalFile(FilePublishInformation info) {
188         Path path = locaFilePath(info);
189         try {
190             Files.delete(path);
191         } catch (Exception e) {
192             logger.warn("Could not delete local file: {}, reason:{}", path, e.getMessage());
193         }
194     }
195
196     private Flux<FilePublishInformation> reportFetchedFile(FilePublishInformation fileData, String topic) {
197         String json = gson.toJson(fileData);
198         return sendDataToStream(topic, fileData.getSourceName(), json) //
199             .map(result -> fileData);
200     }
201
202     public Flux<SenderResult<Integer>> sendDataToStream(String topic, String sourceName, String value) {
203         return sendDataToKafkaStream(Flux.just(senderRecord(topic, sourceName, value)));
204     }
205
206     private SenderRecord<String, String, Integer> senderRecord(String topic, String sourceName, String value) {
207         int correlationMetadata = 2;
208         String key = null;
209         var producerRecord = new ProducerRecord<>(topic, null, null, key, value, kafkaHeaders(sourceName));
210         return SenderRecord.create(producerRecord, correlationMetadata);
211     }
212
213     private Iterable<Header> kafkaHeaders(String sourceName) {
214         ArrayList<Header> result = new ArrayList<>();
215         Header h = new RecordHeader("SourceName", sourceName.getBytes());
216         result.add(h);
217         return result;
218     }
219
220     private Flux<SenderResult<Integer>> sendDataToKafkaStream(Flux<SenderRecord<String, String, Integer>> dataToSend) {
221
222         return kafkaSender.send(dataToSend) //
223             .doOnError(e -> logger.error("Send to kafka failed", e));
224     }
225
226     private SenderOptions<String, String> kafkaSenderOptions() {
227         String bootstrapServers = this.appConfig.getKafkaBootStrapServers();
228
229         Map<String, Object> props = new HashMap<>();
230         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
231         props.put(ProducerConfig.ACKS_CONFIG, "all");
232         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
233         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
234         return SenderOptions.create(props);
235     }
236
237     public Counters getCounters() {
238         return this.counters;
239     }
240
241     protected FileCollector createFileCollector() {
242         return new FileCollector(appConfig, counters);
243     }
244
245     private Mono<FilePublishInformation> fetchFile(FileData fileData) {
246         return this.dataStore.createLock(lockName(fileData.name())).filter(granted -> granted) //
247             .map(granted -> createFileCollector()) //
248             .flatMap(collector -> collector.collectFile(fileData, FILE_TRANSFER_MAX_RETRIES,
249                 FILE_TRANSFER_INITIAL_RETRY_TIMEOUT)) //
250             .flatMap(this::moveFileToS3Bucket) //
251             .doOnNext(b -> deleteLock(fileData).subscribe()) //
252             .doOnError(b -> deleteLock(fileData).subscribe()) //
253             .onErrorResume(exception -> handleFetchFileFailure(fileData, exception)); //
254     }
255
256     private Mono<FilePublishInformation> handleFetchFileFailure(FileData fileData, Throwable t) {
257         Path localFilePath = fileData.getLocalFilePath(this.appConfig);
258         logger.error("File fetching failed, path {}, reason: {}", fileData.remoteFilePath(), t.getMessage());
259         deleteFile(localFilePath);
260         if (Scheme.isFtpScheme(fileData.scheme())) {
261             counters.incNoOfFailedFtp();
262         } else {
263             counters.incNoOfFailedHttp();
264         }
265         return Mono.empty();
266     }
267
268     /**
269      * Fetch more messages from the message router. This is done in a
270      * polling/blocking fashion.
271      */
272     private Flux<FileReadyMessage> fetchFromKafka() {
273         KafkaTopicListener listener = new KafkaTopicListener(this.appConfig.getKafkaBootStrapServers(),
274             this.appConfig.kafkaClientId, this.appConfig.fileReadyEventTopic);
275         return listener.getFlux() //
276             .flatMap(this::parseReceivedFileReadyMessage, 1);
277
278     }
279
280     Mono<FileReadyMessage> parseReceivedFileReadyMessage(KafkaTopicListener.DataFromTopic data) {
281         try {
282             FileReadyMessage msg = gson.fromJson(data.value, FileReadyMessage.class);
283             logger.debug("Received: {}", msg);
284             return Mono.just(msg);
285         } catch (Exception e) {
286             logger.warn("Could not parse received: {}, reason: {}", data.value, e.getMessage());
287             return Mono.empty();
288         }
289     }
290
291     private static void deleteFile(Path localFile) {
292         logger.trace("Deleting file: {}", localFile);
293         try {
294             Files.delete(localFile);
295         } catch (Exception e) {
296             logger.trace("Could not delete file: {}, reason: {}", localFile, e.getMessage());
297         }
298     }
299 }