Update version number in container-tag for F Maintenance Release
[sim/a1-interface.git] / near-rt-ric-simulator / test / common / consume_events_from_kafka_bus.py
1 #  ============LICENSE_START===============================================
2 #  Copyright (C) 2022 Nordix Foundation. All rights reserved.
3 #  ========================================================================
4 #  Licensed under the Apache License, Version 2.0 (the "License");
5 #  you may not use this file except in compliance with the License.
6 #  You may obtain a copy of the License at
7 #
8 #       http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under the License is distributed on an "AS IS" BASIS,
12 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 #  See the License for the specific language governing permissions and
14 #  limitations under the License.
15 #  ============LICENSE_END=================================================
16 #
17
18 # This is a script for test-purposes only
19 # It consumes a response-event from a kafka bus with different apporaches
20 # In order to use this script, you must have an venv for Python and kafka-python libs has to be installed
21 # To instal kafka-python please use: pip install kafka-python
22 # Example of an response-event json
23 #{
24   #"response-code": "400",
25   #"error-info": "Bad format"
26 #}
27
28
29 import os
30 import json
31 import sys
32 import math
33 import time
34
35 from kafka import KafkaConsumer, TopicPartition
36 from threading import RLock
37
38 # Response string with JSON format
39 response_data_JSON =  """
40 {
41   "response-code": 200,
42   "error-info": ""
43 }
44 """
45
46 # in seconds
47 TIME_OUT=30
48 target_topic_res='kafkatopicres'
49 MSG_BROKER_URL='localhost:9092'
50
51 # Instantiate KafkaConsumer with keyword arguments
52 # https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
53 def create_kafka_consumer():
54   consumer = KafkaConsumer(
55     # kafka cluster endpoint
56     bootstrap_servers = MSG_BROKER_URL,
57     # move to the earliest or latest available message
58     auto_offset_reset = 'earliest',
59     # number of milliseconds to block during message iteration
60     # if no new message available during this period of time, iteration through a for-loop will stop automatically
61     consumer_timeout_ms = 100,
62     value_deserializer = lambda m: json.loads(m.decode('ascii')),
63     #enable_auto_commit=False
64   )
65   return consumer
66
67 # Helper: Searches for req_id by seeking every five seconds up to thirty seconds
68 # Helper: If the req_id is found, then ConsumerRecord will be returned
69 # Helper: If the req_id is not found, then Response Request Timeout will be returned
70 def consume(req_id):
71
72   try:
73     print ('req_id looking for in consumer:', req_id)
74     consumer = create_kafka_consumer()
75     # Latch to target topic and partition
76     topic_partition = TopicPartition(target_topic_res, 0)
77     consumer.assign([topic_partition])
78
79     sleep_period_in_sec = 5
80     poll_cycle_threshold = calc_pollcycle_threshold(sleep_period_in_sec)
81     poll_retries = 0
82
83     while (poll_retries < poll_cycle_threshold):
84       for consumer_record in consumer:
85         # Get req_id as msg_key and converts it from byte to str for each consumer record
86         msg_key = byte_to_str(consumer_record.key)
87         print ('msg_key in a consumer_record:', msg_key)
88         if (req_id == msg_key):
89           print ('req_id is found in consumer records', req_id)
90           return consumer_record
91
92       print('Sleeping for ' + str(sleep_period_in_sec) + ' seconds...')
93       time.sleep(sleep_period_in_sec)
94       poll_retries += 1
95
96     return 1
97   except Exception as err:
98     print('Error while consume record for req_id', err)
99     return 1
100   finally:
101     consumer.close()
102
103 # Helper: calculates poll cycle threshold
104 def calc_pollcycle_threshold(sleep_period_in_sec):
105
106     poll_cycle_threshold = int(TIME_OUT) / sleep_period_in_sec
107     poll_cycle_threshold = math.floor(poll_cycle_threshold)
108     return poll_cycle_threshold
109
110 # Helper: Converts a byte array to a str
111 def byte_to_str(byte_arr):
112
113   if (byte_arr is not None):
114     return byte_arr.decode('utf-8')
115   else:
116     return None
117
118 if __name__ == '__main__':
119     try:
120       requestid = sys.argv[1]
121       future = consume(requestid)
122     except Exception as err:
123       print('Error in __main__', err)
124       print (1)
125     sys.exit()