Consumer algorithm improvment via autoblock during message iteration 33/8833/2
authorhalil.cakal <halil.cakal@est.tech>
Fri, 5 Aug 2022 07:49:13 +0000 (08:49 +0100)
committerhalil.cakal <halil.cakal@est.tech>
Mon, 8 Aug 2022 07:43:11 +0000 (08:43 +0100)
Patch Set 2: Enriches test cases using more than one response topic and develops flag-controlled publish to response-topic feature.
Issue-ID: NONRTRIC-757
Change-Id: I99a50a44a2862787fda8d52e645ad01194af6a0b
Signed-off-by: halil.cakal <halil.cakal@est.tech>
near-rt-ric-simulator/test/KAFKA_DISPATCHER/resources/policytype_to_topicmap.json
near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/dispatcher.py
near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/maincommon.py
near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/basic_test.sh
near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/build_and_start.sh
near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/timeout_test.sh [moved from near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/basic_test_with_cust_header.sh with 94% similarity]
near-rt-ric-simulator/test/STD_2.0.0/build_and_start_with_kafka.sh
near-rt-ric-simulator/test/common/consume_events_from_kafka_bus.py [new file with mode: 0644]
near-rt-ric-simulator/test/common/publish_response_event_to_kafka_bus.py
near-rt-ric-simulator/test/common/test_common.sh

index 254cb4d..9c5cb9b 100644 (file)
@@ -5,10 +5,10 @@
   },
   "STD_1": {
     "request_topic": "kafkatopicreq2",
-    "response_topic": "kafkatopicres"
+    "response_topic": "kafkatopicres2"
   },
   "STD_2": {
     "request_topic": "kafkatopicreq3",
-    "response_topic": "kafkatopicres"
+    "response_topic": "kafkatopicres3"
   }
 }
index 19e2ab6..08a4eed 100644 (file)
@@ -31,6 +31,7 @@ from maincommon import create_kafka_producer, create_kafka_consumer, create_kafk
 MSG_BROKER_URL=os.getenv('MSG_BROKER_URL')
 
 TIME_OUT=os.getenv('TIME_OUT')
+publishresponse=os.getenv('PUBLISH_RESP')
 
 #Constsants
 APPL_JSON='application/json'
@@ -160,11 +161,12 @@ def publish_and_consume(kafka_event, req_id_from_header, pol_type_id):
     print('Future:', record_metadata)
     publish_time_in_ms = record_metadata.timestamp
 
-    # For test purposes only, publish the success response event with no error-info to response topic
-    # if basic_test_with_cust_header.sh is being used, then comment this line
-    # else comment out this line for the basic_test.sh
-    kafka_response_event = create_kafka_response_event(200, "")
-    producer.send(target_topic_res, kafka_response_event, req_id)
+    # For test purposes only triggered from A1 sim
+    # Publish the success response event with no error-info to response topic
+    # It is obvious that non of the requests will have a request id in the header except the test scripts: basic_test and timeout_test
+    if (publishresponse is not None and req_id_from_header is None):
+      kafka_response_event = create_kafka_response_event(200, "")
+      producer.send(target_topic_res, kafka_response_event, req_id)
 
     # synch-consume
     consumer_record = consume_record_for(req_id, target_topic_res)
@@ -214,44 +216,27 @@ def publish_and_consume(kafka_event, req_id_from_header, pol_type_id):
 def consume_record_for(req_id, target_topic_res):
 
   try:
-    print ('req_id looking for in consumer:', req_id)
+    print ('req_id looking for in consumer: ' + target_topic_res, req_id)
     consumer = create_kafka_consumer()
     topic_partition = TopicPartition(target_topic_res, 0)
     consumer.assign([topic_partition])
 
-    # calculates poll cycle threshold
     sleep_period_in_sec = 5
-    poll_cycle_threshold = int(TIME_OUT) / sleep_period_in_sec
-    poll_cycle_threshold = math.floor(poll_cycle_threshold)
-    print('poll_cycle_threshold', poll_cycle_threshold)
-
+    poll_cycle_threshold = calc_pollcycle_threshold(sleep_period_in_sec)
     poll_retries = 0
