ICS sample producer and consumer
[nonrtric.git] / sample-services / ics-producer-consumer / consumer / src / main / java / com / demo / consumer / consumer / SimpleConsumer.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  *
5  * Copyright (C) 2024: OpenInfra Foundation Europe
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package com.demo.consumer.consumer;
22
23 import org.apache.kafka.clients.consumer.ConsumerRecord;
24 import org.apache.kafka.clients.consumer.ConsumerRecords;
25 import org.apache.kafka.clients.consumer.KafkaConsumer;
26 import org.apache.kafka.common.TopicPartition;
27 import org.apache.kafka.common.errors.WakeupException;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import org.springframework.beans.factory.annotation.Value;
31 import org.springframework.stereotype.Component;
32
33 import com.demo.consumer.messages.AbstractSimpleKafka;
34 import com.demo.consumer.messages.KafkaMessageHandler;
35 import com.demo.consumer.messages.MessageHelper;
36 import com.demo.consumer.messages.PropertiesHelper;
37
38 import lombok.Getter;
39 import lombok.Setter;
40
41 import java.util.Properties;
42 import java.util.Collections;
43 import java.util.List;
44 import java.util.Optional;
45 import java.time.Duration;
46 import java.util.concurrent.atomic.AtomicBoolean;
47
48 @Component
49 @Setter
50 @Getter
51 public class SimpleConsumer extends AbstractSimpleKafka {
52     private static final Logger log = LoggerFactory.getLogger(SimpleConsumer.class);
53
54     private KafkaConsumer<String, String> kafkaConsumer = null;
55     private final AtomicBoolean closed = new AtomicBoolean(false);
56
57     @Value("${vars.time:2000}")
58     private int TIME_OUT_MS;
59
60     public void run(String topicName, int numberOfRecords, KafkaMessageHandler callback) throws Exception {
61         Properties props = PropertiesHelper.getProperties();
62         // See if the number of records is provided
63         Optional<Integer> recs = Optional.ofNullable(numberOfRecords);
64
65         // adjust the number of records to get accordingly
66         Integer numOfRecs = recs.orElseGet(() -> Integer.parseInt(props.getProperty("max.poll.records")));
67         props.setProperty("max.poll.records", String.valueOf(numOfRecs));
68
69         // create the consumer
70         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
71
72         // make the consumer available for graceful shutdown
73         setKafkaConsumer(consumer);
74         consumer.assign(Collections.singleton(new TopicPartition(topicName, 0)));
75         //consumer.seekToBeginning(consumer.assignment()); //--from-beginning
76         int recNum = numOfRecs;
77         while (recNum > 0) {
78             ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(TIME_OUT_MS));
79             recNum = records.count();
80             if (recNum == 0) {
81                 log.info(MessageHelper.getSimpleJSONObject("No records retrieved").toJSONString());
82                 break;
83             }
84
85             for (ConsumerRecord<String, String> record : records) {
86                 callback.processMessage(topicName, record);
87                 recNum--;
88             }
89         }
90         consumer.close();
91     }
92
93     private void close() throws Exception {
94         if (this.getKafkaConsumer() == null) {
95             log.info(MessageHelper.getSimpleJSONObject("The internal consumer is NULL").toJSONString());
96             return;
97         }
98         log.info(MessageHelper.getSimpleJSONObject("Closing consumer").toJSONString());
99         this.getKafkaConsumer().close();
100     }
101
102     public void runAlways(String topicName, KafkaMessageHandler callback) throws Exception {
103         Properties props = PropertiesHelper.getProperties();
104         // make the consumer available for graceful shutdown
105         setKafkaConsumer(new KafkaConsumer<>(props));
106
107         // keep running forever or until shutdown() is called from another thread.
108         try {
109             getKafkaConsumer().subscribe(List.of(topicName));
110             while (!closed.get()) {
111                 ConsumerRecords<String, String> records = getKafkaConsumer().poll(Duration.ofMillis(TIME_OUT_MS));
112                 if (records.count() == 0) {
113                     log.info(MessageHelper.getSimpleJSONObject("No records retrieved").toJSONString());
114                 }
115
116                 for (ConsumerRecord<String, String> record : records) {
117                     callback.processMessage(topicName, record);
118                     log.info(MessageHelper.getSimpleJSONObject("Topic: " + topicName + "Message: " + record.value()).toJSONString());
119                 }
120             }
121         } catch (WakeupException e) {
122             // Ignore exception if closing
123             if (!closed.get())
124                 throw e;
125         }
126     }
127
128     public void shutdown() {
129         closed.set(true);
130         try {
131             log.info(MessageHelper.getSimpleJSONObject("Shutting down consumer").toJSONString());
132         } catch (Exception e) {
133         }
134         getKafkaConsumer().wakeup();
135     }
136
137     public KafkaConsumer<String, String> getKafkaConsumer() {
138         return kafkaConsumer;
139     }
140
141     public void setKafkaConsumer(KafkaConsumer<String, String> kafkaConsumer) {
142         this.kafkaConsumer = kafkaConsumer;
143     }
144 }