TC_ONELINE_DESCR="App test DMAAP Meditor and DMAAP Adapter"
#App names to include in the test when running docker, space separated list
-DOCKER_INCLUDED_IMAGES="ICS DMAAPMED DMAAPADP KUBEPROXY MR DMAAPMR CR"
+DOCKER_INCLUDED_IMAGES="ICS DMAAPMED DMAAPADP KUBEPROXY MR DMAAPMR CR KAFKAPC"
#App names to include in the test when running kubernetes, space separated list
-KUBE_INCLUDED_IMAGES=" ICS DMAAPMED DMAAPADP KUBEPROXY MR DMAAPMR CR"
+KUBE_INCLUDED_IMAGES=" ICS DMAAPMED DMAAPADP KUBEPROXY MR DMAAPMR CR KAFKAPC"
#Prestarted app (not started by script) to include in the test when running kubernetes, space separated list
KUBE_PRESTARTED_IMAGES=""
set_ics_trace
start_mr "unauthenticated.dmaapmed.json" "/events" "dmaapmediatorproducer/STD_Fault_Messages" \
- "unauthenticated.dmaapadp.json" "/events" "dmaapadapterproducer/msgs" \
- "unauthenticated.dmaapadp_kafka.text" "/events" "dmaapadapterproducer/msgs"
+ "unauthenticated.dmaapadp.json" "/events" "dmaapadapterproducer/msgs"
+
+start_kafkapc
+
+kafkapc_api_create_topic 201 "unauthenticated.dmaapadp_kafka.text" "text/plain"
+
+kafkapc_api_start_sending 200 "unauthenticated.dmaapadp_kafka.text"
start_dmaapadp NOPROXY $SIM_GROUP/$DMAAP_ADP_COMPOSE_DIR/$DMAAP_ADP_CONFIG_FILE $SIM_GROUP/$DMAAP_ADP_COMPOSE_DIR/$DMAAP_ADP_DATA_FILE
DATA_DELIV_JOBS=0 #Total delivered msg per job per CR
mr_api_generate_json_payload_file 1 ./tmp/data_for_dmaap_test.json
-mr_api_generate_text_payload_file 1 ./tmp/data_for_dmaap_test.txt
+kafkapc_api_generate_text_payload_file 1 ./tmp/data_for_dmaap_test.txt
## Send json file via message-router to adapter
DATA_DELIV_JOBS=5 #Each job will eventuall get 2 msgs
## Send text file via message-router to adapter kafka
EXPECTED_DATA_DELIV=$(($NUM_JOBS/$NUM_CR+$EXPECTED_DATA_DELIV))
-mr_api_send_text_file "/events/unauthenticated.dmaapadp_kafka.text" ./tmp/data_for_dmaap_test.txt
+kafkapc_api_post_msg_from_file 200 "unauthenticated.dmaapadp_kafka.text" "text/plain" ./tmp/data_for_dmaap_test.txt
+kafkapc_equal topics/unauthenticated.dmaapadp_kafka.text/counters/sent 1 30
for ((i=0; i<$NUM_CR; i++))
do
cr_equal $i received_callbacks $EXPECTED_DATA_DELIV 60
done
EXPECTED_DATA_DELIV=$(($NUM_JOBS/$NUM_CR+$EXPECTED_DATA_DELIV))
-mr_api_send_text_file "/events/unauthenticated.dmaapadp_kafka.text" ./tmp/data_for_dmaap_test.txt
+kafkapc_api_post_msg_from_file 200 "unauthenticated.dmaapadp_kafka.text" "text/plain" ./tmp/data_for_dmaap_test.txt
+kafkapc_equal topics/unauthenticated.dmaapadp_kafka.text/counters/sent 2 30
for ((i=0; i<$NUM_CR; i++))
do
cr_equal $i received_callbacks $EXPECTED_DATA_DELIV 60
done
EXPECTED_DATA_DELIV=$(($NUM_JOBS/$NUM_CR+$EXPECTED_DATA_DELIV))
-mr_api_send_text_file "/events/unauthenticated.dmaapadp_kafka.text" ./tmp/data_for_dmaap_test.txt
+kafkapc_api_post_msg_from_file 200 "unauthenticated.dmaapadp_kafka.text" "text/plain" ./tmp/data_for_dmaap_test.txt
+kafkapc_equal topics/unauthenticated.dmaapadp_kafka.text/counters/sent 3 30
for ((i=0; i<$NUM_CR; i++))
do
cr_equal $i received_callbacks $EXPECTED_DATA_DELIV 60
done
EXPECTED_DATA_DELIV=$(($NUM_JOBS/$NUM_CR+$EXPECTED_DATA_DELIV))
-mr_api_send_text_file "/events/unauthenticated.dmaapadp_kafka.text" ./tmp/data_for_dmaap_test.txt
+kafkapc_api_post_msg_from_file 200 "unauthenticated.dmaapadp_kafka.text" "text/plain" ./tmp/data_for_dmaap_test.txt
+kafkapc_equal topics/unauthenticated.dmaapadp_kafka.text/counters/sent 4 30
for ((i=0; i<$NUM_CR; i++))
do
cr_equal $i received_callbacks $EXPECTED_DATA_DELIV 60
done
EXPECTED_DATA_DELIV=$(($NUM_JOBS/$NUM_CR+$EXPECTED_DATA_DELIV))
-mr_api_send_text_file "/events/unauthenticated.dmaapadp_kafka.text" ./tmp/data_for_dmaap_test.txt
+kafkapc_api_post_msg_from_file 200 "unauthenticated.dmaapadp_kafka.text" "text/plain" ./tmp/data_for_dmaap_test.txt
+kafkapc_equal topics/unauthenticated.dmaapadp_kafka.text/counters/sent 5 30
for ((i=0; i<$NUM_CR; i++))
do
cr_equal $i received_callbacks $EXPECTED_DATA_DELIV 60
print_timer
# Send small text via message-routere to adapter
-mr_api_send_text "/events/unauthenticated.dmaapadp_kafka.text" 'Message-------1'
-mr_api_send_text "/events/unauthenticated.dmaapadp_kafka.text" 'Message-------3'
+kafkapc_api_post_msg 200 "unauthenticated.dmaapadp_kafka.text" "text/plain" 'Message-------1'
+kafkapc_api_post_msg 200 "unauthenticated.dmaapadp_kafka.text" "text/plain" 'Message-------3'
+kafkapc_equal topics/unauthenticated.dmaapadp_kafka.text/counters/sent 7 30
# Wait for data recetption, adapter kafka
EXPECTED_DATA_DELIV=$(($NUM_JOBS*2/$NUM_CR+$EXPECTED_DATA_DELIV))
# Send small text via message-router to adapter kafka
-mr_api_send_text "/events/unauthenticated.dmaapadp_kafka.text" 'Message-------5'
-mr_api_send_text "/events/unauthenticated.dmaapadp_kafka.text" 'Message-------7'
+kafkapc_api_post_msg 200 "unauthenticated.dmaapadp_kafka.text" "text/plain" 'Message-------5'
+kafkapc_api_post_msg 200 "unauthenticated.dmaapadp_kafka.text" "text/plain" 'Message-------7'
+kafkapc_equal topics/unauthenticated.dmaapadp_kafka.text/counters/sent 9 30
# Wait for data recetption, adapter kafka
EXPECTED_DATA_DELIV=$(($NUM_JOBS*2/$NUM_CR+$EXPECTED_DATA_DELIV))
# ============LICENSE_END=================================================
#
-
TC_ONELINE_DESCR="Starts DMAAP MR"
#App names to include in the test when running docker, space separated list
-DOCKER_INCLUDED_IMAGES="MR DMAAPMR KUBEPROXY"
+DOCKER_INCLUDED_IMAGES="MR DMAAPMR KUBEPROXY KAFKAPC"
#App names to include in the test when running kubernetes, space separated list
-KUBE_INCLUDED_IMAGES="MR DMAAPMR KUBEPROXY"
+KUBE_INCLUDED_IMAGES="MR DMAAPMR KUBEPROXY KAFKAPC"
#Prestarted app (not started by script) to include in the test when running kubernetes, space separated list
KUBE_PRESTARTED_IMAGES=""
CONDITIONALLY_IGNORED_IMAGES=""
#Supported test environment profiles
-SUPPORTED_PROFILES="ONAP-HONOLULU ONAP-ISTANBUL ORAN-D-RELEASE ORAN-E-RELEASE"
+SUPPORTED_PROFILES="ORAN-E-RELEASE"
#Supported run modes
SUPPORTED_RUNMODES="DOCKER KUBE"
clean_environment
start_kube_proxy
start_mr "$MR_READ_TOPIC" "/events" "users/policy-agent" \
- "$MR_WRITE_TOPIC" "/events" "users/mr-stub" \
- "unauthenticated.dmaapadp.json" "/events" "dmaapadapterproducer/msgs" \
- "unauthenticated.dmaapmed.json" "/events" "maapmediatorproducer/STD_Fault_Messages"
+ "$MR_WRITE_TOPIC" "/events" "users/mr-stub"
+ #\
+ #"unauthenticated.dmaapadp.json" "/events" "dmaapadapterproducer/msgs" \
+ #"unauthenticated.dmaapmed.json" "/events" "maapmediatorproducer/STD_Fault_Messages"
+
+start_kafkapc
+
+kafkapc_api_reset 200
+
+kafkapc_api_create_topic 201 "unauthenticated.dmaapadp.json" "application/json"
+
+kafkapc_api_create_topic 201 "unauthenticated.dmaapmed.json" "application/json"
+
+dmaap_api_print_topics
if [ $RUNMODE == "KUBE" ]; then
:
__ADAPTER=$MR_DMAAP_ADAPTER_HTTP
__ADAPTER_TYPE=$MR_DMAAP_ADAPTER_TYPE
__RETRY_CODES=""
+ elif [ $1 == "KAFKAPC" ]; then
+ __ADAPTER=$KAFKAPC_ADAPTER
+ __ADAPTER_TYPE=$KAFKAPC_ADAPTER_TYPE
+ __RETRY_CODES=""
else
paramError=1
fi
if [ $# -ne 3 ]; then
paramError=1
fi
- #if [ $__ADAPTER == $__RESTBASE ] || [ $__ADAPTER == $__RESTBASE_SECURE ]; then
if [ $__ADAPTER_TYPE == "REST" ]; then
paramError=1
fi
if [ $paramError -eq 1 ]; then
((RES_CONF_FAIL++))
echo "-Incorrect number of parameters to __do_curl_to_api " $@ >> $HTTPLOG
- echo "-Expected: (PA|ICS GET|PUT|POST|DELETE|GET_BATCH|PUT_BATCH|POST_BATCH|DELETE_BATCH <url> [<file>]) | (PA|ICS RESPONSE <correlation-id>)" >> $HTTPLOG
+ echo "-Expected: (PA|ICS GET|PUT|POST|DELETE|GET_BATCH|PUT_BATCH|POST_BATCH|DELETE_BATCH <url> [<file> [mime-type]]) | (PA|ICS RESPONSE <correlation-id>)" >> $HTTPLOG
echo "-Returning response 000" >> $HTTPLOG
echo "-000"
return 1
fi
- #if [ $__ADAPTER == $__RESTBASE ] || [ $__ADAPTER == $__RESTBASE_SECURE ]; then
if [ $__ADAPTER_TYPE == "REST" ]; then
url=" "${__ADAPTER}${input_url}
oper=" -X "$oper
# Function to execute curl towards a container (or process) and compare + print result
# Intended use is for basic test scripts where testing is done with curl and the returned response and payload need to be checked.
-# args: GET|PUT|POST|DELETE <url> <target-response-code> [<json-file>]
+# args: GET|PUT|POST|DELETE <url> <target-response-code> [<payload-file>]
# All calls made to 'localhost:'<port>.
# Expects env PORT set to intended port number
# Expects env RESULT to contain the target response body.
# Optional env HTTPX shall contain protocol 'http' or 'https'. If not set, 'http' is used. For 'https' all cert errors are ignored
-# RESULT="*" means that returned payload is not checked, may container any text
+# RESULT="*" means that returned payload is not checked, may contain any text
# RESULT="<text>" means that the returned payload has to match the <text> exactly
# RESULT="json:<returned-payload>" means that the returned json payload is compared with the expected result (order of json keys and index is irrelevant)
# RESULT="json-array-size:<integer-size>" means that the returned json payload shall contain the number of element given by the <integer-size>
PROT=$HTTPX
fi
- curlstr="curl -X "$1" -skw %{http_code} ${PROT}://localhost:$PORT$2 -H accept:*/*"
+ req_content=""
+ if [ -z "$REQ_CONTENT" ]; then
+ if [ $# -gt 3 ]; then
+ req_content="-H Content-Type:application/json" #Assuming json
+ fi
+ else
+ req_content="-H Content-Type:$REQ_CONTENT"
+ fi
+ resp_content=""
+ if [ -z "$RESP_CONTENT" ]; then
+ if [[ "$RESULT" == "json"* ]]; then
+ resp_content="application/json"
+ elif [[ "$RESULT" == "*" ]]; then
+ resp_content=""
+ else
+ resp_content="text/plain"
+ fi
+ else
+ resp_content=$RESP_CONTENT
+ fi
+ curlstr="curl -X "$1" -skw :%{content_type}:%{http_code} ${PROT}://localhost:$PORT$2 -H accept:*/*"
if [ $# -gt 3 ]; then
- curlstr=$curlstr" -H Content-Type:application/json --data-binary @"$4
+ curlstr=$curlstr" $req_content --data-binary @"$4
fi
echo " CMD:"$curlstr
res=$($curlstr)
status=${res:${#res}-3}
- body=${res:0:${#res}-3}
+ reminder=${res:0:${#res}-4}
+ content_type="${reminder##*:}"
+ body="${reminder%:*}"
+
export body
if [ $status -ne $3 ]; then
echo " Error status:"$status" Expected status: "$3
exit 1
else
echo " OK, code: "$status" (Expected)"
+ if [[ "$content_type" == *"$resp_content"* ]]; then
+ echo " Content type: "$content_type" (Expected)"
+ else
+ echo " Expected content type: "$resp_content
+ echo " Got: "$content_type
+ echo "Exiting....."
+ exit 1
+ fi
echo " Body: "$body
if [ "$RESULT" == "*" ]; then
echo " Body contents not checked"
# Scale kubernetes resources to zero and wait until this has been accomplished, if relevant. If not relevant to scale, then do no action.
# This function is called for prestarted apps not managed by the test script.
__HTTPPROXY_kube_scale_zero_and_wait() {
- echo -e $RED" HTTPPROXY replicas kept as is"$ERED
+ echo -e $RED" HTTPPROXY app is not scaled in this state"$ERED
}
# Delete all kube resouces for the app
# This function is called for apps managed by the test script.
# args: -
__HTTPPROXY_initial_setup() {
- :
+ use_http_proxy_http
}
# Set app short-name, app name and namespace for logging runtime statistics of kubernets pods or docker containers
#######################################################
-
-## Access to Http Proxy Receiver
-# Host name may be changed if app started by kube
-# Direct access from script
-HTTP_PROXY_HTTPX="http"
-HTTP_PROXY_HOST_NAME=$LOCALHOST_NAME
-HTTP_PROXY_PATH=$HTTP_PROXY_HTTPX"://"$HTTP_PROXY_HOST_NAME":"$HTTP_PROXY_WEB_EXTERNAL_PORT
-
-#########################
-### Http Proxy functions
-#########################
-
-# All calls to httpproxy will be directed to the http interface
+# Set http as the protocol to use for all communication to the http proxy
# args: -
# (Function for test scripts)
use_http_proxy_http() {
- echo -e $BOLD"$HTTP_PROXY_DISPLAY_NAME protocol setting"$EBOLD
- echo -e " Using $BOLD http $EBOLD"
- HTTP_PROXY_HTTPX="http"
- HTTP_PROXY_PATH=$HTTP_PROXY_HTTPX"://"$HTTP_PROXY_HOST_NAME":"$HTTP_PROXY_EXTERNAL_PORT
-
- echo ""
+ __http_proxy_set_protocoll "http" $HTTP_PROXY_INTERNAL_PORT $HTTP_PROXY_EXTERNAL_PORT
}
-# All calls to httpproxy will be directed to the https interface
+# Set https as the protocol to use for all communication to the http proxy
# args: -
# (Function for test scripts)
use_http_proxy_https() {
+ __http_proxy_set_protocoll "https" $HTTP_PROXY_INTERNAL_SECURE_PORT $HTTP_PROXY_EXTERNAL_SECURE_PORT
+}
+
+# Setup paths to svc/container for internal and external access
+# args: <protocol> <internal-port> <external-port>
+__http_proxy_set_protocoll() {
echo -e $BOLD"$HTTP_PROXY_DISPLAY_NAME protocol setting"$EBOLD
- echo -e " Using $BOLD https $EBOLD"
- HTTP_PROXY_HTTPX="https"
- HTTP_PROXY_PATH=$HTTP_PROXY_HTTPX"://"$HTTP_PROXY_HOST_NAME":"$HTTP_PROXY_EXTERNAL_SECURE_PORT
+ echo -e " Using $BOLD $1 $EBOLD towards $HTTP_PROXY_DISPLAY_NAME"
+
+ ## Access to http proxy
+ ## HTTP_PROXY_CONFIG_HOST_NAME and HTTP_PROXY_CONFIG_PORT used by apps as config for proxy host and port
+
+ HTTP_PROXY_SERVICE_PATH=$1"://"$HTTP_PROXY_APP_NAME":"$2 # docker access, container->container and script->container via proxy
+ HTTP_PROXY_CONFIG_HOST_NAME=$HTTP_PROXY_APP_NAME
+ HTTP_PROXY_CONFIG_PORT=$2
+ if [ $RUNMODE == "KUBE" ]; then
+ HTTP_PROXY_CONFIG_HOST_NAME=$HTTP_PROXY_APP_NAME"."$KUBE_SIM_NAMESPACE
+ HTTP_PROXY_CONFIG_PORT=$3
+ HTTP_PROXY_SERVICE_PATH=$1"://"$HTTP_PROXY_APP_NAME.$KUBE_SIM_NAMESPACE":"$3 # kube access, pod->svc and script->svc via proxy
+ fi
echo ""
}
+# Export env vars for config files, docker compose and kube resources
+# args:
+__http_proxy_export_vars() {
+
+ export HTTP_PROXY_APP_NAME
+ export HTTP_PROXY_DISPLAY_NAME
+
+ export HTTP_PROXY_WEB_EXTERNAL_PORT
+ export HTTP_PROXY_WEB_INTERNAL_PORT
+ export HTTP_PROXY_EXTERNAL_PORT
+ export HTTP_PROXY_INTERNAL_PORT
+
+ export HTTP_PROXY_WEB_EXTERNAL_SECURE_PORT
+ export HTTP_PROXY_WEB_INTERNAL_SECURE_PORT
+ export HTTP_PROXY_EXTERNAL_SECURE_PORT
+ export HTTP_PROXY_INTERNAL_SECURE_PORT
+
+ export KUBE_SIM_NAMESPACE
+ export DOCKER_SIM_NWNAME
+ export HTTP_PROXY_IMAGE
+}
+
# Start the Http Proxy in the simulator group
# args: -
# (Function for test scripts)
if [ $retcode_i -eq 0 ]; then
echo -e " Creating $HTTP_PROXY_APP_NAME deployment and service"
- export HTTP_PROXY_APP_NAME
-
- export HTTP_PROXY_WEB_EXTERNAL_PORT
- export HTTP_PROXY_WEB_INTERNAL_PORT
- export HTTP_PROXY_EXTERNAL_PORT
- export HTTP_PROXY_INTERNAL_PORT
-
- export HTTP_PROXY_WEB_EXTERNAL_SECURE_PORT
- export HTTP_PROXY_WEB_INTERNAL_SECURE_PORT
- export HTTP_PROXY_EXTERNAL_SECURE_PORT
- export HTTP_PROXY_INTERNAL_SECURE_PORT
-
- export KUBE_SIM_NAMESPACE
- export HTTP_PROXY_IMAGE
__kube_create_namespace $KUBE_SIM_NAMESPACE
+ __http_proxy_export_vars
+
# Create service
input_yaml=$SIM_GROUP"/"$HTTP_PROXY_COMPOSE_DIR"/"svc.yaml
output_yaml=$PWD/tmp/proxy_svc.yaml
fi
- echo " Retrieving host and ports for service..."
- HTTP_PROXY_HOST_NAME=$(__kube_get_service_host $HTTP_PROXY_APP_NAME $KUBE_SIM_NAMESPACE)
- HTTP_PROXY_WEB_EXTERNAL_PORT=$(__kube_get_service_port $HTTP_PROXY_APP_NAME $KUBE_SIM_NAMESPACE "web")
- HTTP_PROXY_WEB_EXTERNAL_SECURE_PORT=$(__kube_get_service_port $HTTP_PROXY_APP_NAME $KUBE_SIM_NAMESPACE "webs")
-
- HTTP_PROXY_EXTERNAL_PORT=$(__kube_get_service_port $HTTP_PROXY_APP_NAME $KUBE_SIM_NAMESPACE "http")
- HTTP_PROXY_EXTERNAL_SECURE_PORT=$(__kube_get_service_port $HTTP_PROXY_APP_NAME $KUBE_SIM_NAMESPACE "https")
-
- if [ $HTTP_PROXY_HTTPX == "http" ]; then
- HTTP_PROXY_PATH=$HTTP_PROXY_HTTPX"://"$HTTP_PROXY_HOST_NAME":"$HTTP_PROXY_WEB_EXTERNAL_PORT
- HTTP_PROXY_CONFIG_PORT=$HTTP_PROXY_EXTERNAL_PORT
- HTTP_PROXY_CONFIG_HOST_NAME=$HTTP_PROXY_APP_NAME"."$KUBE_SIM_NAMESPACE
-
- echo " Host IP, http port: $HTTP_PROXY_HOST_NAME $HTTP_PROXY_WEB_EXTERNAL_PORT"
- else
- HTTP_PROXY_PATH=$HTTP_PROXY_HTTPX"://"$HTTP_PROXY_HOST_NAME":"$HTTP_PROXY_WEB_EXTERNAL_SECURE_PORT
- HTTP_PROXY_CONFIG_PORT=$HTTP_PROXY_EXTERNAL_SECURE_PORT
- HTTP_PROXY_CONFIG_HOST_NAME=$HTTP_PROXY_APP_NAME"."$KUBE_SIM_NAMESPACE
-
- echo " Host IP, https port: $HTTP_PROXY_HOST_NAME $HTTP_PROXY_WEB_EXTERNAL_SECURE_PORT"
- fi
-
- __check_service_start $HTTP_PROXY_APP_NAME $HTTP_PROXY_PATH$HTTP_PROXY_ALIVE_URL
+ __check_service_start $HTTP_PROXY_APP_NAME $HTTP_PROXY_SERVICE_PATH$HTTP_PROXY_ALIVE_URL
else
# Check if docker app shall be fully managed by the test script
exit
fi
- export HTTP_PROXY_APP_NAME
- export HTTP_PROXY_EXTERNAL_PORT
- export HTTP_PROXY_INTERNAL_PORT
- export HTTP_PROXY_EXTERNAL_SECURE_PORT
- export HTTP_PROXY_INTERNAL_SECURE_PORT
- export HTTP_PROXY_WEB_EXTERNAL_PORT
- export HTTP_PROXY_WEB_INTERNAL_PORT
- export HTTP_PROXY_WEB_EXTERNAL_SECURE_PORT
- export HTTP_PROXY_WEB_INTERNAL_SECURE_PORT
- export DOCKER_SIM_NWNAME
-
- export HTTP_PROXY_DISPLAY_NAME
+ __http_proxy_export_vars
__start_container $HTTP_PROXY_COMPOSE_DIR "" NODOCKERARGS 1 $HTTP_PROXY_APP_NAME
- if [ $HTTP_PROXY_HTTPX == "http" ]; then
- HTTP_PROXY_PATH=$HTTP_PROXY_HTTPX"://"$HTTP_PROXY_HOST_NAME":"$HTTP_PROXY_WEB_INTERNAL_PORT
- else
- HTTP_PROXY_PATH=$HTTP_PROXY_HTTPX"://"$HTTP_PROXY_HOST_NAME":"$HTTP_PROXY_WEB_INTERNAL_SECURE_PORT
- fi
- __check_service_start $HTTP_PROXY_APP_NAME $HTTP_PROXY_PATH$HTTP_PROXY_ALIVE_URL
-
- if [ $HTTP_PROXY_HTTPX == "http" ]; then
- HTTP_PROXY_CONFIG_PORT=$HTTP_PROXY_INTERNAL_PORT
- else
- HTTP_PROXY_CONFIG_PORT=$HTTP_PROXY_INTERNAL_SECURE_PORT
- fi
- HTTP_PROXY_CONFIG_HOST_NAME=$HTTP_PROXY_APP_NAME
-
+ __check_service_start $HTTP_PROXY_APP_NAME $HTTP_PROXY_SERVICE_PATH$HTTP_PROXY_ALIVE_URL
fi
echo ""
}
-
export ICS_CONFIG_FILE
export DOCKER_SIM_NWNAME
export ICS_DISPLAY_NAME
-
+ export ICS_LOGPATH
export ICS_DATA_PV_NAME=$ICS_APP_NAME"-pv"
export ICS_DATA_PVC_NAME=$ICS_APP_NAME"-pvc"
exit
fi
-
if [ $retcode_p -eq 0 ]; then
echo -e " Using existing $ICS_APP_NAME deployment and service"
echo " Setting ICS replicas=1"
--- /dev/null
+#!/bin/bash
+
+# ============LICENSE_START===============================================
+# Copyright (C) 2020 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+
+# This is a script that contains container/service managemnt functions for Kafka producer/consumer
+
+################ Test engine functions ################
+
+# Create the image var used during the test
+# arg: <image-tag-suffix> (selects staging, snapshot, release etc)
+# <image-tag-suffix> is present only for images with staging, snapshot,release tags
+__KAFKAPC_imagesetup() {
+ __check_and_create_image_var KAFKAPC "KAFKAPC_IMAGE" "KAFKAPC_IMAGE_BASE" "KAFKAPC_IMAGE_TAG" LOCAL "$KAFKAPC_DISPLAY_NAME"
+}
+
+# Pull image from remote repo or use locally built image
+# arg: <pull-policy-override> <pull-policy-original>
+# <pull-policy-override> Shall be used for images allowing overriding. For example use a local image when test is started to use released images
+# <pull-policy-original> Shall be used for images that does not allow overriding
+# Both var may contain: 'remote', 'remote-remove' or 'local'
+__KAFKAPC_imagepull() {
+ __check_and_pull_image $2 "$KAFKAPC_DISPLAY_NAME" $KAFKAPC_APP_NAME KAFKAPC_IMAGE
+}
+
+# Build image (only for simulator or interfaces stubs owned by the test environment)
+# arg: <image-tag-suffix> (selects staging, snapshot, release etc)
+# <image-tag-suffix> is present only for images with staging, snapshot,release tags
+__KAFKAPC_imagebuild() {
+
+ cd ../$KAFKAPC_BUILD_DIR
+ echo " Building KAFKAPC - $KAFKAPC_DISPLAY_NAME - image: $KAFKAPC_IMAGE"
+ docker build --build-arg NEXUS_PROXY_REPO=$NEXUS_PROXY_REPO -t $KAFKAPC_IMAGE . &> .dockererr
+ if [ $? -eq 0 ]; then
+ echo -e $GREEN" Build Ok"$EGREEN
+ __retag_and_push_image KAFKAPC_IMAGE
+ if [ $? -ne 0 ]; then
+ exit 1
+ fi
+ else
+ echo -e $RED" Build Failed"$ERED
+ ((RES_CONF_FAIL++))
+ cat .dockererr
+ echo -e $RED"Exiting...."$ERED
+ exit 1
+ fi
+}
+
+# Generate a string for each included image using the app display name and a docker images format string
+# If a custom image repo is used then also the source image from the local repo is listed
+# arg: <docker-images-format-string> <file-to-append>
+__KAFKAPC_image_data() {
+ echo -e "$KAFKAPC_DISPLAY_NAME\t$(docker images --format $1 $KAFKAPC_IMAGE)" >> $2
+ if [ ! -z "$KAFKAPC_IMAGE_SOURCE" ]; then
+ echo -e "-- source image --\t$(docker images --format $1 $KAFKAPC_IMAGE_SOURCE)" >> $2
+ fi
+}
+
+# Scale kubernetes resources to zero
+# All resources shall be ordered to be scaled to 0, if relevant. If not relevant to scale, then do no action.
+# This function is called for apps fully managed by the test script
+__KAFKAPC_kube_scale_zero() {
+ __kube_scale_all_resources $KUBE_SIM_NAMESPACE autotest KAFKAPC
+}
+
+# Scale kubernetes resources to zero and wait until this has been accomplished, if relevant. If not relevant to scale, then do no action.
+# This function is called for prestarted apps not managed by the test script.
+__KAFKAPC_kube_scale_zero_and_wait() {
+ echo -e $RED" KAFKAPC app is not scaled in this state"$ERED
+}
+
+# Delete all kube resouces for the app
+# This function is called for apps managed by the test script.
+__KAFKAPC_kube_delete_all() {
+ __kube_delete_all_resources $KUBE_SIM_NAMESPACE autotest KAFKAPC
+}
+
+# Store docker logs
+# This function is called for apps managed by the test script.
+# args: <log-dir> <file-prexix>
+__KAFKAPC_store_docker_logs() {
+ if [ $RUNMODE == "KUBE" ]; then
+ kubectl logs -l "autotest=KAFKAPC" -n $KUBE_SIM_NAMESPACE --tail=-1 > $1$2_kafkapc.log 2>&1
+ else
+ docker logs $KAFKAPC_APP_NAME > $1$2_kafkapc.log 2>&1
+ fi
+}
+
+# Initial setup of protocol, host and ports
+# This function is called for apps managed by the test script.
+# args: -
+__KAFKAPC_initial_setup() {
+ use_kafkapc_http
+}
+
+# Set app short-name, app name and namespace for logging runtime statistics of kubernets pods or docker containers
+# For docker, the namespace shall be excluded
+# This function is called for apps managed by the test script as well as for prestarted apps.
+# args: -
+__KAFKAPC_statisics_setup() {
+ if [ $RUNMODE == "KUBE" ]; then
+ echo "KAFKAPC $KAFKAPC_APP_NAME $KUBE_SIM_NAMESPACE"
+ else
+ echo "KAFKAPC $KAFKAPC_APP_NAME"
+ fi
+}
+
+#######################################################
+
+#######################################################
+
+# Set http as the protocol to use for all communication to the Kafka procon
+# args: -
+# (Function for test scripts)
+use_kafkapc_http() {
+ __kafkapc_set_protocoll "http" $KAFKAPC_INTERNAL_PORT $KAFKAPC_EXTERNAL_PORT
+}
+
+# Set httpS as the protocol to use for all communication to the Kafka procon
+# args: -
+# (Function for test scripts)
+use_kafkapc_https() {
+ __kafkapc_set_protocoll "https" $KAFKAPC_INTERNAL_SECURE_PORT $KAFKAPC_EXTERNAL_SECURE_PORT
+}
+
+# Setup paths to svc/container for internal and external access
+# args: <protocol> <internal-port> <external-port>
+__kafkapc_set_protocoll() {
+ echo -e $BOLD"$KAFKAPC_DISPLAY_NAME protocol setting"$EBOLD
+ echo -e " Using $BOLD $1 $EBOLD towards $KAFKAPC_DISPLAY_NAME"
+
+ ## Access to Kafka procon
+
+ KAFKAPC_SERVICE_PATH=$1"://"$KAFKAPC_APP_NAME":"$2 # docker access, container->container and script->container via proxy
+ if [ $RUNMODE == "KUBE" ]; then
+ KAFKAPC_SERVICE_PATH=$1"://"$KAFKAPC_APP_NAME.$KUBE_SIM_NAMESPACE":"$3 # kube access, pod->svc and script->svc via proxy
+ fi
+
+ KAFKAPC_ADAPTER_TYPE="REST"
+ KAFKAPC_ADAPTER=$KAFKAPC_SERVICE_PATH
+
+ echo ""
+}
+
+### Admin API functions Kafka procon
+
+###########################
+### Kafka Procon functions
+###########################
+
+# Export env vars for config files, docker compose and kube resources
+# args:
+__kafkapc_export_vars() {
+ export KAFKAPC_APP_NAME
+ export KAFKAPC_DISPLAY_NAME
+
+ export DOCKER_SIM_NWNAME
+ export KUBE_SIM_NAMESPACE
+
+ export KAFKAPC_IMAGE
+ export KAFKAPC_INTERNAL_PORT
+ export KAFKAPC_INTERNAL_SECURE_PORT
+ export KAFKAPC_EXTERNAL_PORT
+ export KAFKAPC_EXTERNAL_SECURE_PORT
+
+ export MR_KAFKA_SERVICE_PATH
+}
+
+
+# Start the Kafka procon in the simulator group
+# args: -
+# (Function for test scripts)
+start_kafkapc() {
+
+ echo -e $BOLD"Starting $KAFKAPC_DISPLAY_NAME"$EBOLD
+
+ if [ $RUNMODE == "KUBE" ]; then
+
+ # Check if app shall be fully managed by the test script
+ __check_included_image "KAFKAPC"
+ retcode_i=$?
+
+ # Check if app shall only be used by the testscipt
+ __check_prestarted_image "KAFKAPC"
+ retcode_p=$?
+
+ if [ $retcode_i -ne 0 ] && [ $retcode_p -ne 0 ]; then
+ echo -e $RED"The $ICS_APP_NAME app is not included as managed nor prestarted in this test script"$ERED
+ echo -e $RED"The $ICS_APP_NAME will not be started"$ERED
+ exit
+ fi
+ if [ $retcode_i -eq 0 ] && [ $retcode_p -eq 0 ]; then
+ echo -e $RED"The $ICS_APP_NAME app is included both as managed and prestarted in this test script"$ERED
+ echo -e $RED"The $ICS_APP_NAME will not be started"$ERED
+ exit
+ fi
+
+ if [ $retcode_p -eq 0 ]; then
+ echo -e " Using existing $KAFKAPC_APP_NAME deployment and service"
+ echo " Setting RC replicas=1"
+ __kube_scale deployment $KAFKAPC_APP_NAME $KUBE_SIM_NAMESPACE 1
+ fi
+
+ if [ $retcode_i -eq 0 ]; then
+ echo -e " Creating $KAFKAPC_APP_NAME deployment and service"
+
+ __kube_create_namespace $KUBE_SIM_NAMESPACE
+
+ __kafkapc_export_vars
+
+ # Create service
+ input_yaml=$SIM_GROUP"/"$KAFKAPC_COMPOSE_DIR"/"svc.yaml
+ output_yaml=$PWD/tmp/kafkapc_svc.yaml
+ __kube_create_instance service $KAFKAPC_APP_NAME $input_yaml $output_yaml
+
+ # Create app
+ input_yaml=$SIM_GROUP"/"$KAFKAPC_COMPOSE_DIR"/"app.yaml
+ output_yaml=$PWD/tmp/kafkapc_app.yaml
+ __kube_create_instance app $KAFKAPC_APP_NAME $input_yaml $output_yaml
+ fi
+
+ __check_service_start $KAFKAPC_APP_NAME $KAFKAPC_SERVICE_PATH$KAFKAPC_ALIVE_URL
+
+ else
+
+ # Check if docker app shall be fully managed by the test script
+ __check_included_image 'KAFKAPC'
+ if [ $? -eq 1 ]; then
+ echo -e $RED"The Kafka procon app is not included as managed in this test script"$ERED
+ echo -e $RED"The Kafka procon will not be started"$ERED
+ exit
+ fi
+
+ __kafkapc_export_vars
+
+ __start_container $KAFKAPC_COMPOSE_DIR "" NODOCKERARGS 1 $KAFKAPC_APP_NAME
+
+ __check_service_start $KAFKAPC_APP_NAME $KAFKAPC_SERVICE_PATH$KAFKAPC_ALIVE_URL
+ fi
+ echo ""
+ return 0
+}
+
+# Tests if a variable value in the KAFPAPC is equal to a target value and and optional timeout.
+# Arg: <variable-name> <target-value> - This test set pass or fail depending on if the variable is
+# equal to the target or not.
+# Arg: <variable-name> <target-value> <timeout-in-sec> - This test waits up to the timeout seconds
+# before setting pass or fail depending on if the variable value becomes equal to the target
+# value or not.
+# (Function for test scripts)
+kafkapc_equal() {
+ if [ $# -eq 2 ] || [ $# -eq 3 ]; then
+ __var_test KAFPAPC "$KAFKAPC_SERVICE_PATH/" $1 "=" $2 $3
+ else
+ __print_err "Wrong args to kafkapc_equal, needs two or three args: <sim-param> <target-value> [ timeout ]" $@
+ fi
+}
+
+# KAFKA PC API: Reset all, POST /reset
+# Arg: <response-code>
+# (Function for test scripts)
+kafkapc_api_reset() {
+ __log_conf_start $@
+
+ if [ $# -ne 1 ]; then
+ __print_err "<response-code>" $@
+ return 1
+ fi
+
+ res="$(__do_curl_to_api KAFKAPC POST /reset)"
+ status=${res:${#res}-3}
+
+ if [ $status -ne $1 ]; then
+ __log_conf_fail_status_code $1 $status
+ return 1
+ fi
+
+ __log_conf_ok
+ return 0
+}
+
+# KAFKA PC API: Create a topic of a data-type, PUT /topics/<topic>
+# Arg: <response-code> <topic-name> <mime-type>
+# (Function for test scripts)
+kafkapc_api_create_topic() {
+ __log_conf_start $@
+
+ if [ $# -ne 3 ]; then
+ __print_err "<response-code> <topic-name> <mime-type>" $@
+ return 1
+ fi
+
+ res="$(__do_curl_to_api KAFKAPC PUT /topics/$2?type=$3)"
+ status=${res:${#res}-3}
+
+ if [ $status -ne $1 ]; then
+ __log_conf_fail_status_code $1 $status
+ return 1
+ fi
+
+ __log_conf_ok
+ return 0
+}
+
+# KAFKA PC API: Get topics, GET /topics
+# args: <response-code> [ EMPTY | [<topic>]+ ]
+# (Function for test scripts)
+kafkapc_api_get_topics() {
+ __log_test_start $@
+
+ if [ $# -lt 1 ]; then
+ __print_err "<response-code> EMPTY | [<policy-type-id>]*" $@
+ return 1
+ fi
+
+ res="$(__do_curl_to_api KAFKAPC GET /topics)"
+ status=${res:${#res}-3}
+
+ if [ $status -ne $1 ]; then
+ __log_test_fail_status_code $1 $status
+ return 1
+ fi
+ if [ $# -gt 1 ]; then
+ body=${res:0:${#res}-3}
+ targetJson="["
+
+ for pid in ${@:2} ; do
+ if [ "$targetJson" != "[" ]; then
+ targetJson=$targetJson","
+ fi
+ if [ $pid != "EMPTY" ]; then
+ targetJson=$targetJson"\"$pid\""
+ fi
+ done
+ targetJson=$targetJson"]"
+ echo " TARGET JSON: $targetJson" >> $HTTPLOG
+ res=$(python3 ../common/compare_json.py "$targetJson" "$body")
+
+ if [ $res -ne 0 ]; then
+ __log_test_fail_body
+ return 1
+ fi
+ fi
+ __log_test_pass
+ return 0
+}
+
+# KAFKA PC API: Get a topic, GET /topic/<topic>
+# args: <response-code> <topic> <mime-type>
+# (Function for test scripts)
+kafkapc_api_get_topic() {
+ __log_test_start $@
+
+ if [ $# -ne 3 ]; then
+ __print_err "<response-code> <topic> <mime-type>" $@
+ return 1
+ fi
+
+ res="$(__do_curl_to_api KAFKAPC GET /topics/$2)"
+ status=${res:${#res}-3}
+
+ if [ $status -ne $1 ]; then
+ __log_test_fail_status_code $1 $status
+ return 1
+ fi
+
+ body=${res:0:${#res}-3}
+ if [ "$body" != $3 ]; then
+ __log_test_fail_body
+ return 1
+ fi
+
+ __log_test_pass
+ return 0
+}
+
+# KAFKA PC API: Start sending on a topic, POST /topic/<topic>/startsend
+# args: <response-code> <topic>
+# (Function for test scripts)
+kafkapc_api_start_sending() {
+ __log_test_start $@
+
+ if [ $# -ne 2 ]; then
+ __print_err "<response-code> <topic>" $@
+ return 1
+ fi
+
+ res="$(__do_curl_to_api KAFKAPC POST /topics/$2/startsend)"
+ status=${res:${#res}-3}
+
+ if [ $status -ne $1 ]; then
+ __log_test_fail_status_code $1 $status
+ return 1
+ fi
+
+ __log_test_pass
+ return 0
+}
+
+# KAFKA PC API: Start receiving on a topic, POST /topic/<topic>/startreceive
+# args: <response-code> <topic>
+# (Function for test scripts)
+kafkapc_api_start_receiving() {
+ __log_test_start $@
+
+ if [ $# -ne 2 ]; then
+ __print_err "<response-code> <topic>" $@
+ return 1
+ fi
+
+ res="$(__do_curl_to_api KAFKAPC POST /topics/$2/startreceive)"
+ status=${res:${#res}-3}
+
+ if [ $status -ne $1 ]; then
+ __log_test_fail_status_code $1 $status
+ return 1
+ fi
+
+ __log_test_pass
+ return 0
+}
+
+# KAFKA PC API: Stop sending on a topic, POST /topic/<topic>/stopsend
+# args: <response-code> <topic>
+# (Function for test scripts)
+kafkapc_api_stop_sending() {
+ __log_test_start $@
+
+ if [ $# -ne 2 ]; then
+ __print_err "<response-code> <topic>" $@
+ return 1
+ fi
+
+ res="$(__do_curl_to_api KAFKAPC POST /topics/$2/stopsend)"
+ status=${res:${#res}-3}
+
+ if [ $status -ne $1 ]; then
+ __log_test_fail_status_code $1 $status
+ return 1
+ fi
+
+ __log_test_pass
+ return 0
+}
+
+# KAFKA PC API: Stop receiving on a topic, POST /topic/<topic>/stopreceive
+# args: <response-code> <topic>
+# (Function for test scripts)
+kafkapc_api_stop_receiving() {
+ __log_test_start $@
+
+ if [ $# -ne 2 ]; then
+ __print_err "<response-code> <topic>" $@
+ return 1
+ fi
+
+ res="$(__do_curl_to_api KAFKAPC POST /topics/$2/stopreceive)"
+ status=${res:${#res}-3}
+
+ if [ $status -ne $1 ]; then
+ __log_test_fail_status_code $1 $status
+ return 1
+ fi
+
+ __log_test_pass
+ return 0
+}
+
+# KAFKA PC API: Send a message on a topic, POST /topic/<topic>/msg
+# args: <response-code> <topic> <mime-type> <msg>
+# (Function for test scripts)
+kafkapc_api_post_msg() {
+ __log_test_start $@
+
+ if [ $# -ne 4 ]; then
+ __print_err "<response-code> <topic> <mime-type> <msg>" $@
+ return 1
+ fi
+ payload="tmp/.kafkapayload"
+ echo -n $4 > $payload #-n prevent a newline to be added...
+ res="$(__do_curl_to_api KAFKAPC POST /topics/$2/msg $payload $3)"
+ status=${res:${#res}-3}
+
+ if [ $status -ne $1 ]; then
+ __log_test_fail_status_code $1 $status
+ return 1
+ fi
+
+ __log_test_pass
+ return 0
+}
+
+
+# KAFKA PC API: Get a msg on a topic, GET /topic/<topic>/msg
+# args: <response-code> <topic> ([ <mime-type> <msg> ] | NOMSG )
+# (Function for test scripts)
+kafkapc_api_get_msg() {
+ __log_test_start $@
+
+ if [ $# -lt 3 ]; then
+ __print_err "<response-code> <topic> ([ <mime-type> <msg> ] | NOMSG )" $@
+ return 1
+ fi
+ mime_type="text/plain"
+ if [ ! -z "$3" ]; then
+ mime_type=$3
+ fi
+ res="$(__do_curl_to_api KAFKAPC GET /topics/$2/msg)"
+ status=${res:${#res}-3}
+
+ if [ $status -ne $1 ]; then
+ __log_test_fail_status_code $1 $status
+ return 1
+ fi
+ if [ $# -eq 4 ]; then
+ body=${res:0:${#res}-3}
+ if [ "$body" != "$4" ]; then
+ __log_test_fail_body
+ return 1
+ fi
+ fi
+
+ __log_test_pass
+ return 0
+}
+
+# KAFKA PC API: Send a message from a file on a topic, POST /topic/<topic>/msg
+# args: <response-code> <topic> <mime-type> <file>
+# (Function for test scripts)
+kafkapc_api_post_msg_from_file() {
+ __log_test_start $@
+
+ if [ $# -ne 4 ]; then
+ __print_err "<response-code> <topic> <mime-type> <file>" $@
+ return 1
+ fi
+ res="$(__do_curl_to_api KAFKAPC POST /topics/$2/msg $4 $3)"
+ status=${res:${#res}-3}
+
+ if [ $status -ne $1 ]; then
+ __log_test_fail_status_code $1 $status
+ return 1
+ fi
+
+ __log_test_pass
+ return 0
+}
+
+# KAFKA PC API: Get a msg on a topic and compare with file, GET /topic/<topic>/msg
+# args: <response-code> <topic> <mime-type> <file>
+# (Function for test scripts)
+kafkapc_api_get_msg_from_file() {
+ __log_test_start $@
+
+ if [ $# -ne 4 ]; then
+ __print_err "<response-code> <topic> <mime-type> <file> " $@
+ return 1
+ fi
+
+ if [ -f $4 ]; then
+ msgfile=$(cat $4)
+ else
+ __log_test_fail_general "Message file "$4", does not exist"
+ return 1
+ fi
+
+ mime_type="text/plain"
+
+ res="$(__do_curl_to_api KAFKAPC GET /topics/$2/msg)"
+ status=${res:${#res}-3}
+
+ if [ $status -ne $1 ]; then
+ __log_test_fail_status_code $1 $status
+ return 1
+ fi
+
+ body=${res:0:${#res}-3}
+ if [ "$body" != "$msgfile" ]; then
+ __log_test_fail_body
+ return 1
+ fi
+
+ __log_test_pass
+ return 0
+}
+
+
+# Create json file for payload
+# arg: <size-in-kb> <filename>
+kafkapc_api_generate_json_payload_file() {
+ __log_conf_start $@
+ if [ $# -ne 2 ]; then
+ __print_err "<topic-url> <json-file>" $@
+ return 1
+ fi
+ if [ $1 -lt 1 ] || [ $1 -gt 10000 ]; then
+ __log_conf_fail_general "Only size between 1k and 10000k supported"
+ return 1
+ fi
+ echo -n "{\"abcdefghijklmno\":[" > $2
+ LEN=$(($1*100-2))
+ echo -n "\""ABCDEFG"\"" >> $2
+ for ((idx=1; idx<$LEN; idx++))
+ do
+ echo -n ",\"ABCDEFG\"" >> $2
+ done
+ echo -n "]}" >> $2
+
+ __log_conf_ok
+ return 0
+}
+
+# Create text file for payload
+# arg: <size-in-kb> <filename>
+kafkapc_api_generate_text_payload_file() {
+ __log_conf_start $@
+ if [ $# -ne 2 ]; then
+ __print_err "<topic-url> <text-file>" $@
+ return 1
+ fi
+ if [ $1 -lt 1 ] || [ $1 -gt 10000 ]; then
+ __log_conf_fail_general "Only size between 1k and 10000k supported"
+ return 1
+ fi
+ echo -n "" > $2
+ LEN=$(($1*100))
+ for ((idx=0; idx<$LEN; idx++))
+ do
+ echo -n "ABCDEFGHIJ" >> $2
+ done
+
+ __log_conf_ok
+ return 0
+}
\ No newline at end of file
kubectl logs -n $KUBE_ONAP_NAMESPACE $podname --tail=-1 > $1$2_$podname.log 2>&1
done
else
- docker logs $MR_DMAAP_APP_NAME > $1$2mr.log 2>&1
+ docker logs $MR_DMAAP_APP_NAME > $1$2_mr.log 2>&1
docker logs $MR_KAFKA_APP_NAME > $1$2_mr_kafka.log 2>&1
docker logs $MR_ZOOKEEPER_APP_NAME > $1$2_mr_zookeeper.log 2>&1
fi
fi
fi
- echo " Current topics:"
- curlString="$MR_DMAAP_PATH/topics"
- result=$(__do_curl "$curlString")
- echo $result | indent2
+ dmaap_api_print_topics
fi
__mr_export_vars
return 1
}
+# Helper function to list the current topics in DMAAP MR
+# args: -
+dmaap_api_print_topics() {
+ echo " Current topics:"
+ curlString="$MR_DMAAP_PATH/topics"
+ result=$(__do_curl "$curlString")
+ echo $result | indent2
+}
+
### Generic test cases for varaible checking
return 0
}
-# Create tet file for payload
+# Create text file for payload
# arg: <size-in-kb> <filename>
mr_api_generate_text_payload_file() {
__log_conf_start $@
## Access to SDNC
SDNC_SERVICE_PATH=$1"://"$SDNC_APP_NAME":"$2 # docker access, container->container and script->container via proxy
- SDNC_SERVICE_API_PATH=$1"://"$SDNC_USER":"$SDNC_PWD"@"$SDNC_APP_NAME":"$1$SDNC_API_URL
+ SDNC_SERVICE_API_PATH=$1"://"$SDNC_USER":"$SDNC_PWD"@"$SDNC_APP_NAME":"$2$SDNC_API_URL
if [ $RUNMODE == "KUBE" ]; then
SDNC_SERVICE_PATH=$1"://"$SDNC_APP_NAME.$KUBE_SDNC_NAMESPACE":"$3 # kube access, pod->svc and script->svc via proxy
- SDNC_SERVICE_API_PATH=$1"://"$SDNC_USER":"$SDNC_PWD"@"$SDNC_APP_NAME.KUBE_SDNC_NAMESPACE":"$1$SDNC_API_URL
+ SDNC_SERVICE_API_PATH=$1"://"$SDNC_USER":"$SDNC_PWD"@"$SDNC_APP_NAME.$KUBE_SDNC_NAMESPACE":"$3$SDNC_API_URL
fi
echo ""
controller_api_get_A1_policy_ids() {
__log_test_start $@
- ric_id=$3
- if [ $RUNMODE == "KUBE" ]; then
- ric_id=$(get_kube_sim_host $3)
- fi
+ ric_id=$(__find_sim_host $3)
paramError=1
if [ $# -gt 3 ] && [ $2 == "OSC" ]; then
- url="$RIC_SIM_HTTPX://$ric_id:$RIC_SIM_PORT/a1-p/policytypes/$4/policies"
+ url="$ric_id/a1-p/policytypes/$4/policies"
paramError=0
elif [ $# -gt 2 ] && [ $2 == "STD" ]; then
- url="$RIC_SIM_HTTPX://$ric_id:$RIC_SIM_PORT/A1-P/v1/policies"
+ url="$ric_id/A1-P/v1/policies"
paramError=0
fi
controller_api_get_A1_policy_type() {
__log_test_start $@
- ric_id=$3
- if [ $RUNMODE == "KUBE" ]; then
- ric_id=$(get_kube_sim_host $3)
- fi
+ ric_id=$(__find_sim_host $3)
paramError=1
if [ $# -gt 3 ] && [ $2 == "OSC" ]; then
- url="$RIC_SIM_HTTPX://$ric_id:$RIC_SIM_PORT/a1-p/policytypes/$4"
+ url="$ric_id/a1-p/policytypes/$4"
paramError=0
fi
controller_api_delete_A1_policy() {
__log_test_start $@
- ric_id=$3
- if [ $RUNMODE == "KUBE" ]; then
- ric_id=$(get_kube_sim_host $3)
- fi
+ ric_id=$(__find_sim_host $3)
paramError=1
if [ $# -eq 5 ] && [ $2 == "OSC" ]; then
- url="$RIC_SIM_HTTPX://$ric_id:$RIC_SIM_PORT/a1-p/policytypes/$4/policies/$UUID$5"
+ url="$ric_id/a1-p/policytypes/$4/policies/$UUID$5"
paramError=0
elif [ $# -eq 4 ] && [ $2 == "STD" ]; then
- url="$RIC_SIM_HTTPX://$ric_id:$RIC_SIM_PORT/A1-P/v1/policies/$UUID$4"
+ url="$ric_id/A1-P/v1/policies/$UUID$4"
paramError=0
fi
controller_api_put_A1_policy() {
__log_test_start $@
- ric_id=$3
- if [ $RUNMODE == "KUBE" ]; then
- ric_id=$(get_kube_sim_host $3)
- fi
+ ric_id=$(__find_sim_host $3)
paramError=1
if [ $# -eq 6 ] && [ $2 == "OSC" ]; then
- url="$RIC_SIM_HTTPX://$ric_id:$RIC_SIM_PORT/a1-p/policytypes/$4/policies/$UUID$5"
+ url="$ric_id/a1-p/policytypes/$4/policies/$UUID$5"
body=$(sed 's/XXX/'${5}'/g' $6)
paramError=0
elif [ $# -eq 5 ] && [ $2 == "STD" ]; then
- url="$RIC_SIM_HTTPX://$ric_id:$RIC_SIM_PORT/A1-P/v1/policies/$UUID$4"
+ url="$ric_id/A1-P/v1/policies/$UUID$4"
body=$(sed 's/XXX/'${4}'/g' $5)
paramError=0
fi
controller_api_get_A1_policy_status() {
__log_test_start $@
- ric_id=$3
- if [ $RUNMODE == "KUBE" ]; then
- ric_id=$(get_kube_sim_host $3)
- fi
+ ric_id=$(__find_sim_host $3)
targetJson=""
paramError=1
if [ $# -ge 5 ] && [ $2 == "OSC" ]; then
- url="$RIC_SIM_HTTPX://$ric_id:$RIC_SIM_PORT/a1-p/policytypes/$4/policies/$UUID$5/status"
+ url="$ric_id/a1-p/policytypes/$4/policies/$UUID$5/status"
if [ $# -gt 5 ]; then
targetJson="{\"instance_status\":\"$6\""
targetJson=$targetJson",\"has_been_deleted\":\"$7\""
fi
paramError=0
elif [ $# -ge 4 ] && [ $2 == "STD" ]; then
- url="$RIC_SIM_HTTPX://$ric_id:$RIC_SIM_PORT/A1-P/v1/policies/$UUID$4/status"
+ url="$ric_id/A1-P/v1/policies/$UUID$4/status"
if [ $# -gt 4 ]; then
targetJson="{\"enforceStatus\":\"$5\""
if [ $# -eq 6 ]; then
KUBE_PROXY_IMAGE_TAG_LOCAL="latest"
#No remote image for kube proxy, local image always used
-#Kube proxy remote image and tag
+#PVC Cleaner remote image and tag
PVC_CLEANER_IMAGE_BASE="ubuntu"
PVC_CLEANER_IMAGE_TAG_REMOTE_PROXY="20.10"
#No local image for pvc cleaner, remote image always used
+#Kafka Procon image and tag
+KAFKAPC_IMAGE_BASE="kafka-procon"
+KAFKAPC_IMAGE_TAG_LOCAL="latest"
+#No local image for pvc cleaner, remote image always used
+
# List of app short names produced by the project
PROJECT_IMAGES_APP_NAMES="PA ICS CP RC RICSIM NGW DMAAPADP DMAAPMED" # Add SDNC here if oran image is used
DMAAP_MED_DATA_MOUNT_PATH="/configs" # Path in container for data file
DMAAP_MED_DATA_FILE="type_config.json" # Container data file name
+KAFKAPC_APP_NAME="kafka-procon" # Name for the Kafka procon
+KAFKAPC_DISPLAY_NAME="Kafaka Producer/Consumer"
+KAFKAPC_EXTERNAL_PORT=8096 # Kafka procon container external port (host -> container)
+KAFKAPC_INTERNAL_PORT=8090 # Kafka procon container internal port (container -> container)
+KAFKAPC_EXTERNAL_SECURE_PORT=8097 # Kafka procon container external secure port (host -> container)
+KAFKAPC_INTERNAL_SECURE_PORT=8091 # Kafka procon container internal secure port (container -> container)
+KAFKAPC_ALIVE_URL="/" # Base path for alive check
+KAFKAPC_COMPOSE_DIR="kafka-procon" # Dir in simulator_group for docker-compose
+KAFKAPC_BUILD_DIR="kafka-procon" # Build dir
########################################
# Setting for common curl-base function
########################################
for iapp in $INCLUDED_IMAGES; do
file_pointer=$(echo $iapp | tr '[:upper:]' '[:lower:]')
file_pointer="../common/"$file_pointer"_api_functions.sh"
- echo " Auto-adding included app $iapp. Sourcing $file_pointer"
+ padded_iapp=$iapp
+ while [ ${#padded_iapp} -lt 16 ]; do
+ padded_iapp=$padded_iapp" "
+ done
+ echo " Auto-adding included app $padded_iapp Sourcing $file_pointer"
. $file_pointer
if [ ! -f "$file_pointer" ]; then
echo " Include file $file_pointer for app $iapp does not exist"
#
# List of short names for all supported apps, including simulators etc
-APP_SHORT_NAMES="PA ICS SDNC CP NGW RC RICSIM HTTPPROXY CBS CONSUL DMAAPMR MR CR PRODSTUB KUBEPROXY DMAAPMED DMAAPADP PVCCLEANER"
+APP_SHORT_NAMES="PA ICS SDNC CP NGW RC RICSIM HTTPPROXY CBS CONSUL DMAAPMR MR CR PRODSTUB KUBEPROXY DMAAPMED DMAAPADP PVCCLEANER KAFKAPC"
# List of available apps that built and released of the project
PROJECT_IMAGES="PA ICS SDNC CP NGW RICSIM RC DMAAPMED DMAAPADP"
AVAILABLE_IMAGES_OVERRIDE="PA ICS SDNC CP NGW RICSIM RC DMAAPMED DMAAPADP"
# List of available apps where the image is built by the test environment
-LOCAL_IMAGE_BUILD="MR CR PRODSTUB KUBEPROXY HTTPPROXY"
+LOCAL_IMAGE_BUILD="MR CR PRODSTUB KUBEPROXY HTTPPROXY KAFKAPC"
# List of system app used only by the test env - kubernetes
TESTENV_KUBE_SYSTEM_APPS="PVCCLEANER"
--- /dev/null
+.tmp.json
+.dockererr
+.env
+.payload
--- /dev/null
+#==================================================================================
+# Copyright (C) 2021: Nordix Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# This source code is part of the near-RT RIC (RAN Intelligent Controller)
+# platform project (RICP).
+#==================================================================================
+
+ARG NEXUS_PROXY_REPO
+
+##
+## Build
+##
+
+FROM ${NEXUS_PROXY_REPO}golang:1.17-bullseye AS build
+WORKDIR /app
+COPY go.mod .
+COPY go.sum .
+RUN go mod download
+COPY main.go .
+RUN go build -o /kafkaprocon
+
+##
+## Deploy
+##
+
+FROM gcr.io/distroless/base-debian11
+WORKDIR /
+## Copy from "build" stage
+COPY --from=build /kafkaprocon .
+USER nonroot:nonroot
+ENTRYPOINT ["/kafkaprocon"]
\ No newline at end of file
--- /dev/null
+#!/bin/bash
+
+# ============LICENSE_START===============================================
+# Copyright (C) 2020 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+
+# Automated test script for Kafka procon container
+
+# NOTE: Need a running instance of kafka
+
+
+export PORT=8096
+export HTTPX="http"
+export REQ_CONTENT=""
+export RESP_CONTENT="text/plain"
+
+# source function to do curl and check result
+. ../common/do_curl_function.sh
+
+echo "Requires a running kafka"
+
+payload=".payload"
+
+echo "=== hello world ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="OK"
+do_curl GET / 200
+
+echo "=== reset ==="
+REQ_CONTENT=""
+RESP_CONTENT=""
+RESULT="*"
+do_curl POST /reset 200
+
+echo "=== get topics ==="
+REQ_CONTENT=""
+RESP_CONTENT="application/json"
+RESULT="json:[]"
+do_curl GET /topics 200
+
+echo "=== get global counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /counters/sent 200
+
+echo "=== get global counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /counters/received 200
+
+echo "=== get topic ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl GET /topics/test-topic 404
+
+echo "=== get topic counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl GET /topics/test-topic/counters/sent 404
+
+echo "=== get topic counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl GET /topics/test-topic/counters/received 404
+
+echo "=== create a topic ==="
+REQ_CONTENT=""
+RESP_CONTENT=""
+RESULT="*"
+do_curl PUT /topics/test-topic 405
+
+echo "=== start to send on a topic ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/test-topic/startsend 404
+
+echo "=== start to receive from a topic ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/test-topic/startreceive 404
+
+echo "=== send a msg on a topic ==="
+echo "TEST1" > $payload
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/test-topic/msg 404 $payload
+
+echo "=== receive a msg from a topic ==="
+echo "TEST1" > $payload
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl GET /topics/test-topic/msg 404 $payload
+
+echo "=== stop to send on a topic ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/test-topic/stopsend 404
+
+echo "=== stop to receive from a topic ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/test-topic/stopreceive 404
+
+# Create 4 topics
+
+echo "=== create topic1 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl PUT /topics/topic1?type=text/plain 201
+
+echo "=== get topics ==="
+REQ_CONTENT=""
+RESP_CONTENT="application/json"
+RESULT="json:[\"topic1\"]"
+do_curl GET /topics 200
+
+echo "=== create topic2 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl PUT /topics/topic2?type=text/plain 201
+
+echo "=== get topics ==="
+REQ_CONTENT=""
+RESP_CONTENT="application/json"
+RESULT="json:[\"topic1\",\"topic2\"]"
+do_curl GET /topics 200
+
+echo "=== create topic3 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl PUT /topics/topic3?type=application/json 201
+
+echo "=== get topics ==="
+REQ_CONTENT=""
+RESP_CONTENT="application/json"
+RESULT="json:[\"topic1\",\"topic2\",\"topic3\"]"
+do_curl GET /topics 200
+
+echo "=== create topic4 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl PUT /topics/topic4?type=application/json 201
+
+echo "=== get topics ==="
+REQ_CONTENT=""
+RESP_CONTENT="application/json"
+RESULT="json:[\"topic1\",\"topic2\",\"topic3\",\"topic4\"]"
+do_curl GET /topics 200
+
+echo "=== get topic1 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="text/plain"
+do_curl GET /topics/topic1 200
+
+echo "=== get topic2 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="text/plain"
+do_curl GET /topics/topic2 200
+
+echo "=== get topic3 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="application/json"
+do_curl GET /topics/topic3 200
+
+echo "=== get topic4 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="application/json"
+do_curl GET /topics/topic4 200
+
+echo "=== send a msg on topic1 ==="
+echo "TEST11" > $payload
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic1/msg 400 $payload
+
+echo "=== receive a msg from topic1 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl GET /topics/topic1/msg 400
+
+echo "=== send a msg on topic2 ==="
+echo "TEST22" > $payload
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic2/msg 400 $payload
+
+echo "=== receive a msg from topic2 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl GET /topics/topic2/msg 400
+
+
+
+echo "=== send a msg on topic3 ==="
+echo "{\"test\":\"33\"}" > $payload
+REQ_CONTENT="application/json"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic3/msg 400 $payload
+
+echo "=== receive a msg from topic3 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl GET /topics/topic3/msg 400
+
+echo "=== send a msg on topic4 ==="
+echo "{\"test\":\"44\"}" > $payload
+REQ_CONTENT="application/json"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic4/msg 400 $payload
+
+echo "=== receive a msg from topic4 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl GET /topics/topic2/msg 400
+
+
+echo "=== get global counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /counters/sent 200
+
+echo "=== get global counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /counters/received 200
+
+echo "=== get topic1 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /topics/topic1/counters/sent 200
+
+echo "=== get topic1 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /topics/topic1/counters/received 200
+
+echo "=== get topic2 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /topics/topic2/counters/sent 200
+
+echo "=== get topic2 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /topics/topic2/counters/received 200
+
+echo "=== get topic3 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /topics/topic3/counters/sent 200
+
+echo "=== get topic3 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /topics/topic3/counters/received 200
+
+echo "=== get topic4 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /topics/topic4/counters/sent 200
+
+echo "=== get topic4 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /topics/topic4/counters/received 200
+
+# Begins send and receive
+
+echo "=== set topic1 start sending ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic1/startsend 200
+
+echo "=== send a msg on topic1 ==="
+echo "TEST11" > $payload
+REQ_CONTENT="application/json"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic1/msg 400 $payload
+
+echo "=== send a msg on topic1 ==="
+echo "TEST11" > $payload
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic1/msg 200 $payload
+
+echo "sleep 2 to allow sending the msg to kafka"
+sleep 2
+
+echo "=== receive a msg from topic1 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl GET /topics/topic1/msg 400
+
+echo "=== get topic1 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="1"
+do_curl GET /topics/topic1/counters/sent 200
+
+echo "=== get topic1 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /topics/topic1/counters/received 200
+
+echo "=== set topic1 start receiving ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic1/startreceive 200
+
+echo "sleep 60 to allow kafka to process the msg, unclear why first message takes a long time..."
+sleep 60
+
+echo "=== get topic1 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="1"
+do_curl GET /topics/topic1/counters/sent 200
+
+echo "=== get topic1 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="1"
+do_curl GET /topics/topic1/counters/received 200
+
+echo "=== get global counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="1"
+do_curl GET /counters/sent 200
+
+echo "=== get global counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="1"
+do_curl GET /counters/received 200
+
+echo "=== receive a msg from topic1 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="TEST11"
+do_curl GET /topics/topic1/msg 200
+
+echo "=== receive a msg from topic1 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT=""
+RESULT="*"
+do_curl GET /topics/topic1/msg 204
+
+
+echo "=== set topic1 start sending ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic1/startsend 200
+
+echo "=== set topic2 start sending ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic2/startsend 200
+
+echo "=== set topic3 start sending ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic3/startsend 200
+
+echo "=== set topic4 start sending ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic4/startsend 200
+
+echo "=== set topic1 start receiving ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic1/startreceive 200
+
+echo "=== set topic2 start receiving ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic2/startreceive 200
+
+echo "=== set topic3 start receiving ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic3/startreceive 200
+
+echo "=== set topic4 start receiving ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic4/startreceive 200
+
+
+# Send and receive on all topics
+
+echo "=== send a msg on topic1 ==="
+echo "TEST101" > $payload
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic1/msg 200 $payload
+
+echo "=== send two msg on topic2 ==="
+echo "TEST201" > $payload
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic2/msg 200 $payload
+echo "TEST202" > $payload
+do_curl POST /topics/topic2/msg 200 $payload
+
+echo "=== send three msg on topic3 ==="
+echo "{\"a\":\"msg301\"}" > $payload
+REQ_CONTENT="application/json"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic3/msg 200 $payload
+echo "{\"a\":\"msg302\"}" > $payload
+do_curl POST /topics/topic3/msg 200 $payload
+echo "{\"a\":\"msg303\"}" > $payload
+do_curl POST /topics/topic3/msg 200 $payload
+
+
+echo "=== send four msg on topic4 ==="
+echo "{\"a\":\"msg401\"}" > $payload
+REQ_CONTENT="application/json"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic4/msg 200 $payload
+echo "{\"a\":\"msg402\"}" > $payload
+do_curl POST /topics/topic4/msg 200 $payload
+echo "{\"a\":\"msg403\"}" > $payload
+do_curl POST /topics/topic4/msg 200 $payload
+echo "{\"a\":\"msg404\"}" > $payload
+do_curl POST /topics/topic4/msg 200 $payload
+
+echo "sleep 10 to allow kafka to process msg"
+sleep 10
+
+echo "=== get global counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="11"
+do_curl GET /counters/sent 200
+
+echo "=== get global counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="11"
+do_curl GET /counters/received 200
+
+
+echo "=== get topic1 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="2"
+do_curl GET /topics/topic1/counters/sent 200
+
+echo "=== get topic1 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="2"
+do_curl GET /topics/topic1/counters/received 200
+
+
+echo "=== get topic2 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="2"
+do_curl GET /topics/topic2/counters/sent 200
+
+echo "=== get topic2 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="2"
+do_curl GET /topics/topic2/counters/received 200
+
+
+echo "=== get topic3 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="3"
+do_curl GET /topics/topic3/counters/sent 200
+
+echo "=== get topic3 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="3"
+do_curl GET /topics/topic3/counters/received 200
+
+
+echo "=== get topic4 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="4"
+do_curl GET /topics/topic4/counters/sent 200
+
+echo "=== get topic4 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="4"
+do_curl GET /topics/topic4/counters/received 200
+
+
+echo "=== get a msg on topic1 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="TEST101"
+do_curl GET /topics/topic1/msg 200
+
+
+echo "=== attempt to receive a msg from topic1 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT=""
+RESULT="*"
+do_curl GET /topics/topic1/msg 204
+
+echo "=== get a two msg on topic2 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="TEST201"
+do_curl GET /topics/topic2/msg 200
+RESULT="TEST202"
+do_curl GET /topics/topic2/msg 200
+
+
+echo "=== attempt to receive a msg from topic2 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT=""
+RESULT="*"
+do_curl GET /topics/topic2/msg 204
+
+echo "=== get three msg on topic3 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT="application/json"
+RESULT="json:{\"a\":\"msg301\"}"
+do_curl GET /topics/topic3/msg 200
+RESULT="json:{\"a\":\"msg302\"}"
+do_curl GET /topics/topic3/msg 200
+RESULT="json:{\"a\":\"msg303\"}"
+do_curl GET /topics/topic3/msg 200
+
+echo "=== attempt to receive a msg from topic3 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT=""
+RESULT="*"
+do_curl GET /topics/topic3/msg 204
+
+echo "=== send four msg on topic4 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT="application/json"
+RESULT="json:{\"a\":\"msg401\"}"
+do_curl GET /topics/topic4/msg 200
+RESULT="json:{\"a\":\"msg402\"}"
+do_curl GET /topics/topic4/msg 200
+RESULT="json:{\"a\":\"msg403\"}"
+do_curl GET /topics/topic4/msg 200
+RESULT="json:{\"a\":\"msg404\"}"
+do_curl GET /topics/topic4/msg 200
+
+echo "=== attempt to receive a msg from topic4 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT=""
+RESULT="*"
+do_curl GET /topics/topic4/msg 204
+
+echo "********************"
+echo "*** All tests ok ***"
+echo "********************"
+
--- /dev/null
+#!/bin/bash
+
+# ============LICENSE_START===============================================
+# Copyright (C) 2021 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+
+echo "This script requires running kafka instance in a docker private network"
+
+# Script to build and start the container
+if [ $# -ne 2 ]; then
+ echo "usage: ./build-and-start.sh <docker-network> <kafka-boostrapserver-host>:<kafka-boostrapserver-port>"
+ echo "example: ./build-and-start.sh nonrtric-docker-net message-router-kafka:9092"
+ exit 1
+fi
+IMAGE="kafka-procon:latest"
+#Build the image
+docker build -t $IMAGE .
+
+if [ $? -ne 0 ]; then
+ echo "Build failed, exiting..."
+ exit 1
+fi
+
+echo "Starting kafka-procon"
+#Run the container in interactive mode o port 8090.
+docker run --rm -it -p "8090:8090" --network $1 -e KAFKA_BOOTSTRAP_SERVER=$2 --name kafka-procon $IMAGE
+
--- /dev/null
+module kafkaprocon
+
+go 1.17
+
+require (
+ github.com/confluentinc/confluent-kafka-go v1.7.0 // indirect
+ github.com/enriquebris/goconcurrentqueue v0.6.0 // indirect
+ github.com/gorilla/mux v1.8.0 // indirect
+)
--- /dev/null
+github.com/confluentinc/confluent-kafka-go v1.7.0 h1:tXh3LWb2Ne0WiU3ng4h5qiGA9XV61rz46w60O+cq8bM=
+github.com/confluentinc/confluent-kafka-go v1.7.0/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
+github.com/enriquebris/goconcurrentqueue v0.6.0 h1:DJ97cgoPVoqlC4tTGBokn/omaB3o16yIs5QdAm6YEjc=
+github.com/enriquebris/goconcurrentqueue v0.6.0/go.mod h1:wGJhQNFI4wLNHleZLo5ehk1puj8M6OIl0tOjs3kwJus=
+github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
+github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
--- /dev/null
+// Writing a basic HTTP server is easy using the
+// `net/http` package.
+package main
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "os"
+ "strconv"
+ "sync/atomic"
+ "time"
+
+ "github.com/confluentinc/confluent-kafka-go/kafka"
+ "github.com/enriquebris/goconcurrentqueue"
+ "github.com/gorilla/mux"
+)
+
+// Note: consumer 'group' and consumer 'user' both uses hardcoded values specific to this interface
+// globalCounters var holding the "global counters"
+// recieved number of received messages from all topics (int)
+// sent number of sent messages to all topics (int)
+// topics var holding all topic related info
+// <topic-name> name of a topic (present after topic is created)
+// content-type data type of the topic (string)
+// counters
+// recieved number of received messages from the topic (int)
+// sent number of sent messages to the topic (int)
+// messages
+// send messages waiting to be sent (set when sending is started) (fifo)
+// received received messages waiting to be fetched (set when reception is started) (fifo)
+
+type counter struct {
+ c uint64
+}
+
+func (c *counter) step() {
+ atomic.AddUint64(&c.c, 1)
+}
+
+func (c counter) get() uint64 {
+ return atomic.LoadUint64(&c.c)
+}
+
+type counters struct {
+ received counter
+ sent counter
+}
+
+func newCounters() counters {
+ return counters{
+ received: counter{},
+ sent: counter{},
+ }
+}
+
+type messages struct {
+ send *goconcurrentqueue.FIFO
+ received *goconcurrentqueue.FIFO
+}
+
+func (m *messages) startSend() bool {
+ if m.send == nil {
+ m.send = goconcurrentqueue.NewFIFO()
+ return true
+ }
+ return false
+}
+
+func (m *messages) stopSend() {
+ m.send = nil
+}
+
+func (m *messages) addToSend(msg string) error {
+ if m.send == nil {
+ return fmt.Errorf("sending not started")
+ }
+ m.send.Lock()
+ defer m.send.Unlock()
+ return m.send.Enqueue(msg)
+}
+
+func (m *messages) getToSend() (interface{}, error) {
+ if m.send == nil {
+ return "", fmt.Errorf("sending not started")
+ }
+ m.send.Lock()
+ defer m.send.Unlock()
+ return m.send.Dequeue()
+}
+
+func (m *messages) startReceive() bool {
+ if m.received == nil {
+ m.received = goconcurrentqueue.NewFIFO()
+ return true
+ }
+ return false
+}
+
+func (m *messages) stopReceive() {
+ m.send = nil
+}
+
+type topic struct {
+ contentType string
+ counters counters
+ messages messages
+}
+
+func newTopic(ct string) *topic {
+ return &topic{
+ contentType: ct,
+ counters: counters{},
+ messages: messages{},
+ }
+}
+
+var globalCounters counters
+var topics map[string]*topic = make(map[string]*topic)
+
+var bootstrapserver = ""
+
+func initApp() {
+ bootstrapserver = os.Getenv("KAFKA_BOOTSTRAP_SERVER")
+ if len(bootstrapserver) == 0 {
+ fmt.Println("Fatal error: env var KAFKA_BOOTSTRAP_SERVER not set")
+ fmt.Println("Exiting... ")
+ os.Exit(1)
+ }
+ fmt.Println("Using KAFKA_BOOTSTRAP_SERVER=" + bootstrapserver)
+}
+
+//Helper function to get a created topic, if it exists
+func getTopicFromRequest(w http.ResponseWriter, req *http.Request) (*topic, string, bool) {
+ topicId := mux.Vars(req)["topic"]
+ t, exist := topics[topicId]
+ if exist == false {
+ w.WriteHeader(http.StatusNotFound)
+ fmt.Fprintf(w, "Topic %v does not exist", topicId)
+ return nil, "", false
+ }
+ return t, topicId, true
+}
+
+// Alive check
+// GET on /
+func healthCheck(w http.ResponseWriter, req *http.Request) {
+ fmt.Fprintf(w, "OK")
+}
+
+// Deep reset of this interface stub - no removal of msgs or topics in kafka
+// POST on /reset
+func allreset(w http.ResponseWriter, req *http.Request) {
+ for _, v := range topics {
+ v.messages.stopSend()
+ v.messages.stopReceive()
+ }
+ time.Sleep(5 * time.Second) //Allow producers/consumers to shut down
+ globalCounters = newCounters()
+ topics = make(map[string]*topic)
+ fmt.Fprintf(w, "OK")
+}
+
+// Get topics, return json array of strings of topics created by this interface stub
+// Returns json array of strings, array is empty if no topics exist
+// GET on /topics
+
+func getTopics(w http.ResponseWriter, req *http.Request) {
+ topicKeys := make([]string, 0, len(topics))
+ fmt.Printf("len topics: %v\n", len(topics))
+ for k := range topics {
+ topicKeys = append(topicKeys, k)
+ }
+ var j, err = json.Marshal(topicKeys)
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ fmt.Fprintf(w, "Cannot convert list of topics to json, error details: %v", err)
+ return
+ } else {
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(http.StatusOK)
+ w.Write(j)
+ }
+}
+
+func writeOkRepsonse(w http.ResponseWriter, httpStatus int, msg string) {
+ w.WriteHeader(httpStatus)
+ w.Header().Set("Content-Type", "text/plain")
+ fmt.Fprintf(w, msg)
+}
+
+// Get a counter value
+// GET /topics/counters/{counter}
+func getCounter(w http.ResponseWriter, req *http.Request) {
+ ctr := mux.Vars(req)["counter"]
+ var ctrvalue = -1
+ if ctr == "received" {
+ ctrvalue = int(globalCounters.received.get())
+ } else if ctr == "sent" {
+ ctrvalue = int(globalCounters.sent.get())
+ }
+
+ if ctrvalue == -1 {
+ w.WriteHeader(http.StatusBadRequest)
+ fmt.Fprintf(w, "Counter %v does not exist", ctr)
+ return
+ }
+ writeOkRepsonse(w, http.StatusOK, strconv.Itoa(ctrvalue))
+ return
+
+}
+
+// Create a topic
+// PUT on /topics/<topic>?type=<type> type shall be 'application/json' or 'text/plain'
+func createTopic(w http.ResponseWriter, req *http.Request) {
+ topicId := mux.Vars(req)["topic"]
+ topicType := mux.Vars(req)["type"]
+
+ fmt.Printf("Creating topic: %v, content type: %v\n", topicId, topicType)
+
+ if len(topicType) == 0 {
+ w.WriteHeader(http.StatusBadRequest)
+ fmt.Fprintf(w, "Type not specified")
+ return
+ }
+
+ tid, exist := topics[topicId]
+ if exist == true {
+ if tid.contentType != topicType {
+ w.WriteHeader(http.StatusBadRequest)
+ fmt.Fprintf(w, "Topic type exist but type is different, queue content type: %v, requested content type: %v", tid.contentType, topicType)
+ return
+ }
+ writeOkRepsonse(w, http.StatusOK, "Topic exist")
+ return
+ }
+
+ t := newTopic(topicType)
+
+ a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapserver})
+ defer func() { a.Close() }()
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ fmt.Fprintf(w, "Cannot create client to bootstrap server: "+bootstrapserver+", error details: %v", err)
+ return
+ }
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ maxDur := 10 * time.Second
+
+ _, err = a.CreateTopics(
+ ctx,
+ []kafka.TopicSpecification{{
+ Topic: topicId,
+ NumPartitions: 1,
+ ReplicationFactor: 1}},
+ kafka.SetAdminOperationTimeout(maxDur))
+
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ fmt.Fprintf(w, "Failed to create topic: %v, error details: %v", topicId, err)
+ return
+ }
+ topics[topicId] = t
+ w.WriteHeader(http.StatusCreated)
+ fmt.Fprintf(w, "Topic created")
+}
+
+// Get topic type
+// GET on /topic/<topic>
+func getTopic(w http.ResponseWriter, req *http.Request) {
+ t, _, exist := getTopicFromRequest(w, req)
+ if !exist {
+ return
+ }
+ w.WriteHeader(http.StatusOK)
+ fmt.Fprintf(w, t.contentType)
+}
+
+// Get a topics counter value
+// GET /topics/{topic}/counters/{counter}
+func getTopicCounter(w http.ResponseWriter, req *http.Request) {
+ t, _, exist := getTopicFromRequest(w, req)
+ if !exist {
+ return
+ }
+ ctr := mux.Vars(req)["counter"]
+
+ var ctrvalue = -1
+ if ctr == "received" {
+ ctrvalue = int(t.counters.received.get())
+ } else if ctr == "sent" {
+ ctrvalue = int(t.counters.sent.get())
+ }
+
+ if ctrvalue == -1 {
+ w.WriteHeader(http.StatusBadRequest)
+ fmt.Fprintf(w, "Counter %v does not exist", ctr)
+ return
+ }
+ w.WriteHeader(http.StatusOK)
+ fmt.Fprintf(w, strconv.Itoa(ctrvalue))
+ return
+}
+
+func startToSend(w http.ResponseWriter, req *http.Request) {
+ t, topicId, exist := getTopicFromRequest(w, req)
+ if !exist {
+ return
+ }
+
+ if !t.messages.startSend() {
+ w.WriteHeader(http.StatusOK)
+ fmt.Fprintf(w, "Already started sending")
+ return
+ }
+ go func() {
+ p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": bootstrapserver})
+ if err != nil {
+ fmt.Printf("Cannot create producer for topic: %v, error details: %v\n", topicId, err)
+ return
+ }
+ defer func() { p.Close() }()
+ for {
+ q := t.messages.send
+ if q == nil {
+ return
+ }
+ m, err := q.Get(0)
+ if err == nil {
+ err = p.Produce(&kafka.Message{
+ TopicPartition: kafka.TopicPartition{Topic: &topicId, Partition: kafka.PartitionAny},
+ Value: []byte(fmt.Sprintf("%v", m)),
+ }, nil)
+ if err == nil {
+ q.Remove(0)
+ t.counters.sent.step()
+ globalCounters.sent.step()
+ msg := fmt.Sprintf("%v", m)
+ if len(msg) < 500 {
+ fmt.Printf("Message sent on topic: %v, len: %v, msg: %v", topicId, len(msg), msg)
+ } else {
+ fmt.Printf("Message sent on topic: %v, len: %v, is larger than 500...not printed", topicId, len(msg))
+ }
+ } else {
+ fmt.Printf("Failed to send message on topic: %v. Discarded. Error details: %v", topicId, err)
+ q.Remove(0)
+ }
+ } else {
+ time.Sleep(10 * time.Millisecond)
+ }
+ }
+ }()
+
+ w.WriteHeader(http.StatusOK)
+ fmt.Fprintf(w, "Sending started")
+}
+
+func startToReceive(w http.ResponseWriter, req *http.Request) {
+ t, topicId, exist := getTopicFromRequest(w, req)
+ if !exist {
+ return
+ }
+
+ if !t.messages.startReceive() {
+ w.WriteHeader(http.StatusOK)
+ fmt.Fprintf(w, "Already started receiving")
+ return
+ }
+
+ go func() {
+
+ defer func() { t.messages.stopReceive() }()
+
+ groudId := "kafkaprocon"
+
+ c, err := kafka.NewConsumer(&kafka.ConfigMap{
+ "bootstrap.servers": bootstrapserver,
+ "group.id": groudId,
+ "auto.offset.reset": "earliest",
+ "enable.auto.commit": true,
+ "auto.commit.interval.ms": 5000,
+ })
+ if err != nil {
+ fmt.Printf("Cannot create consumer for topic: %v, error details: %v\n", topicId, err)
+ t.messages.stopReceive()
+ return
+ }
+ c.Commit()
+ defer func() { c.Close() }()
+ for {
+ que := t.messages.received
+ if que == nil {
+ fmt.Printf("Cannot start receiving on topic: %v, queue does not exist\n", topicId)
+ return
+ }
+ fmt.Printf("Start subscribing on topic: %v\n", topicId)
+ err = c.SubscribeTopics([]string{topicId}, nil)
+ if err != nil {
+ fmt.Printf("Cannot start subscribing on topic: %v, error details: %v\n", topicId, err)
+ return
+ }
+ maxDur := 1 * time.Second
+ for {
+ msg, err := c.ReadMessage(maxDur)
+ if err == nil {
+ if len(msg.Value) < 500 {
+ fmt.Printf("Message received on topic: %v, partion: %v, len: %v, msg: %v", topicId, msg.TopicPartition, len(msg.Value), string(msg.Value))
+ } else {
+ fmt.Printf("Message received on topic: %v, partion: %v, len: %v is larger than 500...not printed", topicId, msg.TopicPartition, len(msg.Value))
+ }
+ err = t.messages.received.Enqueue(string(msg.Value))
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ fmt.Fprintf(w, "Received message topic: %v, cannot be put in queue, %v", topicId, err)
+ return
+ }
+ t.counters.received.step()
+ globalCounters.received.step()
+ } else {
+ fmt.Printf("Nothing to consume on topic: %v, reason: %v\n", topicId, err)
+ }
+ }
+ }
+ }()
+
+ w.WriteHeader(http.StatusOK)
+ fmt.Fprintf(w, "Receiving started")
+}
+
+// Post a message to a topic
+// POST /send content type is specified in content type
+func sendToTopic(w http.ResponseWriter, req *http.Request) {
+ t, topicId, exist := getTopicFromRequest(w, req)
+ if !exist {
+ return
+ }
+ q := t.messages.send
+ if q == nil {
+ w.WriteHeader(http.StatusBadRequest)
+ fmt.Fprintf(w, "Sending not initiated on topic: %v", topicId)
+ return
+ }
+ ct := req.Header.Get("Content-Type")
+ if ct != t.contentType {
+ w.WriteHeader(http.StatusBadRequest)
+ fmt.Fprintf(w, "Message to send content type: %v on topic: %v does not match queue content type: %v", ct, topicId, t.contentType)
+ return
+ }
+
+ if ct == "application/json" {
+ // decoder := json.NewDecoder(req.Body)
+ // var j :=
+ // err := decoder.Decode(&j)
+ // if err != nil {
+ // w.WriteHeader(http.StatusBadRequest)
+ // w.Header().Set("Content-Type", "text/plain")
+ // fmt.Fprintf(w, "Json payload cannot be decoded, error details %v\n", err)
+ // return
+ // }
+ //m = mux.Vars(req)[""]
+ if err := req.ParseForm(); err != nil {
+ w.WriteHeader(http.StatusBadRequest)
+ fmt.Fprintf(w, "Json payload cannot be decoded on topic: %v, error details %v", topicId, err)
+ return
+ }
+ b, err := ioutil.ReadAll(req.Body)
+ if err == nil {
+ if len(b) < 500 {
+ fmt.Printf("Json payload to send on topic: %v, msg: %v", topicId, string(b))
+ } else {
+ fmt.Printf("Json payload to send on topic: %v larger than 500 bytes, not printed...", topicId)
+ }
+ }
+ err = q.Enqueue(string(b))
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ fmt.Fprintf(w, "Json message to send cannot be put in queue")
+ return
+ }
+ } else if ct == "text/plain" {
+ if err := req.ParseForm(); err != nil {
+ w.WriteHeader(http.StatusBadRequest)
+ fmt.Fprintf(w, "Text payload to send on topic: %v cannot be decoded, error details %v\n", topicId, err)
+ return
+ }
+ b, err := ioutil.ReadAll(req.Body)
+ if err == nil {
+ if len(b) < 500 {
+ fmt.Printf("Text payload to send on topic: %v, msg: %v", topicId, string(b))
+ } else {
+ fmt.Printf("Text payload to send on topic: %v larger than 500 bytes, not printed...", topicId)
+ }
+ }
+ err = q.Enqueue(string(b))
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ fmt.Fprintf(w, "Text message to send cannot be put in queue")
+ return
+ }
+ } else {
+ w.WriteHeader(http.StatusBadRequest)
+ fmt.Fprintf(w, "Message to send, unknown content type %v", ct)
+ return
+ }
+
+ w.WriteHeader(http.StatusOK)
+ w.Header().Set("Content-Type", "text/plain")
+ fmt.Fprintf(w, "Message to send put in queue")
+}
+
+// Get zero or one message from a topic
+// GET /receive
+func receiveFromTopic(w http.ResponseWriter, req *http.Request) {
+ t, topicId, exist := getTopicFromRequest(w, req)
+ if !exist {
+ return
+ }
+ if t.messages.received == nil {
+ w.WriteHeader(http.StatusBadRequest)
+ fmt.Fprintf(w, "Receiving not initiated on topic %v", topicId)
+ return
+ }
+
+ m, err := t.messages.received.Dequeue()
+ if err != nil {
+ w.WriteHeader(http.StatusNoContent)
+ return
+ }
+
+ w.Header().Set("Content-Type", t.contentType)
+ w.WriteHeader(http.StatusOK)
+ fmt.Fprintf(w, "%v", m)
+}
+
+// Remove the send queue to stop sending
+func stopToSend(w http.ResponseWriter, req *http.Request) {
+ fmt.Printf("Stop sending\n")
+ t, _, exist := getTopicFromRequest(w, req)
+ if !exist {
+ return
+ }
+ t.messages.stopSend()
+ w.WriteHeader(http.StatusNoContent)
+}
+
+// Remove the receive queue to stop receiving
+func stopToReceive(w http.ResponseWriter, req *http.Request) {
+ fmt.Printf("Stop receiving\n")
+ t, _, exist := getTopicFromRequest(w, req)
+ if !exist {
+ return
+ }
+ t.messages.stopReceive()
+ w.WriteHeader(http.StatusNoContent)
+}
+
+func HelloServer(w http.ResponseWriter, r *http.Request) {
+ fmt.Fprintf(w, "Hello, %s!", r.URL.Path[1:])
+}
+
+func main() {
+
+ initApp()
+
+ r := mux.NewRouter()
+
+ r.HandleFunc("/", healthCheck).Methods("GET")
+ r.HandleFunc("/reset", allreset).Methods("POST")
+ r.HandleFunc("/counters/{counter}", getCounter).Methods("GET")
+ r.HandleFunc("/topics", getTopics).Methods("GET")
+ r.HandleFunc("/topics/{topic}/counters/{counter}", getTopicCounter).Methods("GET")
+ r.HandleFunc("/topics/{topic}", createTopic).Methods("PUT").Queries("type", "{type}")
+ r.HandleFunc("/topics/{topic}", getTopic).Methods("GET")
+ r.HandleFunc("/topics/{topic}/startsend", startToSend).Methods("POST")
+ r.HandleFunc("/topics/{topic}/startreceive", startToReceive).Methods("POST")
+ r.HandleFunc("/topics/{topic}/stopsend", stopToSend).Methods("POST")
+ r.HandleFunc("/topics/{topic}/stopreceive", stopToReceive).Methods("POST")
+ r.HandleFunc("/topics/{topic}/msg", sendToTopic).Methods("POST")
+ r.HandleFunc("/topics/{topic}/msg", receiveFromTopic).Methods("GET")
+
+ port := "8090"
+ srv := &http.Server{
+ Handler: r,
+ Addr: ":" + port,
+ WriteTimeout: 15 * time.Second,
+ ReadTimeout: 15 * time.Second,
+ }
+ fmt.Println("Running on port: " + port)
+ fmt.Printf(srv.ListenAndServe().Error())
+}
--- /dev/null
+#!/bin/bash
+
+# ============LICENSE_START===============================================
+# Copyright (C) 2021 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+
+echo "This script requires golang to be installed and a running kafka instance on (or availble to) localhost"
+
+# Script to build and start app locally
+if [ $# -ne 1 ]; then
+ echo "usage: ./start-local.sh <kafka-boostrapserver-port>"
+ echo "example: ./start-local.sh 30098"
+ exit 1
+fi
+
+export KAFKA_BOOTSTRAP_SERVER=localhost:$1
+
+echo "Starting kafka-procon on local machine"
+go run main.go
- name: https
containerPort: $ICS_INTERNAL_SECURE_PORT
volumeMounts:
- - mountPath: $ICS_CONFIG_MOUNT_PATH
+ - mountPath: $ICS_CONFIG_MOUNT_PATH/$ICS_CONFIG_FILE
+ subPath: $ICS_CONFIG_FILE
name: ics-conf-name
- volumeMounts:
- mountPath: $ICS_CONTAINER_MNT_DIR
name: ics-data-name
volumes:
################################################################################
-# Copyright (c) 2020 Nordix Foundation. #
+# Copyright (c) 2021 Nordix Foundation. #
# #
# Licensed under the Apache License, Version 2.0 (the "License"); #
# you may not use this file except in compliance with the License. #
# See the License for the specific language governing permissions and #
# limitations under the License. #
################################################################################
-
spring:
profiles:
active: prod
allow-bean-definition-overriding: true
aop:
auto: false
+springdoc:
+ show-actuator: true
management:
endpoints:
web:
exposure:
+ # Enabling of springboot actuator features. See springboot documentation.
include: "loggers,logfile,health,info,metrics,threaddump,heapdump"
logging:
+ # Configuration of logging
level:
ROOT: ERROR
org.springframework: ERROR
org.springframework.data: ERROR
org.springframework.web.reactive.function.client.ExchangeFunctions: ERROR
- org.oransc.information: INFO
+ org.oransc.ics: INFO
file:
- name: /var/log/information-coordinator-service/application.log
+ name: $ICS_LOGPATH
server:
- port : 8434
- http-port: 8083
+ # Configuration of the HTTP/REST server. The parameters are defined and handeled by the springboot framework.
+ # See springboot documentation.
+ port : $ICS_INTERNAL_SECURE_PORT
+ http-port: $ICS_INTERNAL_PORT
ssl:
key-store-type: JKS
key-store-password: policy_agent
key-password: policy_agent
key-alias: policy_agent
app:
- filepath: /opt/app/information-coordinator-service/data/application_configuration.json
webclient:
+ # Configuration of the trust store used for the HTTP client (outgoing requests)
+ # The file location and the password for the truststore is only relevant if trust-store-used == true
+ # Note that the same keystore as for the server is used.
trust-store-used: false
trust-store-password: policy_agent
trust-store: /opt/app/information-coordinator-service/etc/cert/truststore.jks
+ # Configuration of usage of HTTP Proxy for the southbound accesses.
+ # The HTTP proxy (if configured) will only be used for accessing NearRT RIC:s
http.proxy-host: $ICS_HTTP_PROXY_CONFIG_HOST_NAME
http.proxy-port: $ICS_HTTP_PROXY_CONFIG_PORT
- vardata-directory: /var/information-coordinator-service
\ No newline at end of file
+ vardata-directory: $ICS_CONTAINER_MNT_DIR
--- /dev/null
+.tmp.json
+.dockererr
+gen_docker-compose*
\ No newline at end of file
--- /dev/null
+apiVersion: apps/v1
+kind: StatefulSet
+metadata:
+ name: $KAFKAPC_APP_NAME
+ namespace: $KUBE_SIM_NAMESPACE
+ labels:
+ run: $KAFKAPC_APP_NAME
+ autotest: CR
+spec:
+ replicas: 1
+ serviceName: $KAFKAPC_APP_NAME
+ selector:
+ matchLabels:
+ run: $KAFKAPC_APP_NAME
+ template:
+ metadata:
+ labels:
+ run: $KAFKAPC_APP_NAME
+ autotest: CR
+ spec:
+ containers:
+ - name: $KAFKAPC_APP_NAME
+ image: $KAFKAPC_IMAGE
+ imagePullPolicy: $KUBE_IMAGE_PULL_POLICY
+ ports:
+ - name: http
+ containerPort: $KAFKAPC_INTERNAL_PORT
+ - name: https
+ containerPort: $KAFKAPC_INTERNAL_SECURE_PORT
+ env:
+ - name: KAFKA_BOOTSTRAP_SERVER
+ value: $MR_KAFKA_SERVICE_PATH
+
--- /dev/null
+# ============LICENSE_START===============================================
+# Copyright (C) 2020 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+
+version: '3.0'
+networks:
+ default:
+ external: true
+ name: ${DOCKER_SIM_NWNAME}
+services:
+ kafka-procon:
+ networks:
+ - default
+ image: ${KAFKAPC_IMAGE}
+ container_name: ${KAFKAPC_APP_NAME}
+ ports:
+ - ${KAFKAPC_EXTERNAL_PORT}:${KAFKAPC_INTERNAL_PORT}
+ - ${KAFKAPC_EXTERNAL_SECURE_PORT}:${KAFKAPC_INTERNAL_SECURE_PORT}
+ environment:
+ KAFKA_BOOTSTRAP_SERVER: $MR_KAFKA_SERVICE_PATH
+ labels:
+ - "nrttest_app=CR"
+ - "nrttest_dp=${KAFKAPC_DISPLAY_NAME}"
--- /dev/null
+apiVersion: v1
+kind: Service
+metadata:
+ name: $KAFKAPC_APP_NAME
+ namespace: $KUBE_SIM_NAMESPACE
+ labels:
+ run: $KAFKAPC_APP_NAME
+ autotest: KAFKAPC
+spec:
+ type: ClusterIP
+ ports:
+ - port: $KAFKAPC_EXTERNAL_PORT
+ targetPort: $KAFKAPC_INTERNAL_PORT
+ protocol: TCP
+ name: http
+ - port: $KAFKAPC_EXTERNAL_SECURE_PORT
+ targetPort: $KAFKAPC_INTERNAL_SECURE_PORT
+ protocol: TCP
+ name: https
+ selector:
+ run: $KAFKAPC_APP_NAME
\ No newline at end of file
volumeMounts:
- mountPath: $POLICY_AGENT_CONFIG_MOUNT_PATH
name: pa-conf-name
-# volumeMounts:
- mountPath: $POLICY_AGENT_CONTAINER_MNT_DIR
name: pa-pv-data-name
-# volumeMounts:
- mountPath: $POLICY_AGENT_DATA_MOUNT_PATH
name: pa-data-name
volumes: