1 # ============LICENSE_START===============================================
2 # Copyright (C) 2022 Nordix Foundation. All rights reserved.
3 # ========================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=================================================
18 # This is a script for test-purposes only
19 # It consumes a response-event from a kafka bus with different apporaches
20 # In order to use this script, you must have an venv for Python and kafka-python libs has to be installed
21 # To instal kafka-python please use: pip install kafka-python
22 # Example of an response-event json
24 #"response-code": "400",
25 #"error-info": "Bad format"
35 from kafka import KafkaConsumer, TopicPartition
36 from threading import RLock
38 # Response string with JSON format
39 response_data_JSON = """
48 target_topic_res='kafkatopicres'
49 MSG_BROKER_URL='localhost:9092'
51 # Instantiate KafkaConsumer with keyword arguments
52 # https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
53 def create_kafka_consumer():
54 consumer = KafkaConsumer(
55 # kafka cluster endpoint
56 bootstrap_servers = MSG_BROKER_URL,
57 # move to the earliest or latest available message
58 auto_offset_reset = 'earliest',
59 # number of milliseconds to block during message iteration
60 # if no new message available during this period of time, iteration through a for-loop will stop automatically
61 consumer_timeout_ms = 100,
62 value_deserializer = lambda m: json.loads(m.decode('ascii')),
63 #enable_auto_commit=False
67 # Helper: Searches for req_id by seeking every five seconds up to thirty seconds
68 # Helper: If the req_id is found, then ConsumerRecord will be returned
69 # Helper: If the req_id is not found, then Response Request Timeout will be returned
73 print ('req_id looking for in consumer:', req_id)
74 consumer = create_kafka_consumer()
75 # Latch to target topic and partition
76 topic_partition = TopicPartition(target_topic_res, 0)
77 consumer.assign([topic_partition])
79 sleep_period_in_sec = 5
80 poll_cycle_threshold = calc_pollcycle_threshold(sleep_period_in_sec)
83 while (poll_retries < poll_cycle_threshold):
84 for consumer_record in consumer:
85 # Get req_id as msg_key and converts it from byte to str for each consumer record
86 msg_key = byte_to_str(consumer_record.key)
87 print ('msg_key in a consumer_record:', msg_key)
88 if (req_id == msg_key):
89 print ('req_id is found in consumer records', req_id)
90 return consumer_record
92 print('Sleeping for ' + str(sleep_period_in_sec) + ' seconds...')
93 time.sleep(sleep_period_in_sec)
97 except Exception as err:
98 print('Error while consume record for req_id', err)
103 # Helper: calculates poll cycle threshold
104 def calc_pollcycle_threshold(sleep_period_in_sec):
106 poll_cycle_threshold = int(TIME_OUT) / sleep_period_in_sec
107 poll_cycle_threshold = math.floor(poll_cycle_threshold)
108 return poll_cycle_threshold
110 # Helper: Converts a byte array to a str
111 def byte_to_str(byte_arr):
113 if (byte_arr is not None):
114 return byte_arr.decode('utf-8')
118 if __name__ == '__main__':
120 requestid = sys.argv[1]
121 future = consume(requestid)
122 except Exception as err:
123 print('Error in __main__', err)