X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=near-rt-ric-simulator%2Ftest%2FKAFKA_DISPATCHER%2Fsrc%2Fdispatcher.py;h=08a4eedf94dd7271d825502a8d9fa03767580862;hb=f87470c985836d38f7206f2e1e6298c6d010a8a7;hp=19e2ab65763d7541d94c22b9fcee9c90619d2e2a;hpb=f0ec1252e0c98e4ee6989ea0a1e0eef9dd06c91b;p=sim%2Fa1-interface.git diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/dispatcher.py b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/dispatcher.py index 19e2ab6..08a4eed 100644 --- a/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/dispatcher.py +++ b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/dispatcher.py @@ -31,6 +31,7 @@ from maincommon import create_kafka_producer, create_kafka_consumer, create_kafk MSG_BROKER_URL=os.getenv('MSG_BROKER_URL') TIME_OUT=os.getenv('TIME_OUT') +publishresponse=os.getenv('PUBLISH_RESP') #Constsants APPL_JSON='application/json' @@ -160,11 +161,12 @@ def publish_and_consume(kafka_event, req_id_from_header, pol_type_id): print('Future:', record_metadata) publish_time_in_ms = record_metadata.timestamp - # For test purposes only, publish the success response event with no error-info to response topic - # if basic_test_with_cust_header.sh is being used, then comment this line - # else comment out this line for the basic_test.sh - kafka_response_event = create_kafka_response_event(200, "") - producer.send(target_topic_res, kafka_response_event, req_id) + # For test purposes only triggered from A1 sim + # Publish the success response event with no error-info to response topic + # It is obvious that non of the requests will have a request id in the header except the test scripts: basic_test and timeout_test + if (publishresponse is not None and req_id_from_header is None): + kafka_response_event = create_kafka_response_event(200, "") + producer.send(target_topic_res, kafka_response_event, req_id) # synch-consume consumer_record = consume_record_for(req_id, target_topic_res) @@ -214,44 +216,27 @@ def publish_and_consume(kafka_event, req_id_from_header, pol_type_id): def consume_record_for(req_id, target_topic_res): try: - print ('req_id looking for in consumer:', req_id) + print ('req_id looking for in consumer: ' + target_topic_res, req_id) consumer = create_kafka_consumer() topic_partition = TopicPartition(target_topic_res, 0) consumer.assign([topic_partition]) - # calculates poll cycle threshold sleep_period_in_sec = 5 - poll_cycle_threshold = int(TIME_OUT) / sleep_period_in_sec - poll_cycle_threshold = math.floor(poll_cycle_threshold) - print('poll_cycle_threshold', poll_cycle_threshold) - + poll_cycle_threshold = calc_pollcycle_threshold(sleep_period_in_sec) poll_retries = 0 - starting_offset = 0 - prev_last_offset = 0 + while (poll_retries < poll_cycle_threshold): - # Manually specify the fetch offset for a TopicPartition - consumer.seek(topic_partition, starting_offset) - # Get the last offset for the given partitions - last_offset = consumer.end_offsets([topic_partition])[topic_partition] - print('last_offset',last_offset) - - if (last_offset != prev_last_offset): - for consumer_record in consumer: - # Get req_id as msg_key and converts it from byte to str for each consumer record - msg_key = byte_to_str(consumer_record.key) - print ('msg_key in a consumer_record:', msg_key) - if (req_id == msg_key): - print ('req_id is found in consumer records', req_id) - return consumer_record - elif (consumer_record.offset == last_offset - 1): - break + for consumer_record in consumer: + # Get req_id as msg_key and converts it from byte to str for each consumer record + msg_key = byte_to_str(consumer_record.key) + print ('msg_key in a consumer_record:', msg_key) + if (req_id == msg_key): + print ('req_id is found in consumer records', req_id) + return consumer_record print('Sleeping for ' + str(sleep_period_in_sec) + ' seconds...') time.sleep(sleep_period_in_sec) - poll_retries += 1 - prev_last_offset = last_offset - starting_offset += last_offset # Returns time-out response pjson=create_error_response(408) @@ -264,6 +249,12 @@ def consume_record_for(req_id, target_topic_res): finally: consumer.close() +# Helper: calculates poll cycle threshold +def calc_pollcycle_threshold(sleep_period_in_sec): + + poll_cycle_threshold = int(TIME_OUT) / sleep_period_in_sec + poll_cycle_threshold = math.floor(poll_cycle_threshold) + return poll_cycle_threshold # Helper: Create a response object if forced http response code is set def get_forced_response():