import org.oran.datafile.model.FileData;
import org.oran.datafile.model.FilePublishInformation;
import org.oran.datafile.model.FileReadyMessage;
+import org.oran.datafile.oauth2.SecurityContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
private final DataStore dataStore;
+ private final SecurityContext securityContext;
+
/**
* Constructor for task registration in Datafile Workflow.
*
* @param applicationConfiguration - application configuration
*/
- public CollectAndReportFiles(AppConfig applicationConfiguration) {
+ public CollectAndReportFiles(SecurityContext securityContext, AppConfig applicationConfiguration) {
this.appConfig = applicationConfiguration;
this.kafkaSender = KafkaSender.create(kafkaSenderOptions());
+ this.securityContext = securityContext;
initCerts();
this.dataStore = DataStore.create(applicationConfiguration);
}
private Flux<SenderResult<Integer>> sendDataToKafkaStream(Flux<SenderRecord<String, String, Integer>> dataToSend) {
-
return kafkaSender.send(dataToSend) //
.doOnError(e -> logger.error("Send to kafka failed", e));
}
}
protected FileCollector createFileCollector() {
- return new FileCollector(appConfig, counters);
+ return new FileCollector(securityContext, appConfig, counters);
}
private Mono<FilePublishInformation> fetchFile(FileData fileData) {