X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=near-rt-ric-simulator%2Ftest%2FKAFKA_DISPATCHER%2Fsrc%2Fmaincommon.py;fp=near-rt-ric-simulator%2Ftest%2FKAFKA_DISPATCHER%2Fsrc%2Fmaincommon.py;h=d5b65ae5ecece8a16cd6f14c82387a7ff2612b5b;hb=f87470c985836d38f7206f2e1e6298c6d010a8a7;hp=c534acbe201c0ab3dda17e2e0e090c642507976a;hpb=f0ec1252e0c98e4ee6989ea0a1e0eef9dd06c91b;p=sim%2Fa1-interface.git diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/maincommon.py b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/maincommon.py index c534acb..d5b65ae 100644 --- a/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/maincommon.py +++ b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/maincommon.py @@ -65,9 +65,13 @@ def create_kafka_producer(): # https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html def create_kafka_consumer(): consumer = KafkaConsumer( - #KAFKA_TOPIC_RES, + # kafka cluster endpoint bootstrap_servers = MSG_BROKER_URL, + # move to the earliest or latest available message auto_offset_reset = 'earliest', + # number of milliseconds to block during message iteration + # if no new message available during this period of time, iteration through a for-loop will stop automatically + consumer_timeout_ms = 100, value_deserializer = lambda m: json.loads(m.decode('ascii')), #enable_auto_commit=False )