* ========================LICENSE_END===================================
*/
-package org.onap.dcaegen2.collectors.datafile.tasks;
+package org.oran.datafile.tasks;
import java.util.Collections;
import java.util.HashMap;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
+import org.oran.datafile.configuration.AppConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final Logger logger = LoggerFactory.getLogger(KafkaTopicListener.class);
- private final String inputTopic;
- private final String kafkaBoostrapServers;
- private final String kafkaClientId;
private Flux<DataFromTopic> dataFromTopic;
+ private final AppConfig appConfig;
- public KafkaTopicListener(String kafkaBoostrapServers, String clientId, String topic) {
- this.kafkaClientId = clientId;
- this.kafkaBoostrapServers = kafkaBoostrapServers;
- this.inputTopic = topic;
+ public KafkaTopicListener(AppConfig applConfig) {
+ this.appConfig = applConfig;
}
public Flux<DataFromTopic> getFlux() {
}
private Flux<DataFromTopic> startReceiveFromTopic() {
- logger.debug("Listening to kafka topic: {}, client id: {}", this.inputTopic, this.kafkaClientId);
+ logger.debug("Listening to kafka topic: {}, client id: {}", appConfig.getInputTopic(),
+ appConfig.getKafkaClientId());
return KafkaReceiver.create(kafkaInputProperties()) //
.receive() //
- .doOnNext(input -> logger.debug("Received from kafka topic: {} :{}", this.inputTopic, input.value())) //
+ .doOnNext(
+ input -> logger.debug("Received from kafka topic: {} :{}", appConfig.getInputTopic(), input.value())) //
.doOnError(t -> logger.error("KafkaTopicReceiver error: {}", t.getMessage())) //
.doFinally(sig -> logger.error("KafkaTopicReceiver stopped, reason: {}", sig)) //
.doFinally(sig -> this.dataFromTopic = null) //
private ReceiverOptions<String, String> kafkaInputProperties() {
Map<String, Object> consumerProps = new HashMap<>();
- if (this.kafkaBoostrapServers.isEmpty()) {
+ if (appConfig.getKafkaBootStrapServers().isEmpty()) {
logger.error("No kafka boostrap server is setup");
}
- consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaBoostrapServers);
- consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "osc-dmaap-adapter-" + inputTopic);
+ consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, appConfig.getKafkaBootStrapServers());
+ consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "osc-dmaap-adapter-" + appConfig.getInputTopic());
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
- consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, this.kafkaClientId);
+ consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, appConfig.getKafkaClientId());
+ this.appConfig.addKafkaSecurityProps(consumerProps);
return ReceiverOptions.<String, String>create(consumerProps)
- .subscription(Collections.singleton(this.inputTopic));
+ .subscription(Collections.singleton(appConfig.getInputTopic()));
}
}