MSG_BROKER_URL=os.getenv('MSG_BROKER_URL')
TIME_OUT=os.getenv('TIME_OUT')
+publishresponse=os.getenv('PUBLISH_RESP')
#Constsants
APPL_JSON='application/json'
print('Future:', record_metadata)
publish_time_in_ms = record_metadata.timestamp
- # For test purposes only, publish the success response event with no error-info to response topic
- # if basic_test_with_cust_header.sh is being used, then comment this line
- # else comment out this line for the basic_test.sh
- kafka_response_event = create_kafka_response_event(200, "")
- producer.send(target_topic_res, kafka_response_event, req_id)
+ # For test purposes only triggered from A1 sim
+ # Publish the success response event with no error-info to response topic
+ # It is obvious that non of the requests will have a request id in the header except the test scripts: basic_test and timeout_test
+ if (publishresponse is not None and req_id_from_header is None):
+ kafka_response_event = create_kafka_response_event(200, "")
+ producer.send(target_topic_res, kafka_response_event, req_id)
# synch-consume
consumer_record = consume_record_for(req_id, target_topic_res)
print("Consumer Record:", consumer_record)
cons_rec_value = consumer_record.value
- cons_rec_val_in_dict = json.loads(cons_rec_value)
- resp_code = cons_rec_val_in_dict['response-code']
+ print('Class for cons_rec_value:', cons_rec_value.__class__)
+ if isinstance(cons_rec_value, str):
+ print('cons_rec_value isinstance str')
+ cons_rec_val_in_dict = json.loads(cons_rec_value) # json.loads: converts str to dict
+ resp_code = cons_rec_val_in_dict['response-code']
+ elif isinstance(cons_rec_value, dict):
+ print('cons_rec_value isinstance dict')
+ resp_code = cons_rec_value['response-code']
+ else:
+ print('cons_rec_value isinstance other')
+ cons_rec_val_in_dict = json.loads(cons_rec_value)
+ resp_code = cons_rec_val_in_dict['response-code']
# if response code success, then check for time-out
if (int(resp_code) == 200):
def consume_record_for(req_id, target_topic_res):
try:
- print ('req_id looking for in consumer:', req_id)
+ print ('req_id looking for in consumer: ' + target_topic_res, req_id)
consumer = create_kafka_consumer()
topic_partition = TopicPartition(target_topic_res, 0)
consumer.assign([topic_partition])
- # calculates poll cycle threshold
sleep_period_in_sec = 5
- poll_cycle_threshold = int(TIME_OUT) / sleep_period_in_sec
- poll_cycle_threshold = math.floor(poll_cycle_threshold)
- print('poll_cycle_threshold', poll_cycle_threshold)
-
+ poll_cycle_threshold = calc_pollcycle_threshold(sleep_period_in_sec)
poll_retries = 0
- starting_offset = 0
- prev_last_offset = 0
+
while (poll_retries < poll_cycle_threshold):
- # Manually specify the fetch offset for a TopicPartition
- consumer.seek(topic_partition, starting_offset)
- # Get the last offset for the given partitions
- last_offset = consumer.end_offsets([topic_partition])[topic_partition]
- print('last_offset',last_offset)
-
- if (last_offset != prev_last_offset):
- for consumer_record in consumer:
- # Get req_id as msg_key and converts it from byte to str for each consumer record
- msg_key = byte_to_str(consumer_record.key)
- print ('msg_key in a consumer_record:', msg_key)
- if (req_id == msg_key):
- print ('req_id is found in consumer records', req_id)
- return consumer_record
- elif (consumer_record.offset == last_offset - 1):
- break
+ for consumer_record in consumer:
+ # Get req_id as msg_key and converts it from byte to str for each consumer record
+ msg_key = byte_to_str(consumer_record.key)
+ print ('msg_key in a consumer_record:', msg_key)
+ if (req_id == msg_key):
+ print ('req_id is found in consumer records', req_id)
+ return consumer_record
print('Sleeping for ' + str(sleep_period_in_sec) + ' seconds...')
time.sleep(sleep_period_in_sec)
-
poll_retries += 1
- prev_last_offset = last_offset
- starting_offset += last_offset
# Returns time-out response
pjson=create_error_response(408)
finally:
consumer.close()
+# Helper: calculates poll cycle threshold
+def calc_pollcycle_threshold(sleep_period_in_sec):
+
+ poll_cycle_threshold = int(TIME_OUT) / sleep_period_in_sec
+ poll_cycle_threshold = math.floor(poll_cycle_threshold)
+ return poll_cycle_threshold
# Helper: Create a response object if forced http response code is set
def get_forced_response():