DFC shall provide a bearer authorization token in HTTP
[nonrtric/plt/ranpm.git] / datafilecollector / src / main / java / org / oran / datafile / tasks / CollectAndReportFiles.java
index a78d969..93b9a71 100644 (file)
@@ -42,6 +42,7 @@ import org.oran.datafile.model.Counters;
 import org.oran.datafile.model.FileData;
 import org.oran.datafile.model.FilePublishInformation;
 import org.oran.datafile.model.FileReadyMessage;
+import org.oran.datafile.oauth2.SecurityContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
@@ -82,14 +83,17 @@ public class CollectAndReportFiles {
 
     private final DataStore dataStore;
 
+    private final SecurityContext securityContext;
+
     /**
      * Constructor for task registration in Datafile Workflow.
      *
      * @param applicationConfiguration - application configuration
      */
-    public CollectAndReportFiles(AppConfig applicationConfiguration) {
+    public CollectAndReportFiles(SecurityContext securityContext, AppConfig applicationConfiguration) {
         this.appConfig = applicationConfiguration;
         this.kafkaSender = KafkaSender.create(kafkaSenderOptions());
+        this.securityContext = securityContext;
         initCerts();
 
         this.dataStore = DataStore.create(applicationConfiguration);
@@ -217,7 +221,6 @@ public class CollectAndReportFiles {
     }
 
     private Flux<SenderResult<Integer>> sendDataToKafkaStream(Flux<SenderRecord<String, String, Integer>> dataToSend) {
-
         return kafkaSender.send(dataToSend) //
             .doOnError(e -> logger.error("Send to kafka failed", e));
     }
@@ -239,7 +242,7 @@ public class CollectAndReportFiles {
     }
 
     protected FileCollector createFileCollector() {
-        return new FileCollector(appConfig, counters);
+        return new FileCollector(securityContext, appConfig, counters);
     }
 
     private Mono<FilePublishInformation> fetchFile(FileData fileData) {