Test FTC100 fails since A1-SIM update
[nonrtric.git] / sample-services / ics-producer-consumer / consumer / src / main / java / com / demo / consumer / consumer / SimpleConsumer.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  *
5  * Copyright (C) 2024: OpenInfra Foundation Europe
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package com.demo.consumer.consumer;
22
23 import org.apache.kafka.clients.consumer.ConsumerRecord;
24 import org.apache.kafka.clients.consumer.ConsumerRecords;
25 import org.apache.kafka.clients.consumer.KafkaConsumer;
26 import org.apache.kafka.common.TopicPartition;
27 import org.apache.kafka.common.errors.WakeupException;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import org.springframework.beans.factory.annotation.Value;
31 import org.springframework.stereotype.Component;
32
33 import com.demo.consumer.messages.AbstractSimpleKafka;
34 import com.demo.consumer.messages.KafkaMessageHandler;
35 import com.demo.consumer.messages.MessageHelper;
36 import com.demo.consumer.messages.PropertiesHelper;
37
38 import lombok.Getter;
39 import lombok.Setter;
40
41 import java.util.Properties;
42 import java.util.Collections;
43 import java.util.List;
44 import java.util.Optional;
45 import java.time.Duration;
46 import java.util.concurrent.atomic.AtomicBoolean;
47
48 @Component
49 @Setter
50 @Getter
51 public class SimpleConsumer extends AbstractSimpleKafka {
52     private static final Logger log = LoggerFactory.getLogger(SimpleConsumer.class);
53
54     private KafkaConsumer<String, String> kafkaConsumer = null;
55     private final AtomicBoolean closed = new AtomicBoolean(false);
56
57     @Value("${vars.time:2000}")
58     private int TIME_OUT_MS;
59
60     public void run(String topicName, int numberOfRecords, KafkaMessageHandler callback) throws Exception {
61         Properties props = PropertiesHelper.getProperties();
62         // See if the number of records is provided
63         Optional<Integer> recs = Optional.ofNullable(numberOfRecords);
64
65         // adjust the number of records to get accordingly
66         Integer numOfRecs = recs.orElseGet(() -> Integer.parseInt(props.getProperty("max.poll.records")));
67         props.setProperty("max.poll.records", String.valueOf(numOfRecs));
68
69         // create the consumer
70         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
71
72         // make the consumer available for graceful shutdown
73         setKafkaConsumer(consumer);
74         consumer.assign(Collections.singleton(new TopicPartition(topicName, 0)));
75         int recNum = numOfRecs;
76         while (recNum > 0) {
77             ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(TIME_OUT_MS));
78             recNum = records.count();
79             if (recNum == 0) {
80                 log.info(MessageHelper.getSimpleJSONObject("No records retrieved").toJSONString());
81                 break;
82             }
83
84             for (ConsumerRecord<String, String> record : records) {
85                 callback.processMessage(topicName, record);
86                 recNum--;
87             }
88         }
89         consumer.close();
90     }
91
92     private void close() throws Exception {
93         if (this.getKafkaConsumer() == null) {
94             log.info(MessageHelper.getSimpleJSONObject("The internal consumer is NULL").toJSONString());
95             return;
96         }
97         log.info(MessageHelper.getSimpleJSONObject("Closing consumer").toJSONString());
98         this.getKafkaConsumer().close();
99     }
100
101     public void runAlways(String topicName, KafkaMessageHandler callback) throws Exception {
102         Properties props = PropertiesHelper.getProperties();
103         // make the consumer available for graceful shutdown
104         setKafkaConsumer(new KafkaConsumer<>(props));
105
106         // keep running forever or until shutdown() is called from another thread.
107         try {
108             getKafkaConsumer().subscribe(List.of(topicName));
109             while (!closed.get()) {
110                 ConsumerRecords<String, String> records = getKafkaConsumer().poll(Duration.ofMillis(TIME_OUT_MS));
111                 if (records.count() == 0) {
112                     log.info(MessageHelper.getSimpleJSONObject("No records retrieved").toJSONString());
113                 }
114
115                 for (ConsumerRecord<String, String> record : records) {
116                     callback.processMessage(topicName, record);
117                     log.info(MessageHelper.getSimpleJSONObject("Topic: " + topicName + "Message: " + record.value()).toJSONString());
118                 }
119             }
120         } catch (WakeupException e) {
121             // Ignore exception if closing
122             if (!closed.get())
123                 throw e;
124         }
125     }
126
127     public void shutdown() {
128         closed.set(true);
129         try {
130             log.info(MessageHelper.getSimpleJSONObject("Shutting down consumer").toJSONString());
131         } catch (Exception e) {
132         }
133         getKafkaConsumer().wakeup();
134     }
135
136     public KafkaConsumer<String, String> getKafkaConsumer() {
137         return kafkaConsumer;
138     }
139
140     public void setKafkaConsumer(KafkaConsumer<String, String> kafkaConsumer) {
141         this.kafkaConsumer = kafkaConsumer;
142     }
143 }