Consumer algorithm improvment via autoblock during message iteration
[sim/a1-interface.git] / near-rt-ric-simulator / test / KAFKA_DISPATCHER / src / dispatcher.py
index 19e2ab6..08a4eed 100644 (file)
@@ -31,6 +31,7 @@ from maincommon import create_kafka_producer, create_kafka_consumer, create_kafk
 MSG_BROKER_URL=os.getenv('MSG_BROKER_URL')
 
 TIME_OUT=os.getenv('TIME_OUT')
+publishresponse=os.getenv('PUBLISH_RESP')
 
 #Constsants
 APPL_JSON='application/json'
@@ -160,11 +161,12 @@ def publish_and_consume(kafka_event, req_id_from_header, pol_type_id):
     print('Future:', record_metadata)
     publish_time_in_ms = record_metadata.timestamp
 
-    # For test purposes only, publish the success response event with no error-info to response topic
-    # if basic_test_with_cust_header.sh is being used, then comment this line
-    # else comment out this line for the basic_test.sh
-    kafka_response_event = create_kafka_response_event(200, "")
-    producer.send(target_topic_res, kafka_response_event, req_id)
+    # For test purposes only triggered from A1 sim
+    # Publish the success response event with no error-info to response topic
+    # It is obvious that non of the requests will have a request id in the header except the test scripts: basic_test and timeout_test
+    if (publishresponse is not None and req_id_from_header is None):
+      kafka_response_event = create_kafka_response_event(200, "")
+      producer.send(target_topic_res, kafka_response_event, req_id)
 
     # synch-consume
     consumer_record = consume_record_for(req_id, target_topic_res)
@@ -214,44 +216,27 @@ def publish_and_consume(kafka_event, req_id_from_header, pol_type_id):
 def consume_record_for(req_id, target_topic_res):
 
   try:
-    print ('req_id looking for in consumer:', req_id)
+    print ('req_id looking for in consumer: ' + target_topic_res, req_id)
     consumer = create_kafka_consumer()
     topic_partition = TopicPartition(target_topic_res, 0)
     consumer.assign([topic_partition])
 
-    # calculates poll cycle threshold
     sleep_period_in_sec = 5
-    poll_cycle_threshold = int(TIME_OUT) / sleep_period_in_sec
-    poll_cycle_threshold = math.floor(poll_cycle_threshold)
-    print('poll_cycle_threshold', poll_cycle_threshold)
-
+    poll_cycle_threshold = calc_pollcycle_threshold(sleep_period_in_sec)
     poll_retries = 0
-    starting_offset = 0
-    prev_last_offset = 0
+
     while (poll_retries < poll_cycle_threshold):
-      # Manually specify the fetch offset for a TopicPartition
-      consumer.seek(topic_partition, starting_offset)
-      # Get the last offset for the given partitions
-      last_offset = consumer.end_offsets([topic_partition])[topic_partition]
-      print('last_offset',last_offset)
-
-      if (last_offset != prev_last_offset):
-        for consumer_record in consumer:
-          # Get req_id as msg_key and converts it from byte to str for each consumer record
-          msg_key = byte_to_str(consumer_record.key)
-          print ('msg_key in a consumer_record:', msg_key)
-          if (req_id == msg_key):
-            print ('req_id is found in consumer records', req_id)
-            return consumer_record
-          elif (consumer_record.offset == last_offset - 1):
-            break
+      for consumer_record in consumer:
+        # Get req_id as msg_key and converts it from byte to str for each consumer record
+        msg_key = byte_to_str(consumer_record.key)
+        print ('msg_key in a consumer_record:', msg_key)
+        if (req_id == msg_key):
+          print ('req_id is found in consumer records', req_id)
+          return consumer_record
 
       print('Sleeping for ' + str(sleep_period_in_sec) + ' seconds...')
       time.sleep(sleep_period_in_sec)
-
       poll_retries += 1
-      prev_last_offset = last_offset
-      starting_offset += last_offset
 
     # Returns time-out response
     pjson=create_error_response(408)
@@ -264,6 +249,12 @@ def consume_record_for(req_id, target_topic_res):
   finally:
     consumer.close()
 
+# Helper: calculates poll cycle threshold
+def calc_pollcycle_threshold(sleep_period_in_sec):
+
+    poll_cycle_threshold = int(TIME_OUT) / sleep_period_in_sec
+    poll_cycle_threshold = math.floor(poll_cycle_threshold)
+    return poll_cycle_threshold
 
 # Helper: Create a response object if forced http response code is set
 def get_forced_response():