Added support for using oauth token for Kafka
[nonrtric/plt/ranpm.git] / datafilecollector / src / main / java / org / oran / datafile / tasks / CollectAndReportFiles.java
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2023 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
  * in compliance with the License. You may obtain a copy of the License at
@@ -14,7 +14,7 @@
  * ============LICENSE_END========================================================================
  */
 
-package org.onap.dcaegen2.collectors.datafile.tasks;
+package org.oran.datafile.tasks;
 
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
@@ -32,17 +32,17 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
-import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.configuration.CertificateConfig;
-import org.onap.dcaegen2.collectors.datafile.datastore.DataStore;
-import org.onap.dcaegen2.collectors.datafile.datastore.DataStore.Bucket;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.http.HttpsClientConnectionManagerUtil;
-import org.onap.dcaegen2.collectors.datafile.model.Counters;
-import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
-import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
+import org.oran.datafile.commons.Scheme;
+import org.oran.datafile.configuration.AppConfig;
+import org.oran.datafile.configuration.CertificateConfig;
+import org.oran.datafile.datastore.DataStore;
+import org.oran.datafile.datastore.DataStore.Bucket;
+import org.oran.datafile.exceptions.DatafileTaskException;
+import org.oran.datafile.http.HttpsClientConnectionManagerUtil;
+import org.oran.datafile.model.Counters;
+import org.oran.datafile.model.FileData;
+import org.oran.datafile.model.FilePublishInformation;
+import org.oran.datafile.model.FileReadyMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
@@ -142,7 +142,7 @@ public class CollectAndReportFiles {
             .flatMap(fileReadyMessage -> Flux.fromIterable(FileData.createFileData(fileReadyMessage)), true, 1) //
             .flatMap(this::filterNotFetched, false, 1, 1) //
             .flatMap(this::fetchFile, false, 1, 1) //
-            .flatMap(data -> reportFetchedFile(data, this.appConfig.collectedFileTopic), false, 1) //
+            .flatMap(data -> reportFetchedFile(data, appConfig.getCollectedFileTopic()), false, 1) //
             .sequential() //
             .doOnError(t -> logger.error("Received error: {}", t.toString())); //
     }
@@ -181,7 +181,7 @@ public class CollectAndReportFiles {
     }
 
     private Path locaFilePath(FilePublishInformation info) {
-        return Paths.get(this.appConfig.collectedFilesPath, info.getName());
+        return Paths.get(appConfig.getCollectedFilesPath(), info.getName());
     }
 
     private void deleteLocalFile(FilePublishInformation info) {
@@ -231,6 +231,7 @@ public class CollectAndReportFiles {
         props.put(ProducerConfig.ACKS_CONFIG, "all");
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        this.appConfig.addKafkaSecurityProps(props);
         return SenderOptions.create(props);
     }
 
@@ -270,8 +271,7 @@ public class CollectAndReportFiles {
      * polling/blocking fashion.
      */
     private Flux<FileReadyMessage> fetchFromKafka() {
-        KafkaTopicListener listener = new KafkaTopicListener(this.appConfig.getKafkaBootStrapServers(),
-            this.appConfig.kafkaClientId, this.appConfig.fileReadyEventTopic);
+        KafkaTopicListener listener = new KafkaTopicListener(this.appConfig);
         return listener.getFlux() //
             .flatMap(this::parseReceivedFileReadyMessage, 1);