X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=datafilecollector%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fdatafile%2Ftasks%2FCollectAndReportFiles.java;h=9ea9d57c94d66a824646c4a8cf4bf0cf4b0c1991;hb=HEAD;hp=a78d9695a5e703a3661c802f782b3bacaec81ff5;hpb=d806dbef4f43bb9c631c818e96c1a39e440c5e6c;p=nonrtric%2Fplt%2Franpm.git diff --git a/datafilecollector/src/main/java/org/oran/datafile/tasks/CollectAndReportFiles.java b/datafilecollector/src/main/java/org/oran/datafile/tasks/CollectAndReportFiles.java index a78d969..9ea9d57 100644 --- a/datafilecollector/src/main/java/org/oran/datafile/tasks/CollectAndReportFiles.java +++ b/datafilecollector/src/main/java/org/oran/datafile/tasks/CollectAndReportFiles.java @@ -42,6 +42,7 @@ import org.oran.datafile.model.Counters; import org.oran.datafile.model.FileData; import org.oran.datafile.model.FilePublishInformation; import org.oran.datafile.model.FileReadyMessage; +import org.oran.datafile.oauth2.SecurityContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @@ -82,14 +83,17 @@ public class CollectAndReportFiles { private final DataStore dataStore; + private final SecurityContext securityContext; + /** * Constructor for task registration in Datafile Workflow. * * @param applicationConfiguration - application configuration */ - public CollectAndReportFiles(AppConfig applicationConfiguration) { + public CollectAndReportFiles(SecurityContext securityContext, AppConfig applicationConfiguration) { this.appConfig = applicationConfiguration; this.kafkaSender = KafkaSender.create(kafkaSenderOptions()); + this.securityContext = securityContext; initCerts(); this.dataStore = DataStore.create(applicationConfiguration); @@ -217,7 +221,6 @@ public class CollectAndReportFiles { } private Flux> sendDataToKafkaStream(Flux> dataToSend) { - return kafkaSender.send(dataToSend) // .doOnError(e -> logger.error("Send to kafka failed", e)); } @@ -239,7 +242,7 @@ public class CollectAndReportFiles { } protected FileCollector createFileCollector() { - return new FileCollector(appConfig, counters); + return new FileCollector(securityContext, appConfig, counters); } private Mono fetchFile(FileData fileData) { @@ -255,7 +258,8 @@ public class CollectAndReportFiles { private Mono handleFetchFileFailure(FileData fileData, Throwable t) { Path localFilePath = fileData.getLocalFilePath(this.appConfig); - logger.error("File fetching failed, path {}, reason: {}", fileData.remoteFilePath(), t.getMessage()); + String remoteFilePath = fileData.remoteFilePath(); + logger.error("File fetching failed, path {}, reason: {}", remoteFilePath, t.getMessage()); deleteFile(localFilePath); if (FileData.Scheme.isFtpScheme(fileData.scheme())) { counters.incNoOfFailedFtp();