X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=near-rt-ric-simulator%2Ftest%2Fcommon%2Fconsume_events_from_kafka_bus.py;fp=near-rt-ric-simulator%2Ftest%2Fcommon%2Fconsume_events_from_kafka_bus.py;h=f7dfb6560b53108f2b044f2d8d5063a4ad221e1b;hb=f87470c985836d38f7206f2e1e6298c6d010a8a7;hp=0000000000000000000000000000000000000000;hpb=f0ec1252e0c98e4ee6989ea0a1e0eef9dd06c91b;p=sim%2Fa1-interface.git diff --git a/near-rt-ric-simulator/test/common/consume_events_from_kafka_bus.py b/near-rt-ric-simulator/test/common/consume_events_from_kafka_bus.py new file mode 100644 index 0000000..f7dfb65 --- /dev/null +++ b/near-rt-ric-simulator/test/common/consume_events_from_kafka_bus.py @@ -0,0 +1,125 @@ +# ============LICENSE_START=============================================== +# Copyright (C) 2022 Nordix Foundation. All rights reserved. +# ======================================================================== +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END================================================= +# + +# This is a script for test-purposes only +# It consumes a response-event from a kafka bus with different apporaches +# In order to use this script, you must have an venv for Python and kafka-python libs has to be installed +# To instal kafka-python please use: pip install kafka-python +# Example of an response-event json +#{ + #"response-code": "400", + #"error-info": "Bad format" +#} + + +import os +import json +import sys +import math +import time + +from kafka import KafkaConsumer, TopicPartition +from threading import RLock + +# Response string with JSON format +response_data_JSON = """ +{ + "response-code": 200, + "error-info": "" +} +""" + +# in seconds +TIME_OUT=30 +target_topic_res='kafkatopicres' +MSG_BROKER_URL='localhost:9092' + +# Instantiate KafkaConsumer with keyword arguments +# https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html +def create_kafka_consumer(): + consumer = KafkaConsumer( + # kafka cluster endpoint + bootstrap_servers = MSG_BROKER_URL, + # move to the earliest or latest available message + auto_offset_reset = 'earliest', + # number of milliseconds to block during message iteration + # if no new message available during this period of time, iteration through a for-loop will stop automatically + consumer_timeout_ms = 100, + value_deserializer = lambda m: json.loads(m.decode('ascii')), + #enable_auto_commit=False + ) + return consumer + +# Helper: Searches for req_id by seeking every five seconds up to thirty seconds +# Helper: If the req_id is found, then ConsumerRecord will be returned +# Helper: If the req_id is not found, then Response Request Timeout will be returned +def consume(req_id): + + try: + print ('req_id looking for in consumer:', req_id) + consumer = create_kafka_consumer() + # Latch to target topic and partition + topic_partition = TopicPartition(target_topic_res, 0) + consumer.assign([topic_partition]) + + sleep_period_in_sec = 5 + poll_cycle_threshold = calc_pollcycle_threshold(sleep_period_in_sec) + poll_retries = 0 + + while (poll_retries < poll_cycle_threshold): + for consumer_record in consumer: + # Get req_id as msg_key and converts it from byte to str for each consumer record + msg_key = byte_to_str(consumer_record.key) + print ('msg_key in a consumer_record:', msg_key) + if (req_id == msg_key): + print ('req_id is found in consumer records', req_id) + return consumer_record + + print('Sleeping for ' + str(sleep_period_in_sec) + ' seconds...') + time.sleep(sleep_period_in_sec) + poll_retries += 1 + + return 1 + except Exception as err: + print('Error while consume record for req_id', err) + return 1 + finally: + consumer.close() + +# Helper: calculates poll cycle threshold +def calc_pollcycle_threshold(sleep_period_in_sec): + + poll_cycle_threshold = int(TIME_OUT) / sleep_period_in_sec + poll_cycle_threshold = math.floor(poll_cycle_threshold) + return poll_cycle_threshold + +# Helper: Converts a byte array to a str +def byte_to_str(byte_arr): + + if (byte_arr is not None): + return byte_arr.decode('utf-8') + else: + return None + +if __name__ == '__main__': + try: + requestid = sys.argv[1] + future = consume(requestid) + except Exception as err: + print('Error in __main__', err) + print (1) + sys.exit()