import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.oran.datafile.commons.Scheme;
import org.oran.datafile.configuration.AppConfig;
import org.oran.datafile.configuration.CertificateConfig;
import org.oran.datafile.datastore.DataStore;
import org.oran.datafile.model.FileData;
import org.oran.datafile.model.FilePublishInformation;
import org.oran.datafile.model.FileReadyMessage;
+import org.oran.datafile.oauth2.SecurityContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
private final DataStore dataStore;
+ private final SecurityContext securityContext;
+
/**
* Constructor for task registration in Datafile Workflow.
*
* @param applicationConfiguration - application configuration
*/
- public CollectAndReportFiles(AppConfig applicationConfiguration) {
+ public CollectAndReportFiles(SecurityContext securityContext, AppConfig applicationConfiguration) {
this.appConfig = applicationConfiguration;
this.kafkaSender = KafkaSender.create(kafkaSenderOptions());
+ this.securityContext = securityContext;
initCerts();
this.dataStore = DataStore.create(applicationConfiguration);
}
private Flux<SenderResult<Integer>> sendDataToKafkaStream(Flux<SenderRecord<String, String, Integer>> dataToSend) {
-
return kafkaSender.send(dataToSend) //
.doOnError(e -> logger.error("Send to kafka failed", e));
}
}
protected FileCollector createFileCollector() {
- return new FileCollector(appConfig, counters);
+ return new FileCollector(securityContext, appConfig, counters);
}
private Mono<FilePublishInformation> fetchFile(FileData fileData) {
private Mono<FilePublishInformation> handleFetchFileFailure(FileData fileData, Throwable t) {
Path localFilePath = fileData.getLocalFilePath(this.appConfig);
- logger.error("File fetching failed, path {}, reason: {}", fileData.remoteFilePath(), t.getMessage());
+ String remoteFilePath = fileData.remoteFilePath();
+ logger.error("File fetching failed, path {}, reason: {}", remoteFilePath, t.getMessage());
deleteFile(localFilePath);
- if (Scheme.isFtpScheme(fileData.scheme())) {
+ if (FileData.Scheme.isFtpScheme(fileData.scheme())) {
counters.incNoOfFailedFtp();
} else {
counters.incNoOfFailedHttp();