-    starting_offset = 0
-    prev_last_offset = 0
+
     while (poll_retries < poll_cycle_threshold):
-      # Manually specify the fetch offset for a TopicPartition
-      consumer.seek(topic_partition, starting_offset)
-      # Get the last offset for the given partitions
-      last_offset = consumer.end_offsets([topic_partition])[topic_partition]
-      print('last_offset',last_offset)
-
-      if (last_offset != prev_last_offset):
-        for consumer_record in consumer:
-          # Get req_id as msg_key and converts it from byte to str for each consumer record
-          msg_key = byte_to_str(consumer_record.key)
-          print ('msg_key in a consumer_record:', msg_key)
-          if (req_id == msg_key):
-            print ('req_id is found in consumer records', req_id)
-            return consumer_record
-          elif (consumer_record.offset == last_offset - 1):
-            break
+      for consumer_record in consumer:
+        # Get req_id as msg_key and converts it from byte to str for each consumer record
+        msg_key = byte_to_str(consumer_record.key)
+        print ('msg_key in a consumer_record:', msg_key)
+        if (req_id == msg_key):
+          print ('req_id is found in consumer records', req_id)
+          return consumer_record
 
       print('Sleeping for ' + str(sleep_period_in_sec) + ' seconds...')
       time.sleep(sleep_period_in_sec)
-
       poll_retries += 1
-      prev_last_offset = last_offset
-      starting_offset += last_offset
 
     # Returns time-out response
     pjson=create_error_response(408)
@@ -264,6 +249,12 @@ def consume_record_for(req_id, target_topic_res):
   finally:
     consumer.close()
 
+# Helper: calculates poll cycle threshold
+def calc_pollcycle_threshold(sleep_period_in_sec):
+
+    poll_cycle_threshold = int(TIME_OUT) / sleep_period_in_sec
+    poll_cycle_threshold = math.floor(poll_cycle_threshold)
+    return poll_cycle_threshold
 
 # Helper: Create a response object if forced http response code is set
 def get_forced_response():
index c534acb..d5b65ae 100644 (file)
@@ -65,9 +65,13 @@ def create_kafka_producer():
 # https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
 def create_kafka_consumer():
   consumer = KafkaConsumer(
-    #KAFKA_TOPIC_RES,
+    # kafka cluster endpoint
     bootstrap_servers = MSG_BROKER_URL,
+    # move to the earliest or latest available message
     auto_offset_reset = 'earliest',
+    # number of milliseconds to block during message iteration
+    # if no new message available during this period of time, iteration through a for-loop will stop automatically
+    consumer_timeout_ms = 100,
     value_deserializer = lambda m: json.loads(m.decode('ascii')),
     #enable_auto_commit=False
   )
index ef7014d..62c2fa5 100755 (executable)
@@ -61,65 +61,105 @@ RESULT="json:$res"
 do_curl GET /policytypetotopicmapping/ANR 200
 
 echo "=== Put policy: shall publish and consume for put policy operation ==="
+req_id=$(get_random_number)
 RESULT=""
-do_curl PUT  /policytypes/ANR/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json
+do_curl PUT  /policytypes/ANR/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json $req_id &
+proc_id=$!
+publish_response_event $req_id kafkatopicres
+wait $proc_id
 
 echo "=== Get policy status: shall publish and consume for get policy status operation ==="
+req_id=$(get_random_number)
 RESULT=""
-do_curl GET  /policytypes/ANR/kafkadispatcher/alpha/status 200 jsonfiles/alpha_policy.json
+do_curl GET  /policytypes/ANR/kafkadispatcher/alpha/status 200 jsonfiles/alpha_policy.json $req_id &
+proc_id=$!
+publish_response_event $req_id kafkatopicres
+wait $proc_id
 
 echo "=== Put policy: shall publish and consume for put policy operation for alpha ==="
+req_id=$(get_random_number)
 RESULT=""
-do_curl PUT  /policytypes/STD_1/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json
+do_curl PUT  /policytypes/STD_1/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json $req_id &
+proc_id=$!
+publish_response_event $req_id kafkatopicres2
+wait $proc_id
 
 echo "=== Delete policy: shall publish and consume for delete policy operation for alpha ==="
