Added support for using oauth token for Kafka
[nonrtric/plt/ranpm.git] / datafilecollector / src / main / java / org / oran / datafile / tasks / KafkaTopicListener.java
@@ -18,7 +18,7 @@
  * ========================LICENSE_END===================================
  */
 
-package org.onap.dcaegen2.collectors.datafile.tasks;
+package org.oran.datafile.tasks;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -28,6 +28,7 @@ import lombok.ToString;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.oran.datafile.configuration.AppConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,15 +56,11 @@ public class KafkaTopicListener {
 
     private static final Logger logger = LoggerFactory.getLogger(KafkaTopicListener.class);
 
-    private final String inputTopic;
-    private final String kafkaBoostrapServers;
-    private final String kafkaClientId;
     private Flux<DataFromTopic> dataFromTopic;
+    private final AppConfig appConfig;
 
-    public KafkaTopicListener(String kafkaBoostrapServers, String clientId, String topic) {
-        this.kafkaClientId = clientId;
-        this.kafkaBoostrapServers = kafkaBoostrapServers;
-        this.inputTopic = topic;
+    public KafkaTopicListener(AppConfig applConfig) {
+        this.appConfig = applConfig;
     }
 
     public Flux<DataFromTopic> getFlux() {
@@ -74,10 +71,12 @@ public class KafkaTopicListener {
     }
 
     private Flux<DataFromTopic> startReceiveFromTopic() {
-        logger.debug("Listening to kafka topic: {}, client id: {}", this.inputTopic, this.kafkaClientId);
+        logger.debug("Listening to kafka topic: {}, client id: {}", appConfig.getInputTopic(),
+            appConfig.getKafkaClientId());
         return KafkaReceiver.create(kafkaInputProperties()) //
             .receive() //
-            .doOnNext(input -> logger.debug("Received from kafka topic: {} :{}", this.inputTopic, input.value())) //
+            .doOnNext(
+                input -> logger.debug("Received from kafka topic: {} :{}", appConfig.getInputTopic(), input.value())) //
             .doOnError(t -> logger.error("KafkaTopicReceiver error: {}", t.getMessage())) //
             .doFinally(sig -> logger.error("KafkaTopicReceiver stopped, reason: {}", sig)) //
             .doFinally(sig -> this.dataFromTopic = null) //
@@ -89,18 +88,19 @@ public class KafkaTopicListener {
 
     private ReceiverOptions<String, String> kafkaInputProperties() {
         Map<String, Object> consumerProps = new HashMap<>();
-        if (this.kafkaBoostrapServers.isEmpty()) {
+        if (appConfig.getKafkaBootStrapServers().isEmpty()) {
             logger.error("No kafka boostrap server is setup");
         }
-        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaBoostrapServers);
-        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "osc-dmaap-adapter-" + inputTopic);
+        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, appConfig.getKafkaBootStrapServers());
+        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "osc-dmaap-adapter-" + appConfig.getInputTopic());
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
-        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, this.kafkaClientId);
+        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, appConfig.getKafkaClientId());
+        this.appConfig.addKafkaSecurityProps(consumerProps);
 
         return ReceiverOptions.<String, String>create(consumerProps)
-            .subscription(Collections.singleton(this.inputTopic));
+            .subscription(Collections.singleton(appConfig.getInputTopic()));
     }
 
 }