From: halil.cakal
Date: Fri, 5 Aug 2022 07:49:13 +0000 (+0100)
Subject: Consumer algorithm improvement via autoblock during message iteration
X-Git-Tag: 2.4.0~4
X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?p=sim%2Fa1-interface.git;a=commitdiff_plain;h=f87470c985836d38f7206f2e1e6298c6d010a8a7

Consumer algorithm improvement via autoblock during message iteration

Patch Set 2: Enriches the test cases to use more than one response topic
and adds a flag-controlled publish-to-response-topic feature.

Issue-ID: NONRTRIC-757
Change-Id: I99a50a44a2862787fda8d52e645ad01194af6a0b
Signed-off-by: halil.cakal
---

diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER/resources/policytype_to_topicmap.json b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/resources/policytype_to_topicmap.json
index 254cb4d..9c5cb9b 100644
--- a/near-rt-ric-simulator/test/KAFKA_DISPATCHER/resources/policytype_to_topicmap.json
+++ b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/resources/policytype_to_topicmap.json
@@ -5,10 +5,10 @@
   },
   "STD_1": {
     "request_topic": "kafkatopicreq2",
-    "response_topic": "kafkatopicres"
+    "response_topic": "kafkatopicres2"
   },
   "STD_2": {
     "request_topic": "kafkatopicreq3",
-    "response_topic": "kafkatopicres"
+    "response_topic": "kafkatopicres3"
   }
 }
diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/dispatcher.py b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/dispatcher.py
index 19e2ab6..08a4eed 100644
--- a/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/dispatcher.py
+++ b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/dispatcher.py
@@ -31,6 +31,7 @@ from maincommon import create_kafka_producer, create_kafka_consumer, create_kafk
 
 MSG_BROKER_URL=os.getenv('MSG_BROKER_URL')
 TIME_OUT=os.getenv('TIME_OUT')
+publishresponse=os.getenv('PUBLISH_RESP')
 
 #Constsants
 APPL_JSON='application/json'
@@ -160,11 +161,12 @@ def publish_and_consume(kafka_event, req_id_from_header, pol_type_id):
     print('Future:', record_metadata)
     publish_time_in_ms = record_metadata.timestamp
 
-    # For test purposes only, publish the success response event with no error-info to response topic
-    # if basic_test_with_cust_header.sh is being used, then comment this line
-    # else comment out this line for the basic_test.sh
-    kafka_response_event = create_kafka_response_event(200, "")
-    producer.send(target_topic_res, kafka_response_event, req_id)
+    # For test purposes only, triggered from the A1 sim:
+    # publish the success response event with no error-info to the response topic.
+    # None of the requests carries a request id in the header except those from the test scripts basic_test and timeout_test
+    if (publishresponse is not None and req_id_from_header is None):
+      kafka_response_event = create_kafka_response_event(200, "")
+      producer.send(target_topic_res, kafka_response_event, req_id)
 
     # synch-consume
     consumer_record = consume_record_for(req_id, target_topic_res)
@@ -214,44 +216,27 @@ def publish_and_consume(kafka_event, req_id_from_header, pol_type_id):
 def consume_record_for(req_id, target_topic_res):
 
   try:
-    print ('req_id looking for in consumer:', req_id)
+    print ('looking for req_id in consumer for topic ' + target_topic_res + ':', req_id)
     consumer = create_kafka_consumer()
     topic_partition = TopicPartition(target_topic_res, 0)
     consumer.assign([topic_partition])
 
-    # calculates poll cycle threshold
     sleep_period_in_sec = 5
-    poll_cycle_threshold = int(TIME_OUT) / sleep_period_in_sec
-    poll_cycle_threshold = math.floor(poll_cycle_threshold)
-    print('poll_cycle_threshold', poll_cycle_threshold)
-
+    poll_cycle_threshold = calc_pollcycle_threshold(sleep_period_in_sec)
     poll_retries = 0
-    starting_offset = 0
-    prev_last_offset = 0
+
     while (poll_retries < poll_cycle_threshold):
-      # Manually specify the fetch offset for a TopicPartition
-      consumer.seek(topic_partition, starting_offset)
-      # Get the last offset for the given partitions
-      last_offset = consumer.end_offsets([topic_partition])[topic_partition]
-      print('last_offset',last_offset)
-
-      if (last_offset != prev_last_offset):
-        for consumer_record in consumer:
-          # Get req_id as msg_key and converts it from byte to str for each consumer record
-          msg_key = byte_to_str(consumer_record.key)
-          print ('msg_key in a consumer_record:', msg_key)
-          if (req_id == msg_key):
-            print ('req_id is found in consumer records', req_id)
-            return consumer_record
-          elif (consumer_record.offset == last_offset - 1):
-            break
+      for consumer_record in consumer:
+        # Get req_id as msg_key and convert it from byte to str for each consumer record
+        msg_key = byte_to_str(consumer_record.key)
+        print ('msg_key in a consumer_record:', msg_key)
+        if (req_id == msg_key):
+          print ('req_id is found in consumer records', req_id)
+          return consumer_record
 
       print('Sleeping for ' + str(sleep_period_in_sec) + ' seconds...')
       time.sleep(sleep_period_in_sec)
-
       poll_retries += 1
-      prev_last_offset = last_offset
-      starting_offset += last_offset
 
     # Returns time-out response
     pjson=create_error_response(408)
@@ -264,6 +249,12 @@ def consume_record_for(req_id, target_topic_res):
   finally:
     consumer.close()
 
+# Helper: calculates poll cycle threshold
+def calc_pollcycle_threshold(sleep_period_in_sec):
+
+  poll_cycle_threshold = int(TIME_OUT) / sleep_period_in_sec
+  poll_cycle_threshold = math.floor(poll_cycle_threshold)
+  return poll_cycle_threshold
 
 # Helper: Create a response object if forced http response code is set
 def get_forced_response():
diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/maincommon.py b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/maincommon.py
index c534acb..d5b65ae 100644
--- a/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/maincommon.py
+++ b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/maincommon.py
@@ -65,9 +65,13 @@ def create_kafka_producer():
 # https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
 def create_kafka_consumer():
   consumer = KafkaConsumer(
-    #KAFKA_TOPIC_RES,
+    # kafka cluster endpoint
     bootstrap_servers = MSG_BROKER_URL,
+    # move to the earliest or latest available message
     auto_offset_reset = 'earliest',
+    # number of milliseconds to block during message iteration;
+    # if no new message is available during this period, iteration through a for-loop will stop automatically
+    consumer_timeout_ms = 100,
     value_deserializer = lambda m: json.loads(m.decode('ascii')),
     #enable_auto_commit=False
   )
diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/basic_test.sh b/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/basic_test.sh
index ef7014d..62c2fa5 100755
--- a/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/basic_test.sh
+++ b/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/basic_test.sh
@@ -61,65 +61,105 @@ RESULT="json:$res"
 do_curl GET /policytypetotopicmapping/ANR 200
 
 echo "=== Put policy: shall publish and consume for put policy operation ==="
+req_id=$(get_random_number)
 RESULT=""
-do_curl PUT /policytypes/ANR/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json
+do_curl PUT /policytypes/ANR/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json $req_id &
+proc_id=$!
+publish_response_event $req_id kafkatopicres
+wait $proc_id
 
 echo "=== Get policy status: shall publish and consume for get policy status operation ==="
+req_id=$(get_random_number)
 RESULT=""
-do_curl GET /policytypes/ANR/kafkadispatcher/alpha/status 200 jsonfiles/alpha_policy.json
+do_curl GET /policytypes/ANR/kafkadispatcher/alpha/status 200 jsonfiles/alpha_policy.json $req_id &
+proc_id=$!
+publish_response_event $req_id kafkatopicres
+wait $proc_id
 
 echo "=== Put policy: shall publish and consume for put policy operation for alpha ==="
+req_id=$(get_random_number)
 RESULT=""
-do_curl PUT /policytypes/STD_1/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json
+do_curl PUT /policytypes/STD_1/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json $req_id &
+proc_id=$!
+publish_response_event $req_id kafkatopicres2
+wait $proc_id
 
 echo "=== Delete policy: shall publish and consume for delete policy operation for alpha ==="
+req_id=$(get_random_number)
 RESULT=""
-do_curl DELETE /policytypes/STD_1/kafkadispatcher/alpha 200
+do_curl DELETE /policytypes/STD_1/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json $req_id &
+proc_id=$!
+publish_response_event $req_id kafkatopicres2
+wait $proc_id
 
 echo "=== Set force delay 5 sec ==="
 RESULT="Force delay: 5 sec set for all dispatcher responses until it is resetted"
 do_curl POST '/dispatcheradmin/forcedelay?delay=5' 200
 
-echo "=== Put policy: shall wait at least sec and then respond while publishing and consuming ==="
-RESULT=""
-do_elapsetime_curl PUT /policytypes/ANR/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json 5
+echo "=== Hello world: shall wait at least 5 sec and then respond ==="
+RESULT="OK"
+do_elapsetime_curl GET / 200 jsonfiles/alpha_policy.json 5
 
 echo "=== Reset force delay ==="
 RESULT="Force delay has been resetted for all dispatcher responses"
 do_curl POST /dispatcheradmin/forcedelay 200
 
 echo "=== Put policy: shall publish and consume for put policy operation for beta ==="
+req_id=$(get_random_number)
 RESULT=""
-do_curl PUT /policytypes/STD_1/kafkadispatcher/beta 200 jsonfiles/beta_policy.json
+do_curl PUT /policytypes/STD_1/kafkadispatcher/beta 200 jsonfiles/beta_policy.json $req_id &
+proc_id=$!
+publish_response_event $req_id kafkatopicres2
+wait $proc_id
 
 echo "=== Get policy status: shall publish and consume for get policy status operation ==="
+req_id=$(get_random_number)
 RESULT=""
-do_curl GET /policytypes/ANR/kafkadispatcher/alpha/status 200 jsonfiles/beta_policy.json
+do_curl GET /policytypes/ANR/kafkadispatcher/alpha/status 200 jsonfiles/beta_policy.json $req_id &
+proc_id=$!
+publish_response_event $req_id kafkatopicres
+wait $proc_id
 
 echo "=== Put policy: shall publish and consume for put policy operation for alpha ==="
+req_id=$(get_random_number)
 RESULT=""
-do_curl PUT /policytypes/STD_2/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json
+do_curl PUT /policytypes/STD_2/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json $req_id &
+proc_id=$!
+publish_response_event $req_id kafkatopicres3
+wait $proc_id
 
 echo "=== Set force response code: 500 ==="
 RESULT="Force response code: 500 set for all dispatcher response until it is resetted"
 do_curl POST '/dispatcheradmin/forceresponse?code=500' 200
 
 echo "=== Put policy: shall not publish and consume for put policy operation for alpha ==="
+req_id=$(get_random_number)
 res=$(cat jsonfiles/forced_response.json)
 RESULT="json:$res"
-do_curl PUT /policytypes/ANR/kafkadispatcher/alpha 500 jsonfiles/alpha_policy.json
+do_curl PUT /policytypes/ANR/kafkadispatcher/alpha 500 jsonfiles/alpha_policy.json $req_id &
+proc_id=$!
+publish_response_event $req_id kafkatopicres
+wait $proc_id
 
 echo "=== Reset force response code ==="
 RESULT="Force response code has been resetted for dispatcher responses"
 do_curl POST /dispatcheradmin/forceresponse 200
 
 echo "=== Get policy status: shall publish and consume for get policy status operation ==="
+req_id=$(get_random_number)
 RESULT=""
-do_curl GET /policytypes/ANR/kafkadispatcher/alpha/status 200 jsonfiles/alpha_policy.json
+do_curl GET /policytypes/ANR/kafkadispatcher/alpha/status 200 jsonfiles/alpha_policy.json $req_id &
+proc_id=$!
+publish_response_event $req_id kafkatopicres
+wait $proc_id
 
 echo "=== Delete policy: shall publish and consume for delete policy operation for alpha ==="
+req_id=$(get_random_number)
 RESULT=""
-do_curl DELETE /policytypes/STD_1/kafkadispatcher/alpha 200
+do_curl DELETE /policytypes/STD_1/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json $req_id &
+proc_id=$!
+publish_response_event $req_id kafkatopicres2
+wait $proc_id
 
 echo "********************"
 echo "*** All tests ok ***"
diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/build_and_start.sh b/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/build_and_start.sh
index dff351e..f0869ee 100755
--- a/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/build_and_start.sh
+++ b/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/build_and_start.sh
@@ -17,15 +17,23 @@
 # ============LICENSE_END=================================================
 #
 
-# Script to build and start the container
-# Make sure to run the simulator with the same arg as this script
+# Script to build and start the Kafka dispatcher container
+# Make sure to run the simulator with the same args as this script
 
 print_usage() {
-    echo "Usage: ./build_and_start.sh "
+    echo "Usage: ./build_and_start.sh publish-resp|ignore-publish"
     exit 1
 }
 
-if [ $# -ge 1 ]; then
+if [ $# -ne 1 ]; then
+    print_usage
+fi
+
+if [ $1 == "publish-resp" ]; then
+    PUBLISH_RESP="-e PUBLISH_RESP=1"
+elif [ $1 == "ignore-publish" ]; then
+    PUBLISH_RESP=""
+else
     print_usage
 fi
 
@@ -41,5 +49,5 @@ docker rm -f kafkamessagedispatcher > /dev/null 2>&1
 
 echo "Starting Kafka message dispatcher..."
echo "PWD path: "$PWD -#Run the container in interactive mode with host networking driver which allows docker to access localhost, unsecure port 7075, secure port 7175, TIME_OUT must be in seconds -docker run --network host --rm -it -p 7075:7075 -p 7175:7175 -e ALLOW_HTTP=true -e MSG_BROKER_URL=localhost:9092 -e TIME_OUT=30 --volume "$PWD/certificate:/usr/src/app/cert" --name kafkamessagedispatcher kafka_dispatcher +#Run the container in interactive mode with host networking driver which allows docker to access localhost, unsecure port 7075, secure port 7175, TIME_OUT must be in seconds, PUBLISH_RESP decides auto responding for testing that run by A1 sim +docker run --network host --rm -it -p 7075:7075 -p 7175:7175 -e ALLOW_HTTP=true -e MSG_BROKER_URL=localhost:9092 -e TIME_OUT=30 $PUBLISH_RESP --volume "$PWD/certificate:/usr/src/app/cert" --name kafkamessagedispatcher kafka_dispatcher diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/basic_test_with_cust_header.sh b/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/timeout_test.sh similarity index 94% rename from near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/basic_test_with_cust_header.sh rename to near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/timeout_test.sh index 4fa8ac0..f4e080c 100755 --- a/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/basic_test_with_cust_header.sh +++ b/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/timeout_test.sh @@ -78,7 +78,7 @@ do_curl PUT /policytypes/ANR/kafkadispatcher/alpha 408 jsonfiles/alpha_policy.j proc_id=$! sleep $timeout # after time out duration, publish the event -publish_response_event $req_id +publish_response_event $req_id kafkatopicres # wait until the main process to be completed wait $proc_id @@ -91,7 +91,7 @@ do_curl PUT /policytypes/STD_1/kafkadispatcher/alpha 200 jsonfiles/alpha_policy proc_id=$! sleep 10 # after 10s, publish the event -publish_response_event $req_id +publish_response_event $req_id kafkatopicres2 # wait until the main process to be completed wait $proc_id @@ -105,7 +105,7 @@ do_curl GET /policytypes/STD_2/kafkadispatcher/alpha/status 408 jsonfiles/alpha proc_id=$! sleep $timeout # after time out duration, publish the event -publish_response_event $req_id +publish_response_event $req_id kafkatopicres3 # wait until the main process to be completed wait $proc_id @@ -118,7 +118,7 @@ do_curl GET /policytypes/ANR/kafkadispatcher/alpha/status 200 jsonfiles/alpha_p proc_id=$! sleep 15 # after 15s, publish the event -publish_response_event $req_id +publish_response_event $req_id kafkatopicres # wait until the main process to be completed wait $proc_id @@ -129,7 +129,7 @@ RESULT="" # asynch callout do_curl DELETE /policytypes/STD_1/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json $req_id & proc_id=$! 
-publish_response_event $req_id
+publish_response_event $req_id kafkatopicres2
 # wait until the main process to be completed
 wait $proc_id
diff --git a/near-rt-ric-simulator/test/STD_2.0.0/build_and_start_with_kafka.sh b/near-rt-ric-simulator/test/STD_2.0.0/build_and_start_with_kafka.sh
index 97f2a0f..eae4e37 100755
--- a/near-rt-ric-simulator/test/STD_2.0.0/build_and_start_with_kafka.sh
+++ b/near-rt-ric-simulator/test/STD_2.0.0/build_and_start_with_kafka.sh
@@ -21,11 +21,11 @@
 # Make sure to run the simulator with the same arg as this script
 
 print_usage() {
-    echo "Usage: ./build_and_start.sh duplicate-check|ignore-duplicate kafka-srv|kafka-srv-secure|ignore-kafka-srv"
+    echo "Usage: ./build_and_start.sh duplicate-check|ignore-duplicate kafka-srv|kafka-srv-secure publish-resp|ignore-publish"
     exit 1
 }
 
-if [ $# -ne 2 ]; then
+if [ $# -ne 3 ]; then
     print_usage
 fi
 
@@ -41,8 +41,14 @@ if [ $2 == "kafka-srv" ]; then
     URL="http://localhost:7075"
 elif [ $2 == "kafka-srv-secure" ]; then
     URL="https://localhost:7175"
-elif [ $2 == "ignore-kafka-srv" ]; then
-    URL=""
+else
+    print_usage
+fi
+
+if [ $3 == "publish-resp" ]; then
+    PUBLISH_RESP="-e PUBLISH_RESP=1"
+elif [ $3 == "ignore-publish" ]; then
+    PUBLISH_RESP=""
 else
     print_usage
 fi
 
@@ -89,7 +95,7 @@ fi
 # Runs kafka server in detached mode
 # In order to tail logs use:: docker logs -f kafkamessagedispatcher
 if [ ! -z "$URL" ]; then
-    docker run -d --network host --rm -it -p 7075:7075 -p 7175:7175 -e ALLOW_HTTP=true -e MSG_BROKER_URL=localhost:9092 -e TIME_OUT=30 --volume "$dirkafkasrv/certificate:/usr/src/app/cert" --name kafkamessagedispatcher kafka_dispatcher
+    docker run -d --network host --rm -it -p 7075:7075 -p 7175:7175 -e ALLOW_HTTP=true -e MSG_BROKER_URL=localhost:9092 -e TIME_OUT=30 $PUBLISH_RESP --volume "$dirkafkasrv/certificate:/usr/src/app/cert" --name kafkamessagedispatcher kafka_dispatcher
 fi
 
 # Runs A1 simulator
diff --git a/near-rt-ric-simulator/test/common/consume_events_from_kafka_bus.py b/near-rt-ric-simulator/test/common/consume_events_from_kafka_bus.py
new file mode 100644
index 0000000..f7dfb65
--- /dev/null
+++ b/near-rt-ric-simulator/test/common/consume_events_from_kafka_bus.py
@@ -0,0 +1,125 @@
+# ============LICENSE_START===============================================
+# Copyright (C) 2022 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+
+# This is a script for test purposes only
+# It consumes a response-event from a kafka bus with different approaches
+# In order to use this script, you must have a venv for Python and the kafka-python libs have to be installed
+# To install kafka-python please use: pip install kafka-python
+# Example of a response-event json
+#{
+  #"response-code": "400",
+  #"error-info": "Bad format"
+#}
+
+
+import os
+import json
+import sys
+import math
+import time
+
+from kafka import KafkaConsumer, TopicPartition
+from threading import RLock
+
+# Response string with JSON format
+response_data_JSON = """
+{
+  "response-code": 200,
+  "error-info": ""
+}
+"""
+
+# in seconds
+TIME_OUT=30
+target_topic_res='kafkatopicres'
+MSG_BROKER_URL='localhost:9092'
+
+# Instantiate KafkaConsumer with keyword arguments
+# https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
+def create_kafka_consumer():
+  consumer = KafkaConsumer(
+    # kafka cluster endpoint
+    bootstrap_servers = MSG_BROKER_URL,
+    # move to the earliest or latest available message
+    auto_offset_reset = 'earliest',
+    # number of milliseconds to block during message iteration;
+    # if no new message is available during this period, iteration through a for-loop will stop automatically
+    consumer_timeout_ms = 100,
+    value_deserializer = lambda m: json.loads(m.decode('ascii')),
+    #enable_auto_commit=False
+  )
+  return consumer
+
+# Helper: Searches for req_id by polling every five seconds for up to thirty seconds
+# Helper: If the req_id is found, then the ConsumerRecord will be returned
+# Helper: If the req_id is not found, then Response Request Timeout will be returned
+def consume(req_id):
+
+  try:
+    print ('looking for req_id in consumer:', req_id)
+    consumer = create_kafka_consumer()
+    # Latch onto the target topic and partition
+    topic_partition = TopicPartition(target_topic_res, 0)
+    consumer.assign([topic_partition])
+
+    sleep_period_in_sec = 5
+    poll_cycle_threshold = calc_pollcycle_threshold(sleep_period_in_sec)
+    poll_retries = 0
+
+    while (poll_retries < poll_cycle_threshold):
+      for consumer_record in consumer:
+        # Get req_id as msg_key and convert it from byte to str for each consumer record
+        msg_key = byte_to_str(consumer_record.key)
+        print ('msg_key in a consumer_record:', msg_key)
+        if (req_id == msg_key):
+          print ('req_id is found in consumer records', req_id)
+          return consumer_record
+
+      print('Sleeping for ' + str(sleep_period_in_sec) + ' seconds...')
+      time.sleep(sleep_period_in_sec)
+      poll_retries += 1
+
+    return 1
+  except Exception as err:
+    print('Error while consuming record for req_id', err)
+    return 1
+  finally:
+    consumer.close()
+
+# Helper: calculates poll cycle threshold
+def calc_pollcycle_threshold(sleep_period_in_sec):
+
+  poll_cycle_threshold = int(TIME_OUT) / sleep_period_in_sec
+  poll_cycle_threshold = math.floor(poll_cycle_threshold)
+  return poll_cycle_threshold
+
+# Helper: Converts a byte array to a str
+def byte_to_str(byte_arr):
+
+  if (byte_arr is not None):
+    return byte_arr.decode('utf-8')
+  else:
+    return None
+
+if __name__ == '__main__':
+  try:
+    requestid = sys.argv[1]
+    future = consume(requestid)
+  except Exception as err:
+    print('Error in __main__', err)
+    print (1)
+  sys.exit()
diff --git a/near-rt-ric-simulator/test/common/publish_response_event_to_kafka_bus.py b/near-rt-ric-simulator/test/common/publish_response_event_to_kafka_bus.py
index 635dc6d..251ba09 100644
--- a/near-rt-ric-simulator/test/common/publish_response_event_to_kafka_bus.py
+++ b/near-rt-ric-simulator/test/common/publish_response_event_to_kafka_bus.py
@@ -51,7 +51,7 @@ def create_kafka_producer():
   return producer
 
 # Helper: Publishes (to) the target broker and the topic in synch
-def publish(kafka_evet, req_id):
+def publish(kafka_evet, req_id, targettopic):
 
   # Instantiate KafkaProducer with keyword arguments
   producer = create_kafka_producer()
@@ -62,7 +62,7 @@ def publish(kafka_evet, req_id):
 
     # synch-publish
    # KafkaProducer.send(topicname, value=broker_message, key=req_id, headers=None, partition=None, timestamp_ms=None)
-    fut_rec_metadata = producer.send('kafkatopicres', kafka_evet, req_id)
+    fut_rec_metadata = producer.send(targettopic, kafka_evet, req_id)
     return fut_rec_metadata.get()
 
   except Exception as err:
@@ -74,8 +74,9 @@ if __name__ == '__main__':
 
   try:
     requestid = sys.argv[1]
+    targettopic = sys.argv[2]
     # response_data_JSON is str
-    future = publish(response_data_JSON, requestid)
+    future = publish(response_data_JSON, requestid, targettopic)
 
     if (future is not None):
       print (0)
diff --git a/near-rt-ric-simulator/test/common/test_common.sh b/near-rt-ric-simulator/test/common/test_common.sh
index af4e79b..e1191d2 100755
--- a/near-rt-ric-simulator/test/common/test_common.sh
+++ b/near-rt-ric-simulator/test/common/test_common.sh
@@ -83,12 +83,12 @@ do_curl() {
 # The aim of this function is to realize error related test cases only
 # The request_id for the Kafka msg, should be passed here as a function parameter
 publish_response_event() {
-  if [ $# -lt 1 ]; then
-    echo "Need 1 parameter, <request-id>"
+  if [ $# -ne 2 ]; then
+    echo "Need 2 parameters, <request-id> <target-topic>"
     echo "Exiting test script....."
     exit 1
   fi
-  res=$(python ../common/publish_response_event_to_kafka_bus.py "$1")
+  res=$(python ../common/publish_response_event_to_kafka_bus.py "$1" "$2")
   if [ $res -eq 0 ]; then
     echo "  Result as expected  "
   else
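
Note on the autoblock pattern this change introduces: in kafka-python, setting consumer_timeout_ms on a KafkaConsumer makes plain for-loop iteration over the consumer stop by itself once no new message arrives within the given period. That is what lets consume_record_for() drop the manual seek()/end_offsets() offset bookkeeping and simply re-iterate inside its retry loop. A minimal self-contained sketch of the pattern follows, assuming a local broker on localhost:9092; the topic 'kafkatopicres' comes from this patch, while the key 'some-req-id' is purely illustrative:

    # Sketch only, not part of the patch; broker address and request id are assumptions.
    import json
    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(
      bootstrap_servers='localhost:9092',
      auto_offset_reset='earliest',
      # iteration ends automatically after 100 ms with no new message
      consumer_timeout_ms=100,
      value_deserializer=lambda m: json.loads(m.decode('ascii')))
    consumer.assign([TopicPartition('kafkatopicres', 0)])

    # Drains whatever is currently available, then falls through; in the
    # dispatcher, the surrounding sleep/poll_retries cycle (bounded by
    # calc_pollcycle_threshold) decides when to give up with a 408.
    for record in consumer:
      if record.key is not None and record.key.decode('utf-8') == 'some-req-id':
        print('match:', record.value)
        break
    consumer.close()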