- # Manually specify the fetch offset for a TopicPartition
- consumer.seek(topic_partition, starting_offset)
- # Get the last offset for the given partitions
- last_offset = consumer.end_offsets([topic_partition])[topic_partition]
- print('last_offset',last_offset)
-
- if (last_offset != prev_last_offset):
- for consumer_record in consumer:
- # Get req_id as msg_key and converts it from byte to str for each consumer record
- msg_key = byte_to_str(consumer_record.key)
- print ('msg_key in a consumer_record:', msg_key)
- if (req_id == msg_key):
- print ('req_id is found in consumer records', req_id)
- return consumer_record
- elif (consumer_record.offset == last_offset - 1):
- break
+ for consumer_record in consumer:
+ # Get req_id as msg_key and converts it from byte to str for each consumer record
+ msg_key = byte_to_str(consumer_record.key)
+ print ('msg_key in a consumer_record:', msg_key)
+ if (req_id == msg_key):
+ print ('req_id is found in consumer records', req_id)
+ return consumer_record