+req_id=$(get_random_number)
 RESULT=""
-do_curl DELETE  /policytypes/STD_1/kafkadispatcher/alpha 200
+do_curl DELETE  /policytypes/STD_1/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json $req_id &
+proc_id=$!
+publish_response_event $req_id kafkatopicres2
+wait $proc_id
 
 echo "=== Set force delay 5 sec ==="
 RESULT="Force delay: 5 sec set for all dispatcher responses until it is resetted"
 do_curl POST '/dispatcheradmin/forcedelay?delay=5' 200
 
-echo "=== Put policy: shall wait at least <delay-time> sec and then respond while publishing and consuming ==="
-RESULT=""
-do_elapsetime_curl PUT  /policytypes/ANR/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json 5
+echo "=== Hello world: shall wait at least <delay-time> sec and then respond while hello world ==="
+RESULT="OK"
+do_elapsetime_curl GET / 200 jsonfiles/alpha_policy.json 5
 
 echo "=== Reset force delay ==="
 RESULT="Force delay has been resetted for all dispatcher responses"
 do_curl POST /dispatcheradmin/forcedelay 200
 
 echo "=== Put policy: shall publish and consume for put policy operation for beta ==="
+req_id=$(get_random_number)
 RESULT=""
-do_curl PUT  /policytypes/STD_1/kafkadispatcher/beta 200 jsonfiles/beta_policy.json
+do_curl PUT  /policytypes/STD_1/kafkadispatcher/beta 200 jsonfiles/beta_policy.json $req_id &
+proc_id=$!
+publish_response_event $req_id kafkatopicres2
+wait $proc_id
 
 echo "=== Get policy status: shall publish and consume for get policy status operation ==="
+req_id=$(get_random_number)
 RESULT=""
-do_curl GET  /policytypes/ANR/kafkadispatcher/alpha/status 200 jsonfiles/beta_policy.json
+do_curl GET  /policytypes/ANR/kafkadispatcher/alpha/status 200 jsonfiles/beta_policy.json $req_id &
+proc_id=$!
+publish_response_event $req_id kafkatopicres
+wait $proc_id
 
 echo "=== Put policy: shall publish and consume for put policy operation for alpha ==="
+req_id=$(get_random_number)
 RESULT=""
-do_curl PUT  /policytypes/STD_2/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json
+do_curl PUT  /policytypes/STD_2/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json $req_id &
+proc_id=$!
+publish_response_event $req_id kafkatopicres3
+wait $proc_id
 
 echo "=== Set force response code: 500 ==="
 RESULT="Force response code: 500 set for all dispatcher response until it is resetted"
 do_curl POST  '/dispatcheradmin/forceresponse?code=500' 200
 
 echo "=== Put policy: shall not publish and consume for put policy operation for alpha ==="
+req_id=$(get_random_number)
 res=$(cat jsonfiles/forced_response.json)
 RESULT="json:$res"
-do_curl PUT  /policytypes/ANR/kafkadispatcher/alpha 500 jsonfiles/alpha_policy.json
+do_curl PUT  /policytypes/ANR/kafkadispatcher/alpha 500 jsonfiles/alpha_policy.json $req_id &
+proc_id=$!
+publish_response_event $req_id kafkatopicres
+wait $proc_id
 
 echo "=== Reset force response code ==="
 RESULT="Force response code has been resetted for dispatcher responses"
 do_curl POST  /dispatcheradmin/forceresponse 200
 
 echo "=== Get policy status: shall publish and consume for get policy status operation ==="
+req_id=$(get_random_number)
 RESULT=""
-do_curl GET  /policytypes/ANR/kafkadispatcher/alpha/status 200 jsonfiles/alpha_policy.json
+do_curl GET  /policytypes/ANR/kafkadispatcher/alpha/status 200 jsonfiles/alpha_policy.json $req_id &
+proc_id=$!
+publish_response_event $req_id kafkatopicres
+wait $proc_id
 
 echo "=== Delete policy: shall publish and consume for delete policy operation for alpha ==="
