private static final Logger logger = LoggerFactory.getLogger(KafkaTopicListener.class);
private final ApplicationConfig applicationConfig;
private Flux<DataFromKafkaTopic> dataFromTopic;
- private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
public KafkaTopicListener(ApplicationConfig applConfig) {
this.applicationConfig = applConfig;
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId + "_" + applicationConfig.getKafkaGroupId());
+ this.applicationConfig.addKafkaSecurityProps(consumerProps);
return ReceiverOptions.<byte[], byte[]>create(consumerProps)
.subscription(Collections.singleton(this.applicationConfig.getKafkaInputTopic()));