2 * ========================LICENSE_START=================================
5 * Copyright (C) 2024: OpenInfra Foundation Europe
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ========================LICENSE_END===================================
21 package com.demo.consumer.consumer;
23 import org.apache.kafka.clients.consumer.ConsumerRecord;
24 import org.apache.kafka.clients.consumer.ConsumerRecords;
25 import org.apache.kafka.clients.consumer.KafkaConsumer;
26 import org.apache.kafka.common.TopicPartition;
27 import org.apache.kafka.common.errors.WakeupException;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import org.springframework.beans.factory.annotation.Value;
31 import org.springframework.stereotype.Component;
33 import com.demo.consumer.messages.AbstractSimpleKafka;
34 import com.demo.consumer.messages.KafkaMessageHandler;
35 import com.demo.consumer.messages.MessageHelper;
36 import com.demo.consumer.messages.PropertiesHelper;
41 import java.util.Properties;
42 import java.util.Collections;
43 import java.util.List;
44 import java.util.Optional;
45 import java.time.Duration;
46 import java.util.concurrent.atomic.AtomicBoolean;
51 public class SimpleConsumer extends AbstractSimpleKafka {
52 private static final Logger log = LoggerFactory.getLogger(SimpleConsumer.class);
54 private KafkaConsumer<String, String> kafkaConsumer = null;
55 private final AtomicBoolean closed = new AtomicBoolean(false);
57 @Value("${vars.time:2000}")
58 private int TIME_OUT_MS;
60 public void run(String topicName, int numberOfRecords, KafkaMessageHandler callback) throws Exception {
61 Properties props = PropertiesHelper.getProperties();
62 // See if the number of records is provided
63 Optional<Integer> recs = Optional.ofNullable(numberOfRecords);
65 // adjust the number of records to get accordingly
66 Integer numOfRecs = recs.orElseGet(() -> Integer.parseInt(props.getProperty("max.poll.records")));
67 props.setProperty("max.poll.records", String.valueOf(numOfRecs));
69 // create the consumer
70 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
72 // make the consumer available for graceful shutdown
73 setKafkaConsumer(consumer);
74 consumer.assign(Collections.singleton(new TopicPartition(topicName, 0)));
75 int recNum = numOfRecs;
77 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(TIME_OUT_MS));
78 recNum = records.count();
80 log.info(MessageHelper.getSimpleJSONObject("No records retrieved").toJSONString());
84 for (ConsumerRecord<String, String> record : records) {
85 callback.processMessage(topicName, record);
92 private void close() throws Exception {
93 if (this.getKafkaConsumer() == null) {
94 log.info(MessageHelper.getSimpleJSONObject("The internal consumer is NULL").toJSONString());
97 log.info(MessageHelper.getSimpleJSONObject("Closing consumer").toJSONString());
98 this.getKafkaConsumer().close();
101 public void runAlways(String topicName, KafkaMessageHandler callback) throws Exception {
102 Properties props = PropertiesHelper.getProperties();
103 // make the consumer available for graceful shutdown
104 setKafkaConsumer(new KafkaConsumer<>(props));
106 // keep running forever or until shutdown() is called from another thread.
108 getKafkaConsumer().subscribe(List.of(topicName));
109 while (!closed.get()) {
110 ConsumerRecords<String, String> records = getKafkaConsumer().poll(Duration.ofMillis(TIME_OUT_MS));
111 if (records.count() == 0) {
112 log.info(MessageHelper.getSimpleJSONObject("No records retrieved").toJSONString());
115 for (ConsumerRecord<String, String> record : records) {
116 callback.processMessage(topicName, record);
117 log.info(MessageHelper.getSimpleJSONObject("Topic: " + topicName + "Message: " + record.value()).toJSONString());
120 } catch (WakeupException e) {
121 // Ignore exception if closing
127 public void shutdown() {
130 log.info(MessageHelper.getSimpleJSONObject("Shutting down consumer").toJSONString());
131 } catch (Exception e) {
133 getKafkaConsumer().wakeup();
136 public KafkaConsumer<String, String> getKafkaConsumer() {
137 return kafkaConsumer;
140 public void setKafkaConsumer(KafkaConsumer<String, String> kafkaConsumer) {
141 this.kafkaConsumer = kafkaConsumer;