+req_id=$(get_random_number)
 RESULT=""
-do_curl DELETE  /policytypes/STD_1/kafkadispatcher/alpha 200
+do_curl DELETE  /policytypes/STD_1/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json $req_id &
+proc_id=$!
+publish_response_event $req_id kafkatopicres2
+wait $proc_id
 
 echo "********************"
 echo "*** All tests ok ***"
index dff351e..f0869ee 100755 (executable)
 #  ============LICENSE_END=================================================
 #
 
-# Script to build and start the container
-# Make sure to run the simulator with the same arg as this script
+# Script to build and start the kafka dispatcher container
+# Make sure to run the simulator including args as is this script
 
 print_usage() {
-    echo "Usage: ./build_and_start.sh "
+    echo "Usage: ./build_and_start.sh publish-resp|ignore-publish"
     exit 1
 }
 
-if [ $# -ge 1 ]; then
+if [ $# -ne 1 ]; then
+    print_usage
+fi
+
+if [ $1 == "publish-resp" ]; then
+    PUBLISH_RESP="-e PUBLISH_RESP=1"
+elif  [ $1 == "ignore-publish" ]; then
+    PUBLISH_RESP=""
+else
     print_usage
 fi
 
@@ -41,5 +49,5 @@ docker rm -f kafkamessagedispatcher > /dev/null 2>&1
 echo "Starting Kafka message dispatcher..."
 echo "PWD path: "$PWD
 
-#Run the container in interactive mode with host networking driver which allows docker to access localhost, unsecure port 7075, secure port 7175, TIME_OUT must be in seconds
-docker run --network host --rm -it -p 7075:7075 -p 7175:7175 -e ALLOW_HTTP=true -e MSG_BROKER_URL=localhost:9092 -e TIME_OUT=30 --volume "$PWD/certificate:/usr/src/app/cert" --name kafkamessagedispatcher kafka_dispatcher
+#Run the container in interactive mode with host networking driver which allows docker to access localhost, unsecure port 7075, secure port 7175, TIME_OUT must be in seconds, PUBLISH_RESP decides auto responding for testing that run by A1 sim
+docker run --network host --rm -it -p 7075:7075 -p 7175:7175 -e ALLOW_HTTP=true -e MSG_BROKER_URL=localhost:9092 -e TIME_OUT=30 $PUBLISH_RESP --volume "$PWD/certificate:/usr/src/app/cert" --name kafkamessagedispatcher kafka_dispatcher
@@ -78,7 +78,7 @@ do_curl PUT  /policytypes/ANR/kafkadispatcher/alpha 408 jsonfiles/alpha_policy.j
 proc_id=$!
 sleep $timeout
 # after time out duration, publish the event
-publish_response_event $req_id
+publish_response_event $req_id kafkatopicres
 # wait until the main process to be completed
 wait $proc_id
 
@@ -91,7 +91,7 @@ do_curl PUT  /policytypes/STD_1/kafkadispatcher/alpha 200 jsonfiles/alpha_policy
 proc_id=$!
 sleep 10
 # after 10s, publish the event
-publish_response_event $req_id
+publish_response_event $req_id kafkatopicres2
 # wait until the main process to be completed
 wait $proc_id
 
@@ -105,7 +105,7 @@ do_curl GET  /policytypes/STD_2/kafkadispatcher/alpha/status 408 jsonfiles/alpha
 proc_id=$!
 sleep $timeout
 # after time out duration, publish the event
-publish_response_event $req_id
+publish_response_event $req_id kafkatopicres3
 # wait until the main process to be completed
 wait $proc_id
 
@@ -118,7 +118,7 @@ do_curl GET  /policytypes/ANR/kafkadispatcher/alpha/status 200 jsonfiles/alpha_p
 proc_id=$!
 sleep 15
 # after 15s, publish the event
-publish_response_event $req_id
+publish_response_event $req_id kafkatopicres
 # wait until the main process to be completed
 wait $proc_id
 
@@ -129,7 +129,7 @@ RESULT=""
 # asynch callout
 do_curl DELETE  /policytypes/STD_1/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json $req_id &
 proc_id=$!
-publish_response_event $req_id
+publish_response_event $req_id kafkatopicres2
 # wait until the main process to be completed
 wait $proc_id
 
index 97f2a0f..eae4e37 100755 (executable)
 # Make sure to run the simulator with the same arg as this script
 
 print_usage() {
-    echo "Usage: ./build_and_start.sh duplicate-check|ignore-duplicate kafka-srv|kafka-srv-secure|ignore-kafka-srv"
+    echo "Usage: ./build_and_start.sh duplicate-check|ignore-duplicate kafka-srv|kafka-srv-secure publish-resp|ignore-publish"
     exit 1
 }
 
-if [ $# -ne 2 ]; then
+if [ $# -ne 3 ]; then
     print_usage
 fi
 
@@ -41,8 +41,14 @@ if [ $2 == "kafka-srv" ]; then
     URL="http://localhost:7075"
 elif  [ $2 == "kafka-srv-secure" ]; then
     URL="https://localhost:7175"
-elif  [ $2 == "ignore-kafka-srv" ]; then
-    URL=""
+else
+    print_usage
+fi
+
+if [ $3 == "publish-resp" ]; then
+    PUBLISH_RESP="-e PUBLISH_RESP=1"
+elif  [ $3 == "ignore-publish" ]; then
+    PUBLISH_RESP=""
 else
     print_usage
 fi
@@ -89,7 +95,7 @@ fi
 # Runs kafka server in detached mode
 # In order to tail logs use:: docker logs -f kafkamessagedispatcher
 if [ ! -z "$URL" ]; then
-    docker run -d --network host --rm -it -p 7075:7075 -p 7175:7175 -e ALLOW_HTTP=true -e MSG_BROKER_URL=localhost:9092 -e TIME_OUT=30 --volume "$dirkafkasrv/certificate:/usr/src/app/cert" --name kafkamessagedispatcher kafka_dispatcher
+    docker run -d --network host --rm -it -p 7075:7075 -p 7175:7175 -e ALLOW_HTTP=true -e MSG_BROKER_URL=localhost:9092 -e TIME_OUT=30 $PUBLISH_RESP --volume "$dirkafkasrv/certificate:/usr/src/app/cert" --name kafkamessagedispatcher kafka_dispatcher
 fi
 
 # Runs A1 simulator
diff --git a/near-rt-ric-simulator/test/common/consume_events_from_kafka_bus.py b/near-rt-ric-simulator/test/common/consume_events_from_kafka_bus.py
new file mode 100644 (file)
index 0000000..f7dfb65
--- /dev/null
@@ -0,0 +1,125 @@
+#  ============LICENSE_START===============================================
+#  Copyright (C) 2022 Nordix Foundation. All rights reserved.
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+#
+
+# This is a script for test-purposes only
+# It consumes a response-event from a kafka bus with different apporaches
+# In order to use this script, you must have an venv for Python and kafka-python libs has to be installed
+# To instal kafka-python please use: pip install kafka-python
+# Example of an response-event json
+#{
+  #"response-code": "400",
+  #"error-info": "Bad format"
+#}
+
+
+import os
+import json
+import sys
+import math
+import time
+
+from kafka import KafkaConsumer, TopicPartition
+from threading import RLock
+
+# Response string with JSON format
+response_data_JSON =  """
+{
+  "response-code": 200,
+  "error-info": ""
+}
+"""
+
+# in seconds
+TIME_OUT=30
+target_topic_res='kafkatopicres'
+MSG_BROKER_URL='localhost:9092'
+
+# Instantiate KafkaConsumer with keyword arguments
+# https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
+def create_kafka_consumer():
+  consumer = KafkaConsumer(
+    # kafka cluster endpoint
+    bootstrap_servers = MSG_BROKER_URL,
+    # move to the earliest or latest available message
+    auto_offset_reset = 'earliest',
+    # number of milliseconds to block during message iteration
+    # if no new message available during this period of time, iteration through a for-loop will stop automatically
+    consumer_timeout_ms = 100,
+    value_deserializer = lambda m: json.loads(m.decode('ascii')),
+    #enable_auto_commit=False
+  )
+  return consumer
+
+# Helper: Searches for req_id by seeking every five seconds up to thirty seconds
+# Helper: If the req_id is found, then ConsumerRecord will be returned
+# Helper: If the req_id is not found, then Response Request Timeout will be returned
+def consume(req_id):
+
+  try:
+    print ('req_id looking for in consumer:', req_id)
+    consumer = create_kafka_consumer()
+    # Latch to target topic and partition
+    topic_partition = TopicPartition(target_topic_res, 0)
+    consumer.assign([topic_partition])
+
+    sleep_period_in_sec = 5
+    poll_cycle_threshold = calc_pollcycle_threshold(sleep_period_in_sec)
+    poll_retries = 0
+
+    while (poll_retries < poll_cycle_threshold):
+      for consumer_record in consumer:
+        # Get req_id as msg_key and converts it from byte to str for each consumer record
+        msg_key = byte_to_str(consumer_record.key)
+        print ('msg_key in a consumer_record:', msg_key)
+        if (req_id == msg_key):
+          print ('req_id is found in consumer records', req_id)
+          return consumer_record
+
+      print('Sleeping for ' + str(sleep_period_in_sec) + ' seconds...')
+      time.sleep(sleep_period_in_sec)
+      poll_retries += 1
+
+    return 1
+  except Exception as err:
+    print('Error while consume record for req_id', err)
+    return 1
+  finally:
+    consumer.close()
+
+# Helper: calculates poll cycle threshold
+def calc_pollcycle_threshold(sleep_period_in_sec):
+
+    poll_cycle_threshold = int(TIME_OUT) / sleep_period_in_sec
+    poll_cycle_threshold = math.floor(poll_cycle_threshold)
+    return poll_cycle_threshold
+
+# Helper: Converts a byte array to a str
+def byte_to_str(byte_arr):
+
+  if (byte_arr is not None):
+    return byte_arr.decode('utf-8')
+  else:
+    return None
+
+if __name__ == '__main__':
+    try:
+      requestid = sys.argv[1]
+      future = consume(requestid)
+    except Exception as err:
+      print('Error in __main__', err)
+      print (1)
+    sys.exit()
index 635dc6d..251ba09 100644 (file)
@@ -51,7 +51,7 @@ def create_kafka_producer():
   return producer
 
 # Helper: Publishes (to) the target broker and the topic in synch
-def publish(kafka_evet, req_id):
+def publish(kafka_evet, req_id, targettopic):
 
   # Instantiate KafkaProducer with keyword arguments
   producer = create_kafka_producer()
@@ -62,7 +62,7 @@ def publish(kafka_evet, req_id):
 
     # synch-publish
     # KafkaProducer.send(topicname, value=broker_message, key=req_id, headers=None, partition=None, timestamp_ms=None)
-    fut_rec_metadata = producer.send('kafkatopicres', kafka_evet, req_id)
+    fut_rec_metadata = producer.send(targettopic, kafka_evet, req_id)
     return fut_rec_metadata.get()
 
   except Exception as err:
@@ -74,8 +74,9 @@ if __name__ == '__main__':
     try:
 
         requestid = sys.argv[1]
+        targettopic = sys.argv[2]
         # response_data_JSON is str
-        future = publish(response_data_JSON, requestid)
+        future = publish(response_data_JSON, requestid, targettopic)
 
         if (future is not None):
             print (0)
index af4e79b..e1191d2 100755 (executable)
@@ -83,12 +83,12 @@ do_curl() {
 # The aim of this function is to realize error related test cases only
 # The request_id for the Kafka msg, should be passed here as a function parameter
 publish_response_event() {
-    if [ $# -lt 1 ]; then
-        echo "Need 1 parameter, <request_id>"
+    if [ $# -ne 2 ]; then
+        echo "Need 2 parameter, <request_id> <target_topic>"
         echo "Exiting test script....."
         exit 1
     fi
-    res=$(python ../common/publish_response_event_to_kafka_bus.py "$1")
+    res=$(python ../common/publish_response_event_to_kafka_bus.py "$1" "$2")
     if [ $res -eq 0 ]; then
         echo "  Result as expected  "
     else