Code Review
/
nonrtric
/
plt
/
ranpm.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
CI: Add SonarCloud scan GHA workflow
[nonrtric/plt/ranpm.git]
/
datafilecollector
/
src
/
main
/
java
/
org
/
oran
/
datafile
/
tasks
/
CollectAndReportFiles.java
diff --git
a/datafilecollector/src/main/java/org/oran/datafile/tasks/CollectAndReportFiles.java
b/datafilecollector/src/main/java/org/oran/datafile/tasks/CollectAndReportFiles.java
index
a78d969
..
9ea9d57
100644
(file)
--- a/
datafilecollector/src/main/java/org/oran/datafile/tasks/CollectAndReportFiles.java
+++ b/
datafilecollector/src/main/java/org/oran/datafile/tasks/CollectAndReportFiles.java
@@
-42,6
+42,7
@@
import org.oran.datafile.model.Counters;
import org.oran.datafile.model.FileData;
import org.oran.datafile.model.FilePublishInformation;
import org.oran.datafile.model.FileReadyMessage;
import org.oran.datafile.model.FileData;
import org.oran.datafile.model.FilePublishInformation;
import org.oran.datafile.model.FileReadyMessage;
+import org.oran.datafile.oauth2.SecurityContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@@
-82,14
+83,17
@@
public class CollectAndReportFiles {
private final DataStore dataStore;
private final DataStore dataStore;
+ private final SecurityContext securityContext;
+
/**
* Constructor for task registration in Datafile Workflow.
*
* @param applicationConfiguration - application configuration
*/
/**
* Constructor for task registration in Datafile Workflow.
*
* @param applicationConfiguration - application configuration
*/
- public CollectAndReportFiles(AppConfig applicationConfiguration) {
+ public CollectAndReportFiles(
SecurityContext securityContext,
AppConfig applicationConfiguration) {
this.appConfig = applicationConfiguration;
this.kafkaSender = KafkaSender.create(kafkaSenderOptions());
this.appConfig = applicationConfiguration;
this.kafkaSender = KafkaSender.create(kafkaSenderOptions());
+ this.securityContext = securityContext;
initCerts();
this.dataStore = DataStore.create(applicationConfiguration);
initCerts();
this.dataStore = DataStore.create(applicationConfiguration);
@@
-217,7
+221,6
@@
public class CollectAndReportFiles {
}
private Flux<SenderResult<Integer>> sendDataToKafkaStream(Flux<SenderRecord<String, String, Integer>> dataToSend) {
}
private Flux<SenderResult<Integer>> sendDataToKafkaStream(Flux<SenderRecord<String, String, Integer>> dataToSend) {
-
return kafkaSender.send(dataToSend) //
.doOnError(e -> logger.error("Send to kafka failed", e));
}
return kafkaSender.send(dataToSend) //
.doOnError(e -> logger.error("Send to kafka failed", e));
}
@@
-239,7
+242,7
@@
public class CollectAndReportFiles {
}
protected FileCollector createFileCollector() {
}
protected FileCollector createFileCollector() {
- return new FileCollector(appConfig, counters);
+ return new FileCollector(
securityContext,
appConfig, counters);
}
private Mono<FilePublishInformation> fetchFile(FileData fileData) {
}
private Mono<FilePublishInformation> fetchFile(FileData fileData) {
@@
-255,7
+258,8
@@
public class CollectAndReportFiles {
private Mono<FilePublishInformation> handleFetchFileFailure(FileData fileData, Throwable t) {
Path localFilePath = fileData.getLocalFilePath(this.appConfig);
private Mono<FilePublishInformation> handleFetchFileFailure(FileData fileData, Throwable t) {
Path localFilePath = fileData.getLocalFilePath(this.appConfig);
- logger.error("File fetching failed, path {}, reason: {}", fileData.remoteFilePath(), t.getMessage());
+ String remoteFilePath = fileData.remoteFilePath();
+ logger.error("File fetching failed, path {}, reason: {}", remoteFilePath, t.getMessage());
deleteFile(localFilePath);
if (FileData.Scheme.isFtpScheme(fileData.scheme())) {
counters.incNoOfFailedFtp();
deleteFile(localFilePath);
if (FileData.Scheme.isFtpScheme(fileData.scheme())) {
counters.incNoOfFailedFtp();