+# ============LICENSE_START===============================================
+# Copyright (C) 2022 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+
+# This is a script for test-purposes only
+# It consumes a response-event from a kafka bus with different apporaches
+# In order to use this script, you must have an venv for Python and kafka-python libs has to be installed
+# To instal kafka-python please use: pip install kafka-python
+# Example of an response-event json
+#{
+ #"response-code": "400",
+ #"error-info": "Bad format"
+#}
+
+
+import os
+import json
+import sys
+import math
+import time
+
+from kafka import KafkaConsumer, TopicPartition
+from threading import RLock
+
+# Response string with JSON format
+response_data_JSON = """
+{
+ "response-code": 200,
+ "error-info": ""
+}
+"""
+
+# in seconds
+TIME_OUT=30
+target_topic_res='kafkatopicres'
+MSG_BROKER_URL='localhost:9092'
+
+# Instantiate KafkaConsumer with keyword arguments
+# https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
+def create_kafka_consumer():
+ consumer = KafkaConsumer(
+ # kafka cluster endpoint
+ bootstrap_servers = MSG_BROKER_URL,
+ # move to the earliest or latest available message
+ auto_offset_reset = 'earliest',
+ # number of milliseconds to block during message iteration
+ # if no new message available during this period of time, iteration through a for-loop will stop automatically
+ consumer_timeout_ms = 100,
+ value_deserializer = lambda m: json.loads(m.decode('ascii')),
+ #enable_auto_commit=False
+ )
+ return consumer
+
+# Helper: Searches for req_id by seeking every five seconds up to thirty seconds
+# Helper: If the req_id is found, then ConsumerRecord will be returned
+# Helper: If the req_id is not found, then Response Request Timeout will be returned
+def consume(req_id):
+
+ try:
+ print ('req_id looking for in consumer:', req_id)
+ consumer = create_kafka_consumer()
+ # Latch to target topic and partition
+ topic_partition = TopicPartition(target_topic_res, 0)
+ consumer.assign([topic_partition])
+
+ sleep_period_in_sec = 5
+ poll_cycle_threshold = calc_pollcycle_threshold(sleep_period_in_sec)
+ poll_retries = 0
+
+ while (poll_retries < poll_cycle_threshold):
+ for consumer_record in consumer:
+ # Get req_id as msg_key and converts it from byte to str for each consumer record
+ msg_key = byte_to_str(consumer_record.key)
+ print ('msg_key in a consumer_record:', msg_key)
+ if (req_id == msg_key):
+ print ('req_id is found in consumer records', req_id)
+ return consumer_record
+
+ print('Sleeping for ' + str(sleep_period_in_sec) + ' seconds...')
+ time.sleep(sleep_period_in_sec)
+ poll_retries += 1
+
+ return 1
+ except Exception as err:
+ print('Error while consume record for req_id', err)
+ return 1
+ finally:
+ consumer.close()
+
+# Helper: calculates poll cycle threshold
+def calc_pollcycle_threshold(sleep_period_in_sec):
+
+ poll_cycle_threshold = int(TIME_OUT) / sleep_period_in_sec
+ poll_cycle_threshold = math.floor(poll_cycle_threshold)
+ return poll_cycle_threshold
+
+# Helper: Converts a byte array to a str
+def byte_to_str(byte_arr):
+
+ if (byte_arr is not None):
+ return byte_arr.decode('utf-8')
+ else:
+ return None
+
+if __name__ == '__main__':
+ try:
+ requestid = sys.argv[1]
+ future = consume(requestid)
+ except Exception as err:
+ print('Error in __main__', err)
+ print (1)
+ sys.exit()