Patch Set 2: Enriches the test cases to use more than one response topic and adds a flag-controlled publish-to-response-topic feature.
Issue-ID: NONRTRIC-757
Change-Id: I99a50a44a2862787fda8d52e645ad01194af6a0b
Signed-off-by: halil.cakal <halil.cakal@est.tech>
},
"STD_1": {
"request_topic": "kafkatopicreq2",
- "response_topic": "kafkatopicres"
+ "response_topic": "kafkatopicres2"
},
"STD_2": {
"request_topic": "kafkatopicreq3",
- "response_topic": "kafkatopicres"
+ "response_topic": "kafkatopicres3"
}
}
MSG_BROKER_URL=os.getenv('MSG_BROKER_URL')
TIME_OUT=os.getenv('TIME_OUT')
+publishresponse=os.getenv('PUBLISH_RESP')
#Constsants
APPL_JSON='application/json'
print('Future:', record_metadata)
publish_time_in_ms = record_metadata.timestamp
- # For test purposes only, publish the success response event with no error-info to response topic
- # if basic_test_with_cust_header.sh is being used, then comment this line
- # else comment out this line for the basic_test.sh
- kafka_response_event = create_kafka_response_event(200, "")
- producer.send(target_topic_res, kafka_response_event, req_id)
+ # For test purposes only triggered from A1 sim
+ # Publish the success response event with no error-info to response topic
+    # It is obvious that none of the requests will have a request id in the header except the test scripts: basic_test and timeout_test
+ if (publishresponse is not None and req_id_from_header is None):
+ kafka_response_event = create_kafka_response_event(200, "")
+ producer.send(target_topic_res, kafka_response_event, req_id)
# synch-consume
consumer_record = consume_record_for(req_id, target_topic_res)
def consume_record_for(req_id, target_topic_res):
try:
- print ('req_id looking for in consumer:', req_id)
+ print ('req_id looking for in consumer: ' + target_topic_res, req_id)
consumer = create_kafka_consumer()
topic_partition = TopicPartition(target_topic_res, 0)
consumer.assign([topic_partition])
- # calculates poll cycle threshold
sleep_period_in_sec = 5
- poll_cycle_threshold = int(TIME_OUT) / sleep_period_in_sec
- poll_cycle_threshold = math.floor(poll_cycle_threshold)
- print('poll_cycle_threshold', poll_cycle_threshold)
-
+ poll_cycle_threshold = calc_pollcycle_threshold(sleep_period_in_sec)
poll_retries = 0
- starting_offset = 0
- prev_last_offset = 0
+
while (poll_retries < poll_cycle_threshold):
- # Manually specify the fetch offset for a TopicPartition
- consumer.seek(topic_partition, starting_offset)
- # Get the last offset for the given partitions
- last_offset = consumer.end_offsets([topic_partition])[topic_partition]
- print('last_offset',last_offset)
-
- if (last_offset != prev_last_offset):
- for consumer_record in consumer:
- # Get req_id as msg_key and converts it from byte to str for each consumer record
- msg_key = byte_to_str(consumer_record.key)
- print ('msg_key in a consumer_record:', msg_key)
- if (req_id == msg_key):
- print ('req_id is found in consumer records', req_id)
- return consumer_record
- elif (consumer_record.offset == last_offset - 1):
- break
+ for consumer_record in consumer:
+ # Get req_id as msg_key and converts it from byte to str for each consumer record
+ msg_key = byte_to_str(consumer_record.key)
+ print ('msg_key in a consumer_record:', msg_key)
+ if (req_id == msg_key):
+ print ('req_id is found in consumer records', req_id)
+ return consumer_record
print('Sleeping for ' + str(sleep_period_in_sec) + ' seconds...')
time.sleep(sleep_period_in_sec)
-
poll_retries += 1
- prev_last_offset = last_offset
- starting_offset += last_offset
# Returns time-out response
pjson=create_error_response(408)
finally:
consumer.close()
+# Helper: calculates poll cycle threshold
+def calc_pollcycle_threshold(sleep_period_in_sec):
+
+ poll_cycle_threshold = int(TIME_OUT) / sleep_period_in_sec
+ poll_cycle_threshold = math.floor(poll_cycle_threshold)
+ return poll_cycle_threshold
# Helper: Create a response object if forced http response code is set
def get_forced_response():
# https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
def create_kafka_consumer():
consumer = KafkaConsumer(
- #KAFKA_TOPIC_RES,
+ # kafka cluster endpoint
bootstrap_servers = MSG_BROKER_URL,
+ # move to the earliest or latest available message
auto_offset_reset = 'earliest',
+ # number of milliseconds to block during message iteration
+    # if no new message is available during this period of time, iteration through a for-loop will stop automatically
+ consumer_timeout_ms = 100,
value_deserializer = lambda m: json.loads(m.decode('ascii')),
#enable_auto_commit=False
)
do_curl GET /policytypetotopicmapping/ANR 200
echo "=== Put policy: shall publish and consume for put policy operation ==="
+req_id=$(get_random_number)
RESULT=""
-do_curl PUT /policytypes/ANR/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json
+do_curl PUT /policytypes/ANR/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json $req_id &
+proc_id=$!
+publish_response_event $req_id kafkatopicres
+wait $proc_id
echo "=== Get policy status: shall publish and consume for get policy status operation ==="
+req_id=$(get_random_number)
RESULT=""
-do_curl GET /policytypes/ANR/kafkadispatcher/alpha/status 200 jsonfiles/alpha_policy.json
+do_curl GET /policytypes/ANR/kafkadispatcher/alpha/status 200 jsonfiles/alpha_policy.json $req_id &
+proc_id=$!
+publish_response_event $req_id kafkatopicres
+wait $proc_id
echo "=== Put policy: shall publish and consume for put policy operation for alpha ==="
+req_id=$(get_random_number)
RESULT=""
-do_curl PUT /policytypes/STD_1/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json
+do_curl PUT /policytypes/STD_1/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json $req_id &
+proc_id=$!
+publish_response_event $req_id kafkatopicres2
+wait $proc_id
echo "=== Delete policy: shall publish and consume for delete policy operation for alpha ==="
+req_id=$(get_random_number)
RESULT=""
-do_curl DELETE /policytypes/STD_1/kafkadispatcher/alpha 200
+do_curl DELETE /policytypes/STD_1/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json $req_id &
+proc_id=$!
+publish_response_event $req_id kafkatopicres2
+wait $proc_id
echo "=== Set force delay 5 sec ==="
RESULT="Force delay: 5 sec set for all dispatcher responses until it is resetted"
do_curl POST '/dispatcheradmin/forcedelay?delay=5' 200
-echo "=== Put policy: shall wait at least <delay-time> sec and then respond while publishing and consuming ==="
-RESULT=""
-do_elapsetime_curl PUT /policytypes/ANR/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json 5
+echo "=== Hello world: shall wait at least <delay-time> sec and then respond while hello world ==="
+RESULT="OK"
+do_elapsetime_curl GET / 200 jsonfiles/alpha_policy.json 5
echo "=== Reset force delay ==="
RESULT="Force delay has been resetted for all dispatcher responses"
do_curl POST /dispatcheradmin/forcedelay 200
echo "=== Put policy: shall publish and consume for put policy operation for beta ==="
+req_id=$(get_random_number)
RESULT=""
-do_curl PUT /policytypes/STD_1/kafkadispatcher/beta 200 jsonfiles/beta_policy.json
+do_curl PUT /policytypes/STD_1/kafkadispatcher/beta 200 jsonfiles/beta_policy.json $req_id &
+proc_id=$!
+publish_response_event $req_id kafkatopicres2
+wait $proc_id
echo "=== Get policy status: shall publish and consume for get policy status operation ==="
+req_id=$(get_random_number)
RESULT=""
-do_curl GET /policytypes/ANR/kafkadispatcher/alpha/status 200 jsonfiles/beta_policy.json
+do_curl GET /policytypes/ANR/kafkadispatcher/alpha/status 200 jsonfiles/beta_policy.json $req_id &
+proc_id=$!
+publish_response_event $req_id kafkatopicres
+wait $proc_id
echo "=== Put policy: shall publish and consume for put policy operation for alpha ==="
+req_id=$(get_random_number)
RESULT=""
-do_curl PUT /policytypes/STD_2/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json
+do_curl PUT /policytypes/STD_2/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json $req_id &
+proc_id=$!
+publish_response_event $req_id kafkatopicres3
+wait $proc_id
echo "=== Set force response code: 500 ==="
RESULT="Force response code: 500 set for all dispatcher response until it is resetted"
do_curl POST '/dispatcheradmin/forceresponse?code=500' 200
echo "=== Put policy: shall not publish and consume for put policy operation for alpha ==="
+req_id=$(get_random_number)
res=$(cat jsonfiles/forced_response.json)
RESULT="json:$res"
-do_curl PUT /policytypes/ANR/kafkadispatcher/alpha 500 jsonfiles/alpha_policy.json
+do_curl PUT /policytypes/ANR/kafkadispatcher/alpha 500 jsonfiles/alpha_policy.json $req_id &
+proc_id=$!
+publish_response_event $req_id kafkatopicres
+wait $proc_id
echo "=== Reset force response code ==="
RESULT="Force response code has been resetted for dispatcher responses"
do_curl POST /dispatcheradmin/forceresponse 200
echo "=== Get policy status: shall publish and consume for get policy status operation ==="
+req_id=$(get_random_number)
RESULT=""
-do_curl GET /policytypes/ANR/kafkadispatcher/alpha/status 200 jsonfiles/alpha_policy.json
+do_curl GET /policytypes/ANR/kafkadispatcher/alpha/status 200 jsonfiles/alpha_policy.json $req_id &
+proc_id=$!
+publish_response_event $req_id kafkatopicres
+wait $proc_id
echo "=== Delete policy: shall publish and consume for delete policy operation for alpha ==="
+req_id=$(get_random_number)
RESULT=""
-do_curl DELETE /policytypes/STD_1/kafkadispatcher/alpha 200
+do_curl DELETE /policytypes/STD_1/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json $req_id &
+proc_id=$!
+publish_response_event $req_id kafkatopicres2
+wait $proc_id
echo "********************"
echo "*** All tests ok ***"
# ============LICENSE_END=================================================
#
-# Script to build and start the container
-# Make sure to run the simulator with the same arg as this script
+# Script to build and start the kafka dispatcher container
+# Make sure to run the simulator with the same args as this script
print_usage() {
- echo "Usage: ./build_and_start.sh "
+ echo "Usage: ./build_and_start.sh publish-resp|ignore-publish"
exit 1
}
-if [ $# -ge 1 ]; then
+if [ $# -ne 1 ]; then
+ print_usage
+fi
+
+if [ $1 == "publish-resp" ]; then
+ PUBLISH_RESP="-e PUBLISH_RESP=1"
+elif [ $1 == "ignore-publish" ]; then
+ PUBLISH_RESP=""
+else
print_usage
fi
echo "Starting Kafka message dispatcher..."
echo "PWD path: "$PWD
-#Run the container in interactive mode with host networking driver which allows docker to access localhost, unsecure port 7075, secure port 7175, TIME_OUT must be in seconds
-docker run --network host --rm -it -p 7075:7075 -p 7175:7175 -e ALLOW_HTTP=true -e MSG_BROKER_URL=localhost:9092 -e TIME_OUT=30 --volume "$PWD/certificate:/usr/src/app/cert" --name kafkamessagedispatcher kafka_dispatcher
+#Run the container in interactive mode with host networking driver which allows docker to access localhost, unsecure port 7075, secure port 7175, TIME_OUT must be in seconds, PUBLISH_RESP decides auto-responding for tests run by the A1 sim
+docker run --network host --rm -it -p 7075:7075 -p 7175:7175 -e ALLOW_HTTP=true -e MSG_BROKER_URL=localhost:9092 -e TIME_OUT=30 $PUBLISH_RESP --volume "$PWD/certificate:/usr/src/app/cert" --name kafkamessagedispatcher kafka_dispatcher
proc_id=$!
sleep $timeout
# after time out duration, publish the event
-publish_response_event $req_id
+publish_response_event $req_id kafkatopicres
# wait until the main process to be completed
wait $proc_id
proc_id=$!
sleep 10
# after 10s, publish the event
-publish_response_event $req_id
+publish_response_event $req_id kafkatopicres2
# wait until the main process to be completed
wait $proc_id
proc_id=$!
sleep $timeout
# after time out duration, publish the event
-publish_response_event $req_id
+publish_response_event $req_id kafkatopicres3
# wait until the main process to be completed
wait $proc_id
proc_id=$!
sleep 15
# after 15s, publish the event
-publish_response_event $req_id
+publish_response_event $req_id kafkatopicres
# wait until the main process to be completed
wait $proc_id
# asynch callout
do_curl DELETE /policytypes/STD_1/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json $req_id &
proc_id=$!
-publish_response_event $req_id
+publish_response_event $req_id kafkatopicres2
# wait until the main process to be completed
wait $proc_id
# Make sure to run the simulator with the same arg as this script
print_usage() {
- echo "Usage: ./build_and_start.sh duplicate-check|ignore-duplicate kafka-srv|kafka-srv-secure|ignore-kafka-srv"
+ echo "Usage: ./build_and_start.sh duplicate-check|ignore-duplicate kafka-srv|kafka-srv-secure publish-resp|ignore-publish"
exit 1
}
-if [ $# -ne 2 ]; then
+if [ $# -ne 3 ]; then
print_usage
fi
URL="http://localhost:7075"
elif [ $2 == "kafka-srv-secure" ]; then
URL="https://localhost:7175"
-elif [ $2 == "ignore-kafka-srv" ]; then
- URL=""
+else
+ print_usage
+fi
+
+if [ $3 == "publish-resp" ]; then
+ PUBLISH_RESP="-e PUBLISH_RESP=1"
+elif [ $3 == "ignore-publish" ]; then
+ PUBLISH_RESP=""
else
print_usage
fi
# Runs kafka server in detached mode
# In order to tail logs use:: docker logs -f kafkamessagedispatcher
if [ ! -z "$URL" ]; then
- docker run -d --network host --rm -it -p 7075:7075 -p 7175:7175 -e ALLOW_HTTP=true -e MSG_BROKER_URL=localhost:9092 -e TIME_OUT=30 --volume "$dirkafkasrv/certificate:/usr/src/app/cert" --name kafkamessagedispatcher kafka_dispatcher
+ docker run -d --network host --rm -it -p 7075:7075 -p 7175:7175 -e ALLOW_HTTP=true -e MSG_BROKER_URL=localhost:9092 -e TIME_OUT=30 $PUBLISH_RESP --volume "$dirkafkasrv/certificate:/usr/src/app/cert" --name kafkamessagedispatcher kafka_dispatcher
fi
# Runs A1 simulator
--- /dev/null
+# ============LICENSE_START===============================================
+# Copyright (C) 2022 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+
+# This is a script for test-purposes only
+# It consumes a response-event from a kafka bus with different approaches
+# In order to use this script, you must have a venv for Python and the kafka-python libs have to be installed
+# To install kafka-python please use: pip install kafka-python
+# Example of a response-event json
+#{
+ #"response-code": "400",
+ #"error-info": "Bad format"
+#}
+
+
+import os
+import json
+import sys
+import math
+import time
+
+from kafka import KafkaConsumer, TopicPartition
+from threading import RLock
+
+# Response string with JSON format
+response_data_JSON = """
+{
+ "response-code": 200,
+ "error-info": ""
+}
+"""
+
+# in seconds
+TIME_OUT=30
+target_topic_res='kafkatopicres'
+MSG_BROKER_URL='localhost:9092'
+
+# Instantiate KafkaConsumer with keyword arguments
+# https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
+def create_kafka_consumer():
+ consumer = KafkaConsumer(
+ # kafka cluster endpoint
+ bootstrap_servers = MSG_BROKER_URL,
+ # move to the earliest or latest available message
+ auto_offset_reset = 'earliest',
+ # number of milliseconds to block during message iteration
+        # if no new message is available during this period of time, iteration through a for-loop will stop automatically
+ consumer_timeout_ms = 100,
+ value_deserializer = lambda m: json.loads(m.decode('ascii')),
+ #enable_auto_commit=False
+ )
+ return consumer
+
+# Helper: Searches for req_id by polling every five seconds, up to thirty seconds in total
+# Helper: If the req_id is found, the matching ConsumerRecord is returned
+# Helper: If the req_id is not found within the time-out, 1 is returned to signal a request time-out
+def consume(req_id):
+
+ try:
+ print ('req_id looking for in consumer:', req_id)
+ consumer = create_kafka_consumer()
+ # Latch to target topic and partition
+ topic_partition = TopicPartition(target_topic_res, 0)
+ consumer.assign([topic_partition])
+
+ sleep_period_in_sec = 5
+ poll_cycle_threshold = calc_pollcycle_threshold(sleep_period_in_sec)
+ poll_retries = 0
+
+ while (poll_retries < poll_cycle_threshold):
+ for consumer_record in consumer:
+ # Get req_id as msg_key and converts it from byte to str for each consumer record
+ msg_key = byte_to_str(consumer_record.key)
+ print ('msg_key in a consumer_record:', msg_key)
+ if (req_id == msg_key):
+ print ('req_id is found in consumer records', req_id)
+ return consumer_record
+
+ print('Sleeping for ' + str(sleep_period_in_sec) + ' seconds...')
+ time.sleep(sleep_period_in_sec)
+ poll_retries += 1
+
+ return 1
+ except Exception as err:
+ print('Error while consume record for req_id', err)
+ return 1
+ finally:
+ consumer.close()
+
+# Helper: calculates poll cycle threshold
+def calc_pollcycle_threshold(sleep_period_in_sec):
+
+ poll_cycle_threshold = int(TIME_OUT) / sleep_period_in_sec
+ poll_cycle_threshold = math.floor(poll_cycle_threshold)
+ return poll_cycle_threshold
+
+# Helper: Converts a byte array to a str
+def byte_to_str(byte_arr):
+
+ if (byte_arr is not None):
+ return byte_arr.decode('utf-8')
+ else:
+ return None
+
+if __name__ == '__main__':
+ try:
+ requestid = sys.argv[1]
+ future = consume(requestid)
+ except Exception as err:
+ print('Error in __main__', err)
+ print (1)
+ sys.exit()
return producer
# Helper: Publishes (to) the target broker and the topic in synch
-def publish(kafka_evet, req_id):
+def publish(kafka_evet, req_id, targettopic):
# Instantiate KafkaProducer with keyword arguments
producer = create_kafka_producer()
# synch-publish
# KafkaProducer.send(topicname, value=broker_message, key=req_id, headers=None, partition=None, timestamp_ms=None)
- fut_rec_metadata = producer.send('kafkatopicres', kafka_evet, req_id)
+ fut_rec_metadata = producer.send(targettopic, kafka_evet, req_id)
return fut_rec_metadata.get()
except Exception as err:
try:
requestid = sys.argv[1]
+ targettopic = sys.argv[2]
# response_data_JSON is str
- future = publish(response_data_JSON, requestid)
+ future = publish(response_data_JSON, requestid, targettopic)
if (future is not None):
print (0)
# The aim of this function is to realize error related test cases only
# The request_id for the Kafka msg, should be passed here as a function parameter
publish_response_event() {
- if [ $# -lt 1 ]; then
- echo "Need 1 parameter, <request_id>"
+ if [ $# -ne 2 ]; then
+ echo "Need 2 parameter, <request_id> <target_topic>"
echo "Exiting test script....."
exit 1
fi
- res=$(python ../common/publish_response_event_to_kafka_bus.py "$1")
+ res=$(python ../common/publish_response_event_to_kafka_bus.py "$1" "$2")
if [ $res -eq 0 ]; then
echo " Result as expected "
else