Added kafka interface stub
author    BjornMagnussonXA <bjorn.magnusson@est.tech>
Thu, 2 Dec 2021 07:47:41 +0000 (08:47 +0100)
committer BjornMagnussonXA <bjorn.magnusson@est.tech>
Thu, 2 Dec 2021 07:47:58 +0000 (08:47 +0100)
Simplification of http proxy setup

Issue-ID: NONRTRIC-618

Signed-off-by: BjornMagnussonXA <bjorn.magnusson@est.tech>
Change-Id: I51ae1f81ba966f7fa570feac8953d8b14b8b2ab5

27 files changed:
test/auto-test/FTC3000.sh
test/auto-test/startMR.sh
test/common/api_curl.sh
test/common/do_curl_function.sh
test/common/httpproxy_api_functions.sh
test/common/ics_api_functions.sh
test/common/kafkapc_api_functions.sh [new file with mode: 0644]
test/common/mr_api_functions.sh
test/common/sdnc_api_functions.sh
test/common/test_env-oran-e-release.sh
test/common/testcase_common.sh
test/common/testengine_config.sh
test/kafka-procon/.gitignore [new file with mode: 0644]
test/kafka-procon/Dockerfile [new file with mode: 0644]
test/kafka-procon/basic_test.sh [new file with mode: 0755]
test/kafka-procon/build-and-start.sh [new file with mode: 0755]
test/kafka-procon/go.mod [new file with mode: 0644]
test/kafka-procon/go.sum [new file with mode: 0644]
test/kafka-procon/main.go [new file with mode: 0644]
test/kafka-procon/start_local.sh [new file with mode: 0755]
test/simulator-group/ics/app.yaml
test/simulator-group/ics/application.yaml
test/simulator-group/kafka-procon/.gitignore [new file with mode: 0644]
test/simulator-group/kafka-procon/app.yaml [new file with mode: 0644]
test/simulator-group/kafka-procon/docker-compose.yml [new file with mode: 0644]
test/simulator-group/kafka-procon/svc.yaml [new file with mode: 0644]
test/simulator-group/policy_agent/app.yaml

diff --git a/test/auto-test/FTC3000.sh b/test/auto-test/FTC3000.sh
index 04a749b..5b8544a 100755 (executable)
 TC_ONELINE_DESCR="App test DMAAP Meditor and DMAAP Adapter"
 
 #App names to include in the test when running docker, space separated list
-DOCKER_INCLUDED_IMAGES="ICS DMAAPMED DMAAPADP KUBEPROXY MR DMAAPMR CR"
+DOCKER_INCLUDED_IMAGES="ICS DMAAPMED DMAAPADP KUBEPROXY MR DMAAPMR CR KAFKAPC"
 
 #App names to include in the test when running kubernetes, space separated list
-KUBE_INCLUDED_IMAGES=" ICS DMAAPMED DMAAPADP KUBEPROXY MR DMAAPMR CR"
+KUBE_INCLUDED_IMAGES=" ICS DMAAPMED DMAAPADP KUBEPROXY MR DMAAPMR CR KAFKAPC"
 
 #Prestarted app (not started by script) to include in the test when running kubernetes, space separated list
 KUBE_PRESTARTED_IMAGES=""
@@ -73,8 +73,13 @@ start_ics NOPROXY $SIM_GROUP/$ICS_COMPOSE_DIR/$ICS_CONFIG_FILE
 set_ics_trace
 
 start_mr    "unauthenticated.dmaapmed.json" "/events" "dmaapmediatorproducer/STD_Fault_Messages" \
-            "unauthenticated.dmaapadp.json" "/events" "dmaapadapterproducer/msgs" \
-            "unauthenticated.dmaapadp_kafka.text" "/events" "dmaapadapterproducer/msgs"
+            "unauthenticated.dmaapadp.json" "/events" "dmaapadapterproducer/msgs"
+
+start_kafkapc
+
+kafkapc_api_create_topic 201 "unauthenticated.dmaapadp_kafka.text" "text/plain"
+
+kafkapc_api_start_sending 200 "unauthenticated.dmaapadp_kafka.text"
 
 start_dmaapadp NOPROXY $SIM_GROUP/$DMAAP_ADP_COMPOSE_DIR/$DMAAP_ADP_CONFIG_FILE $SIM_GROUP/$DMAAP_ADP_COMPOSE_DIR/$DMAAP_ADP_DATA_FILE
 
@@ -138,7 +143,7 @@ EXPECTED_DATA_DELIV=0 #Total delivered msg per CR
 DATA_DELIV_JOBS=0 #Total delivered msg per job per CR
 
 mr_api_generate_json_payload_file 1 ./tmp/data_for_dmaap_test.json
-mr_api_generate_text_payload_file 1 ./tmp/data_for_dmaap_test.txt
+kafkapc_api_generate_text_payload_file 1 ./tmp/data_for_dmaap_test.txt
 
 ## Send json file via message-router to adapter
 DATA_DELIV_JOBS=5 #Each job will eventually get 2 msgs
@@ -192,35 +197,40 @@ done
 ## Send text file via message-router to adapter kafka
 
 EXPECTED_DATA_DELIV=$(($NUM_JOBS/$NUM_CR+$EXPECTED_DATA_DELIV))
-mr_api_send_text_file "/events/unauthenticated.dmaapadp_kafka.text" ./tmp/data_for_dmaap_test.txt
+kafkapc_api_post_msg_from_file 200 "unauthenticated.dmaapadp_kafka.text" "text/plain" ./tmp/data_for_dmaap_test.txt
+kafkapc_equal topics/unauthenticated.dmaapadp_kafka.text/counters/sent 1 30
 for ((i=0; i<$NUM_CR; i++))
 do
     cr_equal $i received_callbacks $EXPECTED_DATA_DELIV 60
 done
 
 EXPECTED_DATA_DELIV=$(($NUM_JOBS/$NUM_CR+$EXPECTED_DATA_DELIV))
-mr_api_send_text_file "/events/unauthenticated.dmaapadp_kafka.text" ./tmp/data_for_dmaap_test.txt
+kafkapc_api_post_msg_from_file 200 "unauthenticated.dmaapadp_kafka.text" "text/plain" ./tmp/data_for_dmaap_test.txt
+kafkapc_equal topics/unauthenticated.dmaapadp_kafka.text/counters/sent 2 30
 for ((i=0; i<$NUM_CR; i++))
 do
     cr_equal $i received_callbacks $EXPECTED_DATA_DELIV 60
 done
 
 EXPECTED_DATA_DELIV=$(($NUM_JOBS/$NUM_CR+$EXPECTED_DATA_DELIV))
-mr_api_send_text_file "/events/unauthenticated.dmaapadp_kafka.text" ./tmp/data_for_dmaap_test.txt
+kafkapc_api_post_msg_from_file 200 "unauthenticated.dmaapadp_kafka.text" "text/plain" ./tmp/data_for_dmaap_test.txt
+kafkapc_equal topics/unauthenticated.dmaapadp_kafka.text/counters/sent 3 30
 for ((i=0; i<$NUM_CR; i++))
 do
     cr_equal $i received_callbacks $EXPECTED_DATA_DELIV 60
 done
 
 EXPECTED_DATA_DELIV=$(($NUM_JOBS/$NUM_CR+$EXPECTED_DATA_DELIV))
-mr_api_send_text_file "/events/unauthenticated.dmaapadp_kafka.text" ./tmp/data_for_dmaap_test.txt
+kafkapc_api_post_msg_from_file 200 "unauthenticated.dmaapadp_kafka.text" "text/plain" ./tmp/data_for_dmaap_test.txt
+kafkapc_equal topics/unauthenticated.dmaapadp_kafka.text/counters/sent 4 30
 for ((i=0; i<$NUM_CR; i++))
 do
     cr_equal $i received_callbacks $EXPECTED_DATA_DELIV 60
 done
 
 EXPECTED_DATA_DELIV=$(($NUM_JOBS/$NUM_CR+$EXPECTED_DATA_DELIV))
-mr_api_send_text_file "/events/unauthenticated.dmaapadp_kafka.text" ./tmp/data_for_dmaap_test.txt
+kafkapc_api_post_msg_from_file 200 "unauthenticated.dmaapadp_kafka.text" "text/plain" ./tmp/data_for_dmaap_test.txt
+kafkapc_equal topics/unauthenticated.dmaapadp_kafka.text/counters/sent 5 30
 for ((i=0; i<$NUM_CR; i++))
 do
     cr_equal $i received_callbacks $EXPECTED_DATA_DELIV 60
@@ -302,8 +312,9 @@ done
 print_timer
 
 # Send small text via message-router to adapter
-mr_api_send_text "/events/unauthenticated.dmaapadp_kafka.text" 'Message-------1'
-mr_api_send_text "/events/unauthenticated.dmaapadp_kafka.text" 'Message-------3'
+kafkapc_api_post_msg 200 "unauthenticated.dmaapadp_kafka.text" "text/plain" 'Message-------1'
+kafkapc_api_post_msg 200 "unauthenticated.dmaapadp_kafka.text" "text/plain" 'Message-------3'
+kafkapc_equal topics/unauthenticated.dmaapadp_kafka.text/counters/sent 7 30
 
 # Wait for data reception, adapter kafka
 EXPECTED_DATA_DELIV=$(($NUM_JOBS*2/$NUM_CR+$EXPECTED_DATA_DELIV))
@@ -370,8 +381,9 @@ print_timer
 
 
 # Send small text via message-router to adapter kafka
-mr_api_send_text "/events/unauthenticated.dmaapadp_kafka.text" 'Message-------5'
-mr_api_send_text "/events/unauthenticated.dmaapadp_kafka.text" 'Message-------7'
+kafkapc_api_post_msg 200 "unauthenticated.dmaapadp_kafka.text" "text/plain" 'Message-------5'
+kafkapc_api_post_msg 200 "unauthenticated.dmaapadp_kafka.text" "text/plain" 'Message-------7'
+kafkapc_equal topics/unauthenticated.dmaapadp_kafka.text/counters/sent 9 30
 
 # Wait for data reception, adapter kafka
 EXPECTED_DATA_DELIV=$(($NUM_JOBS*2/$NUM_CR+$EXPECTED_DATA_DELIV))
diff --git a/test/auto-test/startMR.sh b/test/auto-test/startMR.sh
index d2d0e9a..2ae6781 100755 (executable)
 #  ============LICENSE_END=================================================
 #
 
-
 TC_ONELINE_DESCR="Starts DMAAP MR"
 
 #App names to include in the test when running docker, space separated list
-DOCKER_INCLUDED_IMAGES="MR DMAAPMR KUBEPROXY"
+DOCKER_INCLUDED_IMAGES="MR DMAAPMR KUBEPROXY KAFKAPC"
 
 #App names to include in the test when running kubernetes, space separated list
-KUBE_INCLUDED_IMAGES="MR DMAAPMR KUBEPROXY"
+KUBE_INCLUDED_IMAGES="MR DMAAPMR KUBEPROXY KAFKAPC"
 #Prestarted app (not started by script) to include in the test when running kubernetes, space separated list
 KUBE_PRESTARTED_IMAGES=""
 
@@ -34,7 +33,7 @@ KUBE_PRESTARTED_IMAGES=""
 CONDITIONALLY_IGNORED_IMAGES=""
 
 #Supported test environment profiles
-SUPPORTED_PROFILES="ONAP-HONOLULU ONAP-ISTANBUL ORAN-D-RELEASE ORAN-E-RELEASE"
+SUPPORTED_PROFILES="ORAN-E-RELEASE"
 #Supported run modes
 SUPPORTED_RUNMODES="DOCKER KUBE"
 
@@ -47,9 +46,20 @@ setup_testenvironment
 clean_environment
 start_kube_proxy
 start_mr    "$MR_READ_TOPIC"  "/events" "users/policy-agent" \
-            "$MR_WRITE_TOPIC" "/events" "users/mr-stub" \
-            "unauthenticated.dmaapadp.json" "/events" "dmaapadapterproducer/msgs" \
-            "unauthenticated.dmaapmed.json" "/events" "maapmediatorproducer/STD_Fault_Messages"
+            "$MR_WRITE_TOPIC" "/events" "users/mr-stub"
+            #\
+            #"unauthenticated.dmaapadp.json" "/events" "dmaapadapterproducer/msgs" \
+            #"unauthenticated.dmaapmed.json" "/events" "maapmediatorproducer/STD_Fault_Messages"
+
+start_kafkapc
+
+kafkapc_api_reset 200
+
+kafkapc_api_create_topic 201 "unauthenticated.dmaapadp.json" "application/json"
+
+kafkapc_api_create_topic 201 "unauthenticated.dmaapmed.json" "application/json"
+
+dmaap_api_print_topics
 
 if [ $RUNMODE == "KUBE" ]; then
     :
diff --git a/test/common/api_curl.sh b/test/common/api_curl.sh
index fcf3d1e..85794f8 100644 (file)
@@ -81,6 +81,10 @@ __do_curl_to_api() {
                        __ADAPTER=$MR_DMAAP_ADAPTER_HTTP
                        __ADAPTER_TYPE=$MR_DMAAP_ADAPTER_TYPE
             __RETRY_CODES=""
+        elif [ $1 == "KAFKAPC" ]; then
+                       __ADAPTER=$KAFKAPC_ADAPTER
+                       __ADAPTER_TYPE=$KAFKAPC_ADAPTER_TYPE
+            __RETRY_CODES=""
                else
             paramError=1
         fi
@@ -139,7 +143,6 @@ __do_curl_to_api() {
                        if [ $# -ne 3 ]; then
                                paramError=1
                        fi
-                       #if [ $__ADAPTER == $__RESTBASE ] || [ $__ADAPTER == $__RESTBASE_SECURE ]; then
                        if [ $__ADAPTER_TYPE == "REST" ]; then
                                paramError=1
                        fi
@@ -151,13 +154,12 @@ __do_curl_to_api() {
     if [ $paramError -eq 1 ]; then
                ((RES_CONF_FAIL++))
         echo "-Incorrect number of parameters to __do_curl_to_api " $@ >> $HTTPLOG
-        echo "-Expected: (PA|ICS GET|PUT|POST|DELETE|GET_BATCH|PUT_BATCH|POST_BATCH|DELETE_BATCH <url> [<file>]) | (PA|ICS RESPONSE <correlation-id>)" >> $HTTPLOG
+        echo "-Expected: (PA|ICS GET|PUT|POST|DELETE|GET_BATCH|PUT_BATCH|POST_BATCH|DELETE_BATCH <url> [<file> [mime-type]]) | (PA|ICS RESPONSE <correlation-id>)" >> $HTTPLOG
         echo "-Returning response 000" >> $HTTPLOG
         echo "-000"
         return 1
     fi
 
-    #if [ $__ADAPTER == $__RESTBASE ] || [ $__ADAPTER == $__RESTBASE_SECURE ]; then
        if [ $__ADAPTER_TYPE == "REST" ]; then
         url=" "${__ADAPTER}${input_url}
         oper=" -X "$oper
diff --git a/test/common/do_curl_function.sh b/test/common/do_curl_function.sh
index 1cff7a1..a3f5507 100755 (executable)
 
 # Function to execute curl towards a container (or process) and compare + print result
 # Intended use is for basic test scripts where testing is done with curl and the returned response and payload need to be checked.
-# args: GET|PUT|POST|DELETE <url> <target-response-code> [<json-file>]
+# args: GET|PUT|POST|DELETE <url> <target-response-code> [<payload-file>]
 # All calls made to 'localhost:'<port>.
 # Expects env PORT set to intended port number
 # Expects env RESULT to contain the target response body.
 # Optional env HTTPX shall contain protocol 'http' or 'https'. If not set, 'http' is used. For 'https' all cert errors are ignored
-#   RESULT="*" means that returned payload is not checked, may container any text
+#   RESULT="*" means that returned payload is not checked, may contain any text
 #   RESULT="<text>" means that the returned payload has to match the <text> exactly
 #   RESULT="json:<returned-payload>" means that the returned json payload is compared with the expected result (order of json keys and index is irrelevant)
 #   RESULT="json-array-size:<integer-size>" means that the returned json payload shall contain the number of element given by the <integer-size>
@@ -54,14 +54,37 @@ do_curl() {
         PROT=$HTTPX
     fi
 
-    curlstr="curl -X "$1" -skw %{http_code} ${PROT}://localhost:$PORT$2 -H accept:*/*"
+    req_content=""
+    if [ -z "$REQ_CONTENT" ]; then
+        if [ $# -gt 3 ]; then
+            req_content="-H Content-Type:application/json" #Assuming json
+        fi
+    else
+        req_content="-H Content-Type:$REQ_CONTENT"
+    fi
+    resp_content=""
+    if [ -z "$RESP_CONTENT" ]; then
+        if [[ "$RESULT" == "json"* ]]; then
+            resp_content="application/json"
+        elif [[ "$RESULT" == "*" ]]; then
+            resp_content=""
+        else
+            resp_content="text/plain"
+        fi
+    else
+        resp_content=$RESP_CONTENT
+    fi
+    curlstr="curl -X "$1" -skw :%{content_type}:%{http_code} ${PROT}://localhost:$PORT$2 -H accept:*/*"
     if [ $# -gt 3 ]; then
-        curlstr=$curlstr" -H Content-Type:application/json --data-binary @"$4
+        curlstr=$curlstr" $req_content --data-binary @"$4
     fi
     echo "  CMD:"$curlstr
     res=$($curlstr)
     status=${res:${#res}-3}
-    body=${res:0:${#res}-3}
+    remainder=${res:0:${#res}-4}
+    content_type="${remainder##*:}"
+    body="${remainder%:*}"
+
     export body
     if [ $status -ne $3 ]; then
         echo "  Error status:"$status" Expected status: "$3
@@ -70,6 +93,14 @@ do_curl() {
         exit 1
     else
         echo "  OK, code: "$status"     (Expected)"
+        if [[ "$content_type" == *"$resp_content"* ]]; then
+            echo "  Content type: "$content_type"     (Expected)"
+        else
+            echo "  Expected content type: "$resp_content
+            echo "  Got: "$content_type
+            echo "Exiting....."
+            exit 1
+        fi
         echo "  Body: "$body
         if [ "$RESULT" == "*" ]; then
             echo "  Body contents not checked"
diff --git a/test/common/httpproxy_api_functions.sh b/test/common/httpproxy_api_functions.sh
index 3378a1d..af11f14 100644 (file)
@@ -79,7 +79,7 @@ __HTTPPROXY_kube_scale_zero() {
 # Scale kubernetes resources to zero and wait until this has been accomplished, if relevant. If not relevant to scale, then do no action.
 # This function is called for prestarted apps not managed by the test script.
 __HTTPPROXY_kube_scale_zero_and_wait() {
-       echo -e $RED" HTTPPROXY replicas kept as is"$ERED
+       echo -e $RED" HTTPPROXY app is not scaled in this state"$ERED
 }
 
 # Delete all kube resources for the app
@@ -103,7 +103,7 @@ __HTTPPROXY_store_docker_logs() {
 # This function is called for apps managed by the test script.
 # args: -
 __HTTPPROXY_initial_setup() {
-       :
+       use_http_proxy_http
 }
 
 # Set app short-name, app name and namespace for logging runtime statistics of kubernetes pods or docker containers
@@ -120,42 +120,63 @@ __HTTPPROXY_statisics_setup() {
 
 #######################################################
 
-
-## Access to Http Proxy Receiver
-# Host name may be changed if app started by kube
-# Direct access from script
-HTTP_PROXY_HTTPX="http"
-HTTP_PROXY_HOST_NAME=$LOCALHOST_NAME
-HTTP_PROXY_PATH=$HTTP_PROXY_HTTPX"://"$HTTP_PROXY_HOST_NAME":"$HTTP_PROXY_WEB_EXTERNAL_PORT
-
-#########################
-### Http Proxy functions
-#########################
-
-# All calls to httpproxy will be directed to the http interface
+# Set http as the protocol to use for all communication to the http proxy
 # args: -
 # (Function for test scripts)
 use_http_proxy_http() {
-       echo -e $BOLD"$HTTP_PROXY_DISPLAY_NAME protocol setting"$EBOLD
-       echo -e " Using $BOLD http $EBOLD"
-       HTTP_PROXY_HTTPX="http"
-       HTTP_PROXY_PATH=$HTTP_PROXY_HTTPX"://"$HTTP_PROXY_HOST_NAME":"$HTTP_PROXY_EXTERNAL_PORT
-
-       echo ""
+       __http_proxy_set_protocoll "http" $HTTP_PROXY_INTERNAL_PORT $HTTP_PROXY_EXTERNAL_PORT
 }
 
-# All calls to httpproxy will be directed to the https interface
+# Set https as the protocol to use for all communication to the http proxy
 # args: -
 # (Function for test scripts)
 use_http_proxy_https() {
+       __http_proxy_set_protocoll "https" $HTTP_PROXY_INTERNAL_SECURE_PORT $HTTP_PROXY_EXTERNAL_SECURE_PORT
+}
+
+# Setup paths to svc/container for internal and external access
+# args: <protocol> <internal-port> <external-port>
+__http_proxy_set_protocoll() {
        echo -e $BOLD"$HTTP_PROXY_DISPLAY_NAME protocol setting"$EBOLD
-       echo -e " Using $BOLD https $EBOLD"
-       HTTP_PROXY_HTTPX="https"
-       HTTP_PROXY_PATH=$HTTP_PROXY_HTTPX"://"$HTTP_PROXY_HOST_NAME":"$HTTP_PROXY_EXTERNAL_SECURE_PORT
+       echo -e " Using $BOLD $1 $EBOLD towards $HTTP_PROXY_DISPLAY_NAME"
+
+       ## Access to http proxy
+       ## HTTP_PROXY_CONFIG_HOST_NAME and HTTP_PROXY_CONFIG_PORT used by apps as config for proxy host and port
+
+       HTTP_PROXY_SERVICE_PATH=$1"://"$HTTP_PROXY_APP_NAME":"$2  # docker access, container->container and script->container via proxy
+       HTTP_PROXY_CONFIG_HOST_NAME=$HTTP_PROXY_APP_NAME
+       HTTP_PROXY_CONFIG_PORT=$2
+       if [ $RUNMODE == "KUBE" ]; then
+               HTTP_PROXY_CONFIG_HOST_NAME=$HTTP_PROXY_APP_NAME"."$KUBE_SIM_NAMESPACE
+               HTTP_PROXY_CONFIG_PORT=$3
+               HTTP_PROXY_SERVICE_PATH=$1"://"$HTTP_PROXY_APP_NAME.$KUBE_SIM_NAMESPACE":"$3 # kube access, pod->svc and script->svc via proxy
+       fi
 
        echo ""
 }
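
In effect, after a test script calls use_http_proxy_http, the vars set by the function above resolve as follows (a summary of the code above, not additional behavior):

    use_http_proxy_http
    # docker: HTTP_PROXY_SERVICE_PATH     = http://$HTTP_PROXY_APP_NAME:$HTTP_PROXY_INTERNAL_PORT
    #         HTTP_PROXY_CONFIG_HOST_NAME = $HTTP_PROXY_APP_NAME
    # kube:   HTTP_PROXY_SERVICE_PATH     = http://$HTTP_PROXY_APP_NAME.$KUBE_SIM_NAMESPACE:$HTTP_PROXY_EXTERNAL_PORT
    #         HTTP_PROXY_CONFIG_HOST_NAME = $HTTP_PROXY_APP_NAME.$KUBE_SIM_NAMESPACE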
 
+# Export env vars for config files, docker compose and kube resources
+# args:
+__http_proxy_export_vars() {
+
+       export HTTP_PROXY_APP_NAME
+       export HTTP_PROXY_DISPLAY_NAME
+
+       export HTTP_PROXY_WEB_EXTERNAL_PORT
+       export HTTP_PROXY_WEB_INTERNAL_PORT
+       export HTTP_PROXY_EXTERNAL_PORT
+       export HTTP_PROXY_INTERNAL_PORT
+
+       export HTTP_PROXY_WEB_EXTERNAL_SECURE_PORT
+       export HTTP_PROXY_WEB_INTERNAL_SECURE_PORT
+       export HTTP_PROXY_EXTERNAL_SECURE_PORT
+       export HTTP_PROXY_INTERNAL_SECURE_PORT
+
+       export KUBE_SIM_NAMESPACE
+       export DOCKER_SIM_NWNAME
+       export HTTP_PROXY_IMAGE
+}
+
 # Start the Http Proxy in the simulator group
 # args: -
 # (Function for test scripts)
@@ -193,23 +214,11 @@ start_http_proxy() {
 
                if [ $retcode_i -eq 0 ]; then
                        echo -e " Creating $HTTP_PROXY_APP_NAME deployment and service"
-                       export HTTP_PROXY_APP_NAME
-
-                       export HTTP_PROXY_WEB_EXTERNAL_PORT
-                       export HTTP_PROXY_WEB_INTERNAL_PORT
-                       export HTTP_PROXY_EXTERNAL_PORT
-                       export HTTP_PROXY_INTERNAL_PORT
-
-                       export HTTP_PROXY_WEB_EXTERNAL_SECURE_PORT
-                       export HTTP_PROXY_WEB_INTERNAL_SECURE_PORT
-                       export HTTP_PROXY_EXTERNAL_SECURE_PORT
-                       export HTTP_PROXY_INTERNAL_SECURE_PORT
-
-                       export KUBE_SIM_NAMESPACE
-                       export HTTP_PROXY_IMAGE
 
                        __kube_create_namespace $KUBE_SIM_NAMESPACE
 
+                       __http_proxy_export_vars
+
                        # Create service
                        input_yaml=$SIM_GROUP"/"$HTTP_PROXY_COMPOSE_DIR"/"svc.yaml
                        output_yaml=$PWD/tmp/proxy_svc.yaml
@@ -222,29 +231,7 @@ start_http_proxy() {
 
                fi
 
-               echo " Retrieving host and ports for service..."
-               HTTP_PROXY_HOST_NAME=$(__kube_get_service_host $HTTP_PROXY_APP_NAME $KUBE_SIM_NAMESPACE)
-               HTTP_PROXY_WEB_EXTERNAL_PORT=$(__kube_get_service_port $HTTP_PROXY_APP_NAME $KUBE_SIM_NAMESPACE "web")
-               HTTP_PROXY_WEB_EXTERNAL_SECURE_PORT=$(__kube_get_service_port $HTTP_PROXY_APP_NAME $KUBE_SIM_NAMESPACE "webs")
-
-               HTTP_PROXY_EXTERNAL_PORT=$(__kube_get_service_port $HTTP_PROXY_APP_NAME $KUBE_SIM_NAMESPACE "http")
-               HTTP_PROXY_EXTERNAL_SECURE_PORT=$(__kube_get_service_port $HTTP_PROXY_APP_NAME $KUBE_SIM_NAMESPACE "https")
-
-               if [ $HTTP_PROXY_HTTPX == "http" ]; then
-                       HTTP_PROXY_PATH=$HTTP_PROXY_HTTPX"://"$HTTP_PROXY_HOST_NAME":"$HTTP_PROXY_WEB_EXTERNAL_PORT
-                       HTTP_PROXY_CONFIG_PORT=$HTTP_PROXY_EXTERNAL_PORT
-                       HTTP_PROXY_CONFIG_HOST_NAME=$HTTP_PROXY_APP_NAME"."$KUBE_SIM_NAMESPACE
-
-                       echo " Host IP, http port: $HTTP_PROXY_HOST_NAME $HTTP_PROXY_WEB_EXTERNAL_PORT"
-               else
-                       HTTP_PROXY_PATH=$HTTP_PROXY_HTTPX"://"$HTTP_PROXY_HOST_NAME":"$HTTP_PROXY_WEB_EXTERNAL_SECURE_PORT
-                       HTTP_PROXY_CONFIG_PORT=$HTTP_PROXY_EXTERNAL_SECURE_PORT
-                       HTTP_PROXY_CONFIG_HOST_NAME=$HTTP_PROXY_APP_NAME"."$KUBE_SIM_NAMESPACE
-
-                       echo " Host IP, https port: $HTTP_PROXY_HOST_NAME $HTTP_PROXY_WEB_EXTERNAL_SECURE_PORT"
-               fi
-
-               __check_service_start $HTTP_PROXY_APP_NAME $HTTP_PROXY_PATH$HTTP_PROXY_ALIVE_URL
+               __check_service_start $HTTP_PROXY_APP_NAME $HTTP_PROXY_SERVICE_PATH$HTTP_PROXY_ALIVE_URL
 
        else
                # Check if docker app shall be fully managed by the test script
@@ -255,36 +242,11 @@ start_http_proxy() {
                        exit
                fi
 
-               export HTTP_PROXY_APP_NAME
-               export HTTP_PROXY_EXTERNAL_PORT
-               export HTTP_PROXY_INTERNAL_PORT
-               export HTTP_PROXY_EXTERNAL_SECURE_PORT
-               export HTTP_PROXY_INTERNAL_SECURE_PORT
-               export HTTP_PROXY_WEB_EXTERNAL_PORT
-               export HTTP_PROXY_WEB_INTERNAL_PORT
-               export HTTP_PROXY_WEB_EXTERNAL_SECURE_PORT
-               export HTTP_PROXY_WEB_INTERNAL_SECURE_PORT
-               export DOCKER_SIM_NWNAME
-
-               export HTTP_PROXY_DISPLAY_NAME
+               __http_proxy_export_vars
 
                __start_container $HTTP_PROXY_COMPOSE_DIR "" NODOCKERARGS 1 $HTTP_PROXY_APP_NAME
 
-               if [ $HTTP_PROXY_HTTPX == "http" ]; then
-                       HTTP_PROXY_PATH=$HTTP_PROXY_HTTPX"://"$HTTP_PROXY_HOST_NAME":"$HTTP_PROXY_WEB_INTERNAL_PORT
-               else
-                       HTTP_PROXY_PATH=$HTTP_PROXY_HTTPX"://"$HTTP_PROXY_HOST_NAME":"$HTTP_PROXY_WEB_INTERNAL_SECURE_PORT
-               fi
-        __check_service_start $HTTP_PROXY_APP_NAME $HTTP_PROXY_PATH$HTTP_PROXY_ALIVE_URL
-
-               if [ $HTTP_PROXY_HTTPX == "http" ]; then
-                       HTTP_PROXY_CONFIG_PORT=$HTTP_PROXY_INTERNAL_PORT
-               else
-                       HTTP_PROXY_CONFIG_PORT=$HTTP_PROXY_INTERNAL_SECURE_PORT
-               fi
-               HTTP_PROXY_CONFIG_HOST_NAME=$HTTP_PROXY_APP_NAME
-
+        __check_service_start $HTTP_PROXY_APP_NAME $HTTP_PROXY_SERVICE_PATH$HTTP_PROXY_ALIVE_URL
        fi
        echo ""
 }
-
diff --git a/test/common/ics_api_functions.sh b/test/common/ics_api_functions.sh
index ea4036d..df2de4f 100644 (file)
@@ -180,7 +180,7 @@ __ics_export_vars() {
                export ICS_CONFIG_FILE
                export DOCKER_SIM_NWNAME
                export ICS_DISPLAY_NAME
-
+               export ICS_LOGPATH
 
                export ICS_DATA_PV_NAME=$ICS_APP_NAME"-pv"
                export ICS_DATA_PVC_NAME=$ICS_APP_NAME"-pvc"
@@ -231,7 +231,6 @@ start_ics() {
                        exit
                fi
 
-
                if [ $retcode_p -eq 0 ]; then
                        echo -e " Using existing $ICS_APP_NAME deployment and service"
                        echo " Setting ICS replicas=1"
diff --git a/test/common/kafkapc_api_functions.sh b/test/common/kafkapc_api_functions.sh
new file mode 100644 (file)
index 0000000..002657c
--- /dev/null
@@ -0,0 +1,648 @@
+#!/bin/bash
+
+#  ============LICENSE_START===============================================
+#  Copyright (C) 2020 Nordix Foundation. All rights reserved.
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+#
+
+# This is a script that contains container/service management functions for Kafka producer/consumer
+
+################ Test engine functions ################
+
+# Create the image var used during the test
+# arg: <image-tag-suffix> (selects staging, snapshot, release etc)
+# <image-tag-suffix> is present only for images with staging, snapshot or release tags
+__KAFKAPC_imagesetup() {
+       __check_and_create_image_var KAFKAPC "KAFKAPC_IMAGE" "KAFKAPC_IMAGE_BASE" "KAFKAPC_IMAGE_TAG" LOCAL "$KAFKAPC_DISPLAY_NAME"
+}
+
+# Pull image from remote repo or use locally built image
+# arg: <pull-policy-override> <pull-policy-original>
+# <pull-policy-override> Shall be used for images allowing overriding. For example, use a local image when the test is started to use released images
+# <pull-policy-original> Shall be used for images that do not allow overriding
+# Both vars may contain: 'remote', 'remote-remove' or 'local'
+__KAFKAPC_imagepull() {
+       __check_and_pull_image $2 "$KAFKAPC_DISPLAY_NAME" $KAFKAPC_APP_NAME KAFKAPC_IMAGE
+}
+
+# Build image (only for simulator or interfaces stubs owned by the test environment)
+# arg: <image-tag-suffix> (selects staging, snapshot, release etc)
+# <image-tag-suffix> is present only for images with staging, snapshot or release tags
+__KAFKAPC_imagebuild() {
+
+       cd ../$KAFKAPC_BUILD_DIR
+       echo " Building KAFKAPC - $KAFKAPC_DISPLAY_NAME - image: $KAFKAPC_IMAGE"
+       docker build  --build-arg NEXUS_PROXY_REPO=$NEXUS_PROXY_REPO -t $KAFKAPC_IMAGE . &> .dockererr
+       if [ $? -eq 0 ]; then
+               echo -e  $GREEN"  Build Ok"$EGREEN
+               __retag_and_push_image KAFKAPC_IMAGE
+               if [ $? -ne 0 ]; then
+                       exit 1
+               fi
+       else
+               echo -e $RED"  Build Failed"$ERED
+               ((RES_CONF_FAIL++))
+               cat .dockererr
+               echo -e $RED"Exiting...."$ERED
+               exit 1
+       fi
+}
+
+# Generate a string for each included image using the app display name and a docker images format string
+# If a custom image repo is used then also the source image from the local repo is listed
+# arg: <docker-images-format-string> <file-to-append>
+__KAFKAPC_image_data() {
+       echo -e "$KAFKAPC_DISPLAY_NAME\t$(docker images --format $1 $KAFKAPC_IMAGE)" >>   $2
+       if [ ! -z "$KAFKAPC_IMAGE_SOURCE" ]; then
+               echo -e "-- source image --\t$(docker images --format $1 $KAFKAPC_IMAGE_SOURCE)" >>   $2
+       fi
+}
+
+# Scale kubernetes resources to zero
+# All resources shall be ordered to be scaled to 0, if relevant. If not relevant to scale, then do no action.
+# This function is called for apps fully managed by the test script
+__KAFKAPC_kube_scale_zero() {
+       __kube_scale_all_resources $KUBE_SIM_NAMESPACE autotest KAFKAPC
+}
+
+# Scale kubernetes resources to zero and wait until this has been accomplished, if relevant. If not relevant to scale, then do no action.
+# This function is called for prestarted apps not managed by the test script.
+__KAFKAPC_kube_scale_zero_and_wait() {
+       echo -e $RED" KAFKAPC app is not scaled in this state"$ERED
+}
+
+# Delete all kube resources for the app
+# This function is called for apps managed by the test script.
+__KAFKAPC_kube_delete_all() {
+       __kube_delete_all_resources $KUBE_SIM_NAMESPACE autotest KAFKAPC
+}
+
+# Store docker logs
+# This function is called for apps managed by the test script.
+# args: <log-dir> <file-prefix>
+__KAFKAPC_store_docker_logs() {
+       if [ $RUNMODE == "KUBE" ]; then
+               kubectl  logs -l "autotest=KAFKAPC" -n $KUBE_SIM_NAMESPACE --tail=-1 > $1$2_kafkapc.log 2>&1
+       else
+               docker logs $KAFKAPC_APP_NAME > $1$2_kafkapc.log 2>&1
+       fi
+}
+
+# Initial setup of protocol, host and ports
+# This function is called for apps managed by the test script.
+# args: -
+__KAFKAPC_initial_setup() {
+       use_kafkapc_http
+}
+
+# Set app short-name, app name and namespace for logging runtime statistics of kubernetes pods or docker containers
+# For docker, the namespace shall be excluded
+# This function is called for apps managed by the test script as well as for prestarted apps.
+# args: -
+__KAFKAPC_statisics_setup() {
+       if [ $RUNMODE == "KUBE" ]; then
+               echo "KAFKAPC $KAFKAPC_APP_NAME $KUBE_SIM_NAMESPACE"
+       else
+               echo "KAFKAPC $KAFKAPC_APP_NAME"
+       fi
+}
+
+#######################################################
+
+#######################################################
+
+# Set http as the protocol to use for all communication to the Kafka procon
+# args: -
+# (Function for test scripts)
+use_kafkapc_http() {
+       __kafkapc_set_protocoll "http" $KAFKAPC_INTERNAL_PORT $KAFKAPC_EXTERNAL_PORT
+}
+
+# Set httpS as the protocol to use for all communication to the Kafka procon
+# args: -
+# (Function for test scripts)
+use_kafkapc_https() {
+       __kafkapc_set_protocoll "https" $KAFKAPC_INTERNAL_SECURE_PORT $KAFKAPC_EXTERNAL_SECURE_PORT
+}
+
+# Setup paths to svc/container for internal and external access
+# args: <protocol> <internal-port> <external-port>
+__kafkapc_set_protocoll() {
+       echo -e $BOLD"$KAFKAPC_DISPLAY_NAME protocol setting"$EBOLD
+       echo -e " Using $BOLD $1 $EBOLD towards $KAFKAPC_DISPLAY_NAME"
+
+       ## Access to Kafka procon
+
+       KAFKAPC_SERVICE_PATH=$1"://"$KAFKAPC_APP_NAME":"$2  # docker access, container->container and script->container via proxy
+       if [ $RUNMODE == "KUBE" ]; then
+               KAFKAPC_SERVICE_PATH=$1"://"$KAFKAPC_APP_NAME.$KUBE_SIM_NAMESPACE":"$3 # kube access, pod->svc and script->svc via proxy
+       fi
+
+       KAFKAPC_ADAPTER_TYPE="REST"
+       KAFKAPC_ADAPTER=$KAFKAPC_SERVICE_PATH
+
+       echo ""
+}
+
+### Admin API functions Kafka procon
+
+###########################
+### Kafka Procon functions
+###########################
+
+# Export env vars for config files, docker compose and kube resources
+# args:
+__kafkapc_export_vars() {
+       export KAFKAPC_APP_NAME
+       export KAFKAPC_DISPLAY_NAME
+
+       export DOCKER_SIM_NWNAME
+       export KUBE_SIM_NAMESPACE
+
+       export KAFKAPC_IMAGE
+       export KAFKAPC_INTERNAL_PORT
+       export KAFKAPC_INTERNAL_SECURE_PORT
+       export KAFKAPC_EXTERNAL_PORT
+       export KAFKAPC_EXTERNAL_SECURE_PORT
+
+       export MR_KAFKA_SERVICE_PATH
+}
+
+
+# Start the Kafka procon in the simulator group
+# args: -
+# (Function for test scripts)
+start_kafkapc() {
+
+       echo -e $BOLD"Starting $KAFKAPC_DISPLAY_NAME"$EBOLD
+
+       if [ $RUNMODE == "KUBE" ]; then
+
+               # Check if app shall be fully managed by the test script
+               __check_included_image "KAFKAPC"
+               retcode_i=$?
+
+               # Check if app shall only be used by the test script
+               __check_prestarted_image "KAFKAPC"
+               retcode_p=$?
+
+               if [ $retcode_i -ne 0 ] && [ $retcode_p -ne 0 ]; then
+                       echo -e $RED"The $KAFKAPC_APP_NAME app is not included as managed nor prestarted in this test script"$ERED
+                       echo -e $RED"The $KAFKAPC_APP_NAME will not be started"$ERED
+                       exit
+               fi
+               if [ $retcode_i -eq 0 ] && [ $retcode_p -eq 0 ]; then
+                       echo -e $RED"The $KAFKAPC_APP_NAME app is included both as managed and prestarted in this test script"$ERED
+                       echo -e $RED"The $KAFKAPC_APP_NAME will not be started"$ERED
+                       exit
+               fi
+
+               if [ $retcode_p -eq 0 ]; then
+                       echo -e " Using existing $KAFKAPC_APP_NAME deployment and service"
+                       echo " Setting KAFKAPC replicas=1"
+                       __kube_scale deployment $KAFKAPC_APP_NAME $KUBE_SIM_NAMESPACE 1
+               fi
+
+               if [ $retcode_i -eq 0 ]; then
+                       echo -e " Creating $KAFKAPC_APP_NAME deployment and service"
+
+            __kube_create_namespace $KUBE_SIM_NAMESPACE
+
+                       __kafkapc_export_vars
+
+                       # Create service
+                       input_yaml=$SIM_GROUP"/"$KAFKAPC_COMPOSE_DIR"/"svc.yaml
+                       output_yaml=$PWD/tmp/kafkapc_svc.yaml
+                       __kube_create_instance service $KAFKAPC_APP_NAME $input_yaml $output_yaml
+
+                       # Create app
+                       input_yaml=$SIM_GROUP"/"$KAFKAPC_COMPOSE_DIR"/"app.yaml
+                       output_yaml=$PWD/tmp/kafkapc_app.yaml
+                       __kube_create_instance app $KAFKAPC_APP_NAME $input_yaml $output_yaml
+               fi
+
+               __check_service_start $KAFKAPC_APP_NAME $KAFKAPC_SERVICE_PATH$KAFKAPC_ALIVE_URL
+
+       else
+
+               # Check if docker app shall be fully managed by the test script
+               __check_included_image 'KAFKAPC'
+               if [ $? -eq 1 ]; then
+                       echo -e $RED"The Kafka procon app is not included as managed in this test script"$ERED
+                       echo -e $RED"The Kafka procon will not be started"$ERED
+                       exit
+               fi
+
+               __kafkapc_export_vars
+
+               __start_container $KAFKAPC_COMPOSE_DIR "" NODOCKERARGS 1 $KAFKAPC_APP_NAME
+
+        __check_service_start $KAFKAPC_APP_NAME $KAFKAPC_SERVICE_PATH$KAFKAPC_ALIVE_URL
+       fi
+    echo ""
+    return 0
+}
+
+# Tests if a variable value in the KAFKAPC is equal to a target value, with an optional timeout.
+# Arg: <variable-name> <target-value> - This test sets pass or fail depending on whether the variable is
+# equal to the target or not.
+# Arg: <variable-name> <target-value> <timeout-in-sec>  - This test waits up to the timeout seconds
+# before setting pass or fail depending on if the variable value becomes equal to the target
+# value or not.
+# (Function for test scripts)
+kafkapc_equal() {
+       if [ $# -eq 2 ] || [ $# -eq 3 ]; then
+               __var_test KAFKAPC "$KAFKAPC_SERVICE_PATH/" $1 "=" $2 $3
+       else
+               __print_err "Wrong args to kafkapc_equal, needs two or three args: <sim-param> <target-value> [ timeout ]" $@
+       fi
+}
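
Typical use, as in FTC3000.sh earlier in this commit - wait up to 30 seconds for the topic's sent counter to reach 5:

    kafkapc_equal topics/unauthenticated.dmaapadp_kafka.text/counters/sent 5 30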
+
+# KAFKA PC API: Reset all, POST /reset
+# Arg: <response-code>
+# (Function for test scripts)
+kafkapc_api_reset() {
+       __log_conf_start $@
+
+       if [ $# -ne 1 ]; then
+               __print_err "<response-code>" $@
+               return 1
+       fi
+
+       res="$(__do_curl_to_api KAFKAPC POST /reset)"
+       status=${res:${#res}-3}
+
+       if [ $status -ne $1 ]; then
+               __log_conf_fail_status_code $1 $status
+               return 1
+       fi
+
+       __log_conf_ok
+       return 0
+}
+
+# KAFKA PC API: Create a topic of a data-type, PUT /topics/<topic>
+# Arg: <response-code> <topic-name>  <mime-type>
+# (Function for test scripts)
+kafkapc_api_create_topic() {
+       __log_conf_start $@
+
+    if [ $# -ne 3 ]; then
+        __print_err "<response-code> <topic-name>  <mime-type>" $@
+        return 1
+       fi
+
+       res="$(__do_curl_to_api KAFKAPC PUT /topics/$2?type=$3)"
+       status=${res:${#res}-3}
+
+       if [ $status -ne $1 ]; then
+               __log_conf_fail_status_code $1 $status
+               return 1
+       fi
+
+       __log_conf_ok
+       return 0
+}
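
Example from FTC3000.sh above - create a plain-text topic and expect 201 Created:

    kafkapc_api_create_topic 201 "unauthenticated.dmaapadp_kafka.text" "text/plain"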
+
+# KAFKA PC API: Get topics, GET /topics
+# args: <response-code> [ EMPTY | [<topic>]+ ]
+# (Function for test scripts)
+kafkapc_api_get_topics() {
+       __log_test_start $@
+
+    if [ $# -lt 1 ]; then
+               __print_err "<response-code> EMPTY | [<topic>]*" $@
+               return 1
+       fi
+
+    res="$(__do_curl_to_api KAFKAPC GET /topics)"
+    status=${res:${#res}-3}
+
+       if [ $status -ne $1 ]; then
+               __log_test_fail_status_code $1 $status
+               return 1
+       fi
+       if [ $# -gt 1 ]; then
+               body=${res:0:${#res}-3}
+               targetJson="["
+
+               for pid in ${@:2} ; do
+                       if [ "$targetJson" != "[" ]; then
+                               targetJson=$targetJson","
+                       fi
+                       if [ $pid != "EMPTY" ]; then
+                               targetJson=$targetJson"\"$pid\""
+                       fi
+               done
+               targetJson=$targetJson"]"
+               echo " TARGET JSON: $targetJson" >> $HTTPLOG
+               res=$(python3 ../common/compare_json.py "$targetJson" "$body")
+
+               if [ $res -ne 0 ]; then
+                       __log_test_fail_body
+                       return 1
+               fi
+       fi
+       __log_test_pass
+       return 0
+}
+
+# KAFKA PC API: Get a topic, GET /topics/<topic>
+# args: <response-code> <topic> <mime-type>
+# (Function for test scripts)
+kafkapc_api_get_topic() {
+       __log_test_start $@
+
+    if [ $# -ne 3 ]; then
+               __print_err "<response-code> <topic> <mime-type>" $@
+               return 1
+       fi
+
+    res="$(__do_curl_to_api KAFKAPC GET /topics/$2)"
+    status=${res:${#res}-3}
+
+       if [ $status -ne $1 ]; then
+               __log_test_fail_status_code $1 $status
+               return 1
+       fi
+
+       body=${res:0:${#res}-3}
+       if [ "$body" != $3 ]; then
+               __log_test_fail_body
+               return 1
+       fi
+
+       __log_test_pass
+       return 0
+}
+
+# KAFKA PC API: Start sending on a topic, POST /topics/<topic>/startsend
+# args: <response-code> <topic>
+# (Function for test scripts)
+kafkapc_api_start_sending() {
+       __log_test_start $@
+
+    if [ $# -ne 2 ]; then
+               __print_err "<response-code> <topic>" $@
+               return 1
+       fi
+
+    res="$(__do_curl_to_api KAFKAPC POST /topics/$2/startsend)"
+    status=${res:${#res}-3}
+
+       if [ $status -ne $1 ]; then
+               __log_test_fail_status_code $1 $status
+               return 1
+       fi
+
+       __log_test_pass
+       return 0
+}
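
Used in FTC3000.sh directly after topic creation:

    kafkapc_api_start_sending 200 "unauthenticated.dmaapadp_kafka.text"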
+
+# KAFKA PC API: Start receiving on a topic, POST /topics/<topic>/startreceive
+# args: <response-code> <topic>
+# (Function for test scripts)
+kafkapc_api_start_receiving() {
+       __log_test_start $@
+
+    if [ $# -ne 2 ]; then
+               __print_err "<response-code> <topic>" $@
+               return 1
+       fi
+
+    res="$(__do_curl_to_api KAFKAPC POST /topics/$2/startreceive)"
+    status=${res:${#res}-3}
+
+       if [ $status -ne $1 ]; then
+               __log_test_fail_status_code $1 $status
+               return 1
+       fi
+
+       __log_test_pass
+       return 0
+}
+
+# KAFKA PC API: Stop sending on a topic, POST /topics/<topic>/stopsend
+# args: <response-code> <topic>
+# (Function for test scripts)
+kafkapc_api_stop_sending() {
+       __log_test_start $@
+
+    if [ $# -ne 2 ]; then
+               __print_err "<response-code> <topic>" $@
+               return 1
+       fi
+
+    res="$(__do_curl_to_api KAFKAPC POST /topics/$2/stopsend)"
+    status=${res:${#res}-3}
+
+       if [ $status -ne $1 ]; then
+               __log_test_fail_status_code $1 $status
+               return 1
+       fi
+
+       __log_test_pass
+       return 0
+}
+
+# KAFKA PC API: Stop receiving on a topic, POST /topics/<topic>/stopreceive
+# args: <response-code> <topic>
+# (Function for test scripts)
+kafkapc_api_stop_receiving() {
+       __log_test_start $@
+
+    if [ $# -ne 2 ]; then
+               __print_err "<response-code> <topic>" $@
+               return 1
+       fi
+
+    res="$(__do_curl_to_api KAFKAPC POST /topics/$2/stopreceive)"
+    status=${res:${#res}-3}
+
+       if [ $status -ne $1 ]; then
+               __log_test_fail_status_code $1 $status
+               return 1
+       fi
+
+       __log_test_pass
+       return 0
+}
+
+# KAFKA PC API: Send a message on a topic, POST /topics/<topic>/msg
+# args: <response-code> <topic> <mime-type> <msg>
+# (Function for test scripts)
+kafkapc_api_post_msg() {
+       __log_test_start $@
+
+    if [ $# -ne 4 ]; then
+               __print_err "<response-code> <topic> <mime-type> <msg>" $@
+               return 1
+       fi
+       payload="tmp/.kafkapayload"
+       echo -n $4 > $payload     # -n prevents a newline from being added
+    res="$(__do_curl_to_api KAFKAPC POST /topics/$2/msg $payload $3)"
+    status=${res:${#res}-3}
+
+       if [ $status -ne $1 ]; then
+               __log_test_fail_status_code $1 $status
+               return 1
+       fi
+
+       __log_test_pass
+       return 0
+}
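
Example from FTC3000.sh - post a small text message and expect 200:

    kafkapc_api_post_msg 200 "unauthenticated.dmaapadp_kafka.text" "text/plain" 'Message-------1'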
+
+
+# KAFKA PC API: Get a msg on a topic, GET /topics/<topic>/msg
+# args: <response-code> <topic>  ([ <mime-type>  <msg> ] | NOMSG )
+# (Function for test scripts)
+kafkapc_api_get_msg() {
+       __log_test_start $@
+
+    if [ $# -lt 3 ]; then
+               __print_err "<response-code> <topic>  ([ <mime-type>  <msg> ] | NOMSG )" $@
+               return 1
+       fi
+       mime_type="text/plain"
+       if [ ! -z "$3" ]; then
+               mime_type=$3
+       fi
+    res="$(__do_curl_to_api KAFKAPC GET /topics/$2/msg)"
+    status=${res:${#res}-3}
+
+       if [ $status -ne $1 ]; then
+               __log_test_fail_status_code $1 $status
+               return 1
+       fi
+       if [ $# -eq 4 ]; then
+               body=${res:0:${#res}-3}
+               if [ "$body" != "$4" ]; then
+                       __log_test_fail_body
+                       return 1
+               fi
+       fi
+
+       __log_test_pass
+       return 0
+}
+
+# KAFKA PC API: Send a message from a file on a topic, POST /topics/<topic>/msg
+# args: <response-code> <topic> <mime-type> <file>
+# (Function for test scripts)
+kafkapc_api_post_msg_from_file() {
+       __log_test_start $@
+
+    if [ $# -ne 4 ]; then
+               __print_err "<response-code> <topic> <mime-type> <file>" $@
+               return 1
+       fi
+    res="$(__do_curl_to_api KAFKAPC POST /topics/$2/msg $4 $3)"
+    status=${res:${#res}-3}
+
+       if [ $status -ne $1 ]; then
+               __log_test_fail_status_code $1 $status
+               return 1
+       fi
+
+       __log_test_pass
+       return 0
+}
+
+# KAFKA PC API: Get a msg on a topic and compare with file, GET /topics/<topic>/msg
+# args: <response-code> <topic>  <mime-type>  <file>
+# (Function for test scripts)
+kafkapc_api_get_msg_from_file() {
+       __log_test_start $@
+
+    if [ $# -ne 4 ]; then
+               __print_err "<response-code> <topic>  <mime-type>  <file> " $@
+               return 1
+       fi
+
+       if [ -f $4 ]; then
+               msgfile=$(cat $4)
+       else
+               __log_test_fail_general "Message file "$4" does not exist"
+               return 1
+       fi
+
+       mime_type="text/plain"
+
+    res="$(__do_curl_to_api KAFKAPC GET /topics/$2/msg)"
+    status=${res:${#res}-3}
+
+       if [ $status -ne $1 ]; then
+               __log_test_fail_status_code $1 $status
+               return 1
+       fi
+
+       body=${res:0:${#res}-3}
+       if [ "$body" != "$msgfile" ]; then
+               __log_test_fail_body
+               return 1
+       fi
+
+       __log_test_pass
+       return 0
+}
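
A round-trip sketch combining the helpers above (topic name and file path are hypothetical):

    kafkapc_api_create_topic 201 "mytopic" "text/plain"
    kafkapc_api_start_sending 200 "mytopic"
    kafkapc_api_start_receiving 200 "mytopic"
    kafkapc_api_post_msg_from_file 200 "mytopic" "text/plain" ./tmp/msg.txt
    kafkapc_api_get_msg_from_file 200 "mytopic" "text/plain" ./tmp/msg.txt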
+
+
+# Create json file for payload
+# arg: <size-in-kb> <filename>
+kafkapc_api_generate_json_payload_file() {
+       __log_conf_start $@
+    if [ $# -ne 2 ]; then
+        __print_err "<size-in-kb> <json-file>" $@
+        return 1
+    fi
+       if [ $1 -lt 1 ] || [ $1 -gt 10000 ]; then
+               __log_conf_fail_general "Only size between 1k and 10000k supported"
+               return 1
+       fi
+       echo -n "{\"abcdefghijklmno\":[" > $2
+       LEN=$(($1*100-2))
+       echo -n "\""ABCDEFG"\"" >> $2
+       for ((idx=1; idx<$LEN; idx++))
+       do
+               echo -n ",\"ABCDEFG\"" >> $2
+       done
+       echo -n "]}" >> $2
+
+       __log_conf_ok
+       return 0
+}
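
Size check for the generator above: with <size-in-kb>=1 it writes the 20-byte prefix, one 9-byte "ABCDEFG" element, 97 ten-byte ',"ABCDEFG"' elements and the 2-byte suffix (file name is an example):

    kafkapc_api_generate_json_payload_file 1 ./tmp/sample.json
    wc -c < ./tmp/sample.json    # 20 + 9 + 97*10 + 2 = 1001 bytes, i.e. ~1 kb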
+
+# Create text file for payload
+# arg: <size-in-kb> <filename>
+kafkapc_api_generate_text_payload_file() {
+       __log_conf_start $@
+    if [ $# -ne 2 ]; then
+        __print_err "<size-in-kb> <text-file>" $@
+        return 1
+    fi
+       if [ $1 -lt 1 ] || [ $1 -gt 10000 ]; then
+               __log_conf_fail_general "Only size between 1k and 10000k supported"
+               return 1
+       fi
+       echo -n "" > $2
+       LEN=$(($1*100))
+       for ((idx=0; idx<$LEN; idx++))
+       do
+               echo -n "ABCDEFGHIJ" >> $2
+       done
+
+       __log_conf_ok
+       return 0
+}
\ No newline at end of file
diff --git a/test/common/mr_api_functions.sh b/test/common/mr_api_functions.sh
index 55badb0..3e00ec3 100755 (executable)
@@ -173,7 +173,7 @@ __DMAAPMR_store_docker_logs() {
                        kubectl logs -n $KUBE_ONAP_NAMESPACE $podname --tail=-1 > $1$2_$podname.log 2>&1
                done
        else
-               docker logs $MR_DMAAP_APP_NAME > $1$2mr.log 2>&1
+               docker logs $MR_DMAAP_APP_NAME > $1$2_mr.log 2>&1
                docker logs $MR_KAFKA_APP_NAME > $1$2_mr_kafka.log 2>&1
                docker logs $MR_ZOOKEEPER_APP_NAME > $1$2_mr_zookeeper.log 2>&1
        fi
@@ -629,10 +629,7 @@ start_mr() {
                                fi
                        fi
 
-                       echo " Current topics:"
-                       curlString="$MR_DMAAP_PATH/topics"
-                       result=$(__do_curl "$curlString")
-                       echo $result | indent2
+                       dmaap_api_print_topics
                fi
 
                __mr_export_vars
@@ -719,6 +716,15 @@ __dmaap_pipeclean() {
        return 1
 }
 
+# Helper function to list the current topics in DMAAP MR
+# args: -
+dmaap_api_print_topics() {
+       echo " Current topics:"
+       curlString="$MR_DMAAP_PATH/topics"
+       result=$(__do_curl "$curlString")
+       echo $result | indent2
+}
+
 
 ### Generic test cases for variable checking
 
@@ -901,7 +907,7 @@ mr_api_generate_json_payload_file() {
        return 0
 }
 
-# Create tet file for payload
+# Create text file for payload
 # arg: <size-in-kb> <filename>
 mr_api_generate_text_payload_file() {
        __log_conf_start $@
diff --git a/test/common/sdnc_api_functions.sh b/test/common/sdnc_api_functions.sh
index d502825..3ac0a6c 100644 (file)
@@ -145,10 +145,10 @@ __sdnc_set_protocoll() {
        ## Access to SDNC
 
        SDNC_SERVICE_PATH=$1"://"$SDNC_APP_NAME":"$2  # docker access, container->container and script->container via proxy
-       SDNC_SERVICE_API_PATH=$1"://"$SDNC_USER":"$SDNC_PWD"@"$SDNC_APP_NAME":"$1$SDNC_API_URL
+       SDNC_SERVICE_API_PATH=$1"://"$SDNC_USER":"$SDNC_PWD"@"$SDNC_APP_NAME":"$2$SDNC_API_URL
        if [ $RUNMODE == "KUBE" ]; then
                SDNC_SERVICE_PATH=$1"://"$SDNC_APP_NAME.$KUBE_SDNC_NAMESPACE":"$3 # kube access, pod->svc and script->svc via proxy
-               SDNC_SERVICE_API_PATH=$1"://"$SDNC_USER":"$SDNC_PWD"@"$SDNC_APP_NAME.KUBE_SDNC_NAMESPACE":"$1$SDNC_API_URL
+               SDNC_SERVICE_API_PATH=$1"://"$SDNC_USER":"$SDNC_PWD"@"$SDNC_APP_NAME.$KUBE_SDNC_NAMESPACE":"$3$SDNC_API_URL
        fi
        echo ""
 
@@ -380,16 +380,13 @@ __do_curl_to_controller() {
 controller_api_get_A1_policy_ids() {
        __log_test_start $@
 
-       ric_id=$3
-       if [ $RUNMODE == "KUBE" ]; then
-               ric_id=$(get_kube_sim_host $3)
-       fi
+       ric_id=$(__find_sim_host $3)
     paramError=1
     if [ $# -gt 3 ] && [ $2 == "OSC" ]; then
-        url="$RIC_SIM_HTTPX://$ric_id:$RIC_SIM_PORT/a1-p/policytypes/$4/policies"
+        url="$ric_id/a1-p/policytypes/$4/policies"
                paramError=0
     elif [ $# -gt 2 ] && [ $2 == "STD" ]; then
-        url="$RIC_SIM_HTTPX://$ric_id:$RIC_SIM_PORT/A1-P/v1/policies"
+        url="$ric_id/A1-P/v1/policies"
         paramError=0
        fi
 
@@ -446,13 +443,10 @@ controller_api_get_A1_policy_ids() {
 controller_api_get_A1_policy_type() {
        __log_test_start $@
 
-       ric_id=$3
-       if [ $RUNMODE == "KUBE" ]; then
-               ric_id=$(get_kube_sim_host $3)
-       fi
+       ric_id=$(__find_sim_host $3)
     paramError=1
     if [ $# -gt 3 ] && [ $2 == "OSC" ]; then
-        url="$RIC_SIM_HTTPX://$ric_id:$RIC_SIM_PORT/a1-p/policytypes/$4"
+        url="$ric_id/a1-p/policytypes/$4"
                paramError=0
        fi
 
@@ -500,16 +494,13 @@ controller_api_get_A1_policy_type() {
 controller_api_delete_A1_policy() {
        __log_test_start $@
 
-       ric_id=$3
-       if [ $RUNMODE == "KUBE" ]; then
-               ric_id=$(get_kube_sim_host $3)
-       fi
+       ric_id=$(__find_sim_host $3)
     paramError=1
     if [ $# -eq 5 ] && [ $2 == "OSC" ]; then
-        url="$RIC_SIM_HTTPX://$ric_id:$RIC_SIM_PORT/a1-p/policytypes/$4/policies/$UUID$5"
+        url="$ric_id/a1-p/policytypes/$4/policies/$UUID$5"
                paramError=0
     elif [ $# -eq 4 ] && [ $2 == "STD" ]; then
-        url="$RIC_SIM_HTTPX://$ric_id:$RIC_SIM_PORT/A1-P/v1/policies/$UUID$4"
+        url="$ric_id/A1-P/v1/policies/$UUID$4"
         paramError=0
        fi
 
@@ -542,18 +533,15 @@ controller_api_delete_A1_policy() {
 controller_api_put_A1_policy() {
        __log_test_start $@
 
-       ric_id=$3
-       if [ $RUNMODE == "KUBE" ]; then
-               ric_id=$(get_kube_sim_host $3)
-       fi
+       ric_id=$(__find_sim_host $3)
     paramError=1
     if [ $# -eq 6 ] && [ $2 == "OSC" ]; then
-        url="$RIC_SIM_HTTPX://$ric_id:$RIC_SIM_PORT/a1-p/policytypes/$4/policies/$UUID$5"
+        url="$ric_id/a1-p/policytypes/$4/policies/$UUID$5"
         body=$(sed 's/XXX/'${5}'/g' $6)
 
                paramError=0
     elif [ $# -eq 5 ] && [ $2 == "STD" ]; then
-        url="$RIC_SIM_HTTPX://$ric_id:$RIC_SIM_PORT/A1-P/v1/policies/$UUID$4"
+        url="$ric_id/A1-P/v1/policies/$UUID$4"
         body=$(sed 's/XXX/'${4}'/g' $5)
         paramError=0
        fi
@@ -588,14 +576,11 @@ controller_api_put_A1_policy() {
 controller_api_get_A1_policy_status() {
        __log_test_start $@
 
-       ric_id=$3
-       if [ $RUNMODE == "KUBE" ]; then
-               ric_id=$(get_kube_sim_host $3)
-       fi
+       ric_id=$(__find_sim_host $3)
     targetJson=""
     paramError=1
     if [ $# -ge 5 ] && [ $2 == "OSC" ]; then
-        url="$RIC_SIM_HTTPX://$ric_id:$RIC_SIM_PORT/a1-p/policytypes/$4/policies/$UUID$5/status"
+        url="$ric_id/a1-p/policytypes/$4/policies/$UUID$5/status"
         if [ $# -gt 5 ]; then
             targetJson="{\"instance_status\":\"$6\""
             targetJson=$targetJson",\"has_been_deleted\":\"$7\""
@@ -603,7 +588,7 @@ controller_api_get_A1_policy_status() {
         fi
                paramError=0
     elif [ $# -ge 4 ] && [ $2 == "STD" ]; then
-        url="$RIC_SIM_HTTPX://$ric_id:$RIC_SIM_PORT/A1-P/v1/policies/$UUID$4/status"
+        url="$ric_id/A1-P/v1/policies/$UUID$4/status"
         if [ $# -gt 4 ]; then
             targetJson="{\"enforceStatus\":\"$5\""
             if [ $# -eq 6 ]; then
diff --git a/test/common/test_env-oran-e-release.sh b/test/common/test_env-oran-e-release.sh
index 1003ddd..044ad24 100755 (executable)
@@ -197,11 +197,16 @@ KUBE_PROXY_IMAGE_BASE="nodejs-kube-proxy"
 KUBE_PROXY_IMAGE_TAG_LOCAL="latest"
 #No remote image for kube proxy, local image always used
 
-#Kube proxy remote image and tag
+#PVC Cleaner remote image and tag
 PVC_CLEANER_IMAGE_BASE="ubuntu"
 PVC_CLEANER_IMAGE_TAG_REMOTE_PROXY="20.10"
 #No local image for pvc cleaner, remote image always used
 
+#Kafka Procon image and tag
+KAFKAPC_IMAGE_BASE="kafka-procon"
+KAFKAPC_IMAGE_TAG_LOCAL="latest"
+#No remote image for kafka procon, local image always used
+
 # List of app short names produced by the project
 PROJECT_IMAGES_APP_NAMES="PA ICS CP RC RICSIM NGW DMAAPADP DMAAPMED"  # Add SDNC here if oran image is used
 
@@ -524,6 +529,15 @@ DMAAP_MED_COMPOSE_DIR="dmaapmed"                         # Dir in simulator_grou
 DMAAP_MED_DATA_MOUNT_PATH="/configs"                     # Path in container for data file
 DMAAP_MED_DATA_FILE="type_config.json"                   # Container data file name
 
+KAFKAPC_APP_NAME="kafka-procon"                          # Name for the Kafka procon
+KAFKAPC_DISPLAY_NAME="Kafka Producer/Consumer"
+KAFKAPC_EXTERNAL_PORT=8096                               # Kafka procon container external port (host -> container)
+KAFKAPC_INTERNAL_PORT=8090                               # Kafka procon container internal port (container -> container)
+KAFKAPC_EXTERNAL_SECURE_PORT=8097                        # Kafka procon container external secure port (host -> container)
+KAFKAPC_INTERNAL_SECURE_PORT=8091                        # Kafka procon container internal secure port (container -> container)
+KAFKAPC_ALIVE_URL="/"                                    # Base path for alive check
+KAFKAPC_COMPOSE_DIR="kafka-procon"                       # Dir in simulator_group for docker-compose
+KAFKAPC_BUILD_DIR="kafka-procon"                         # Build dir
 ########################################
 # Setting for common curl-base function
 ########################################
diff --git a/test/common/testcase_common.sh b/test/common/testcase_common.sh
index 80059a1..d8b359b 100755 (executable)
@@ -779,7 +779,11 @@ echo -e $BOLD"Auto adding included apps"$EBOLD
        for iapp in $INCLUDED_IMAGES; do
                file_pointer=$(echo $iapp | tr '[:upper:]' '[:lower:]')
                file_pointer="../common/"$file_pointer"_api_functions.sh"
-               echo " Auto-adding included app $iapp.  Sourcing $file_pointer"
+               padded_iapp=$iapp
+               while [ ${#padded_iapp} -lt 16 ]; do
+                       padded_iapp=$padded_iapp" "
+               done
+               echo " Auto-adding included app $padded_iapp  Sourcing $file_pointer"
                . $file_pointer
                if [ ! -f "$file_pointer" ]; then
                        echo " Include file $file_pointer for app $iapp does not exist"
diff --git a/test/common/testengine_config.sh b/test/common/testengine_config.sh
index 24a3908..1048d76 100644 (file)
@@ -18,7 +18,7 @@
 #
 
 # List of short names for all supported apps, including simulators etc
-APP_SHORT_NAMES="PA ICS SDNC CP NGW RC RICSIM HTTPPROXY CBS CONSUL DMAAPMR MR CR PRODSTUB KUBEPROXY DMAAPMED DMAAPADP PVCCLEANER"
+APP_SHORT_NAMES="PA ICS SDNC CP NGW RC RICSIM HTTPPROXY CBS CONSUL DMAAPMR MR CR PRODSTUB KUBEPROXY DMAAPMED DMAAPADP PVCCLEANER KAFKAPC"
 
 # List of available apps that are built and released by the project
 PROJECT_IMAGES="PA ICS SDNC CP NGW RICSIM RC DMAAPMED DMAAPADP"
@@ -27,7 +27,7 @@ PROJECT_IMAGES="PA ICS SDNC CP NGW RICSIM RC DMAAPMED DMAAPADP"
 AVAILABLE_IMAGES_OVERRIDE="PA ICS SDNC CP NGW RICSIM RC DMAAPMED DMAAPADP"
 
 # List of available apps where the image is built by the test environment
-LOCAL_IMAGE_BUILD="MR CR PRODSTUB KUBEPROXY HTTPPROXY"
+LOCAL_IMAGE_BUILD="MR CR PRODSTUB KUBEPROXY HTTPPROXY KAFKAPC"
 
 # List of system app used only by the test env - kubernetes
 TESTENV_KUBE_SYSTEM_APPS="PVCCLEANER"
diff --git a/test/kafka-procon/.gitignore b/test/kafka-procon/.gitignore
new file mode 100644 (file)
index 0000000..6703e3c
--- /dev/null
@@ -0,0 +1,4 @@
+.tmp.json
+.dockererr
+.env
+.payload
diff --git a/test/kafka-procon/Dockerfile b/test/kafka-procon/Dockerfile
new file mode 100644 (file)
index 0000000..97a09cb
--- /dev/null
@@ -0,0 +1,43 @@
+#==================================================================================
+#   Copyright (C) 2021: Nordix Foundation
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+#   This source code is part of the near-RT RIC (RAN Intelligent Controller)
+#   platform project (RICP).
+#==================================================================================
+
+ARG NEXUS_PROXY_REPO
+
+##
+## Build
+##
+
+FROM ${NEXUS_PROXY_REPO}golang:1.17-bullseye AS build
+WORKDIR /app
+COPY go.mod .
+COPY go.sum .
+RUN go mod download
+COPY main.go .
+RUN go build -o /kafkaprocon
+
+##
+## Deploy
+##
+
+FROM gcr.io/distroless/base-debian11
+WORKDIR /
+## Copy from "build" stage
+COPY --from=build /kafkaprocon .
+USER nonroot:nonroot
+ENTRYPOINT ["/kafkaprocon"]
\ No newline at end of file
diff --git a/test/kafka-procon/basic_test.sh b/test/kafka-procon/basic_test.sh
new file mode 100755 (executable)
index 0000000..f3a602e
--- /dev/null
@@ -0,0 +1,632 @@
+#!/bin/bash
+
+#  ============LICENSE_START===============================================
+#  Copyright (C) 2020 Nordix Foundation. All rights reserved.
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+#
+
+# Automated test script for Kafka procon container
+
+# NOTE: Need a running instance of kafka
+
+
+export PORT=8096
+export HTTPX="http"
+export REQ_CONTENT=""
+export RESP_CONTENT="text/plain"
+
+# source function to do curl and check result
+. ../common/do_curl_function.sh
+
+echo "Requires a running kafka"
+
+payload=".payload"
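+
+# do_curl convention, as used below: REQ_CONTENT/RESP_CONTENT hold the request and
+# expected response content types, RESULT holds the expected response body, where
+# "*" accepts any body and "json:<data>" compares the body as json. Calls look like:
+#   do_curl <method> <path> <expected-http-status> [<payload-file>]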
+
+echo "=== hello world ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="OK"
+do_curl GET / 200
+
+echo "=== reset ==="
+REQ_CONTENT=""
+RESP_CONTENT=""
+RESULT="*"
+do_curl POST /reset 200
+
+echo "=== get topics ==="
+REQ_CONTENT=""
+RESP_CONTENT="application/json"
+RESULT="json:[]"
+do_curl GET /topics 200
+
+echo "=== get global counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /counters/sent 200
+
+echo "=== get global counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /counters/received 200
+
+echo "=== get topic ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl GET /topics/test-topic 404
+
+echo "=== get topic counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl GET /topics/test-topic/counters/sent 404
+
+echo "=== get topic counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl GET /topics/test-topic/counters/received 404
+
+echo "=== create a topic ==="
+REQ_CONTENT=""
+RESP_CONTENT=""
+RESULT="*"
+do_curl PUT /topics/test-topic 405
+
+echo "=== start to send on a topic ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/test-topic/startsend 404
+
+echo "=== start to receive from a topic ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/test-topic/startreceive 404
+
+echo "=== send a msg on a topic ==="
+echo "TEST1" > $payload
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/test-topic/msg 404 $payload
+
+echo "=== receive a msg from a topic ==="
+echo "TEST1" > $payload
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl GET /topics/test-topic/msg 404 $payload
+
+echo "=== stop to send on a topic ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/test-topic/stopsend 404
+
+echo "=== stop to receive from a topic ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/test-topic/stopreceive 404
+
+# Create 4 topics
+
+echo "=== create topic1 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl PUT /topics/topic1?type=text/plain 201
+
+echo "=== get topics ==="
+REQ_CONTENT=""
+RESP_CONTENT="application/json"
+RESULT="json:[\"topic1\"]"
+do_curl GET /topics 200
+
+echo "=== create topic2 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl PUT /topics/topic2?type=text/plain 201
+
+echo "=== get topics ==="
+REQ_CONTENT=""
+RESP_CONTENT="application/json"
+RESULT="json:[\"topic1\",\"topic2\"]"
+do_curl GET /topics 200
+
+echo "=== create topic3 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl PUT /topics/topic3?type=application/json 201
+
+echo "=== get topics ==="
+REQ_CONTENT=""
+RESP_CONTENT="application/json"
+RESULT="json:[\"topic1\",\"topic2\",\"topic3\"]"
+do_curl GET /topics 200
+
+echo "=== create topic4 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl PUT /topics/topic4?type=application/json 201
+
+echo "=== get topics ==="
+REQ_CONTENT=""
+RESP_CONTENT="application/json"
+RESULT="json:[\"topic1\",\"topic2\",\"topic3\",\"topic4\"]"
+do_curl GET /topics 200
+
+echo "=== get topic1 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="text/plain"
+do_curl GET /topics/topic1 200
+
+echo "=== get topic2 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="text/plain"
+do_curl GET /topics/topic2 200
+
+echo "=== get topic3 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="application/json"
+do_curl GET /topics/topic3 200
+
+echo "=== get topic4 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="application/json"
+do_curl GET /topics/topic4 200
+
+echo "=== send a msg on topic1 ==="
+echo "TEST11" > $payload
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic1/msg 400  $payload
+
+echo "=== receive a msg from topic1 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl GET /topics/topic1/msg 400
+
+echo "=== send a msg on topic2 ==="
+echo "TEST22" > $payload
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic2/msg 400 $payload
+
+echo "=== receive a msg from topic2 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl GET /topics/topic2/msg 400
+
+
+
+echo "=== send a msg on topic3 ==="
+echo "{\"test\":\"33\"}" > $payload
+REQ_CONTENT="application/json"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic3/msg 400 $payload
+
+echo "=== receive a msg from topic3 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl GET /topics/topic3/msg 400
+
+echo "=== send a msg on topic4 ==="
+echo "{\"test\":\"44\"}" > $payload
+REQ_CONTENT="application/json"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic4/msg 400 $payload
+
+echo "=== receive a msg from topic4 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl GET /topics/topic4/msg 400
+
+
+echo "=== get global counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /counters/sent 200
+
+echo "=== get global counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /counters/received 200
+
+echo "=== get topic1 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /topics/topic1/counters/sent 200
+
+echo "=== get topic1 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /topics/topic1/counters/received 200
+
+echo "=== get topic2 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /topics/topic2/counters/sent 200
+
+echo "=== get topic2 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /topics/topic2/counters/received 200
+
+echo "=== get topic3 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /topics/topic3/counters/sent 200
+
+echo "=== get topic3 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /topics/topic3/counters/received 200
+
+echo "=== get topic4 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /topics/topic4/counters/sent 200
+
+echo "=== get topic4 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /topics/topic4/counters/received 200
+
+# Begins send and receive
+
+echo "=== set topic1 start sending ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic1/startsend 200
+
+echo "=== send a msg on topic1 with wrong content type ==="
+echo "TEST11" > $payload
+REQ_CONTENT="application/json"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic1/msg 400  $payload
+
+echo "=== send a msg on topic1 ==="
+echo "TEST11" > $payload
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic1/msg 200  $payload
+
+echo "sleep 2 to allow sending the msg to kafka"
+sleep 2
+
+echo "=== receive a msg from topic1 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl GET /topics/topic1/msg 400
+
+echo "=== get topic1 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="1"
+do_curl GET /topics/topic1/counters/sent 200
+
+echo "=== get topic1 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /topics/topic1/counters/received 200
+
+echo "=== set topic1 start receiving ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic1/startreceive 200
+
+echo "sleep 60 to allow kafka to process the msg, unclear why first message takes a long time..."
+sleep 60
+
+echo "=== get topic1 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="1"
+do_curl GET /topics/topic1/counters/sent 200
+
+echo "=== get topic1 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="1"
+do_curl GET /topics/topic1/counters/received 200
+
+echo "=== get global counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="1"
+do_curl GET /counters/sent 200
+
+echo "=== get global counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="1"
+do_curl GET /counters/received 200
+
+echo "=== receive a msg from topic1 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="TEST11"
+do_curl GET /topics/topic1/msg 200
+
+echo "=== attempt to receive a msg from topic1 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT=""
+RESULT="*"
+do_curl GET /topics/topic1/msg 204
+
+
+echo "=== set topic1 start sending ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic1/startsend 200
+
+echo "=== set topic2 start sending ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic2/startsend 200
+
+echo "=== set topic3 start sending ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic3/startsend 200
+
+echo "=== set topic4 start sending ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic4/startsend 200
+
+echo "=== set topic1 start receiving ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic1/startreceive 200
+
+echo "=== set topic2 start receiving ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic2/startreceive 200
+
+echo "=== set topic3 start receiving ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic3/startreceive 200
+
+echo "=== set topic4 start receiving ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic4/startreceive 200
+
+
+# Send and receive on all topics
+
+echo "=== send a msg on topic1 ==="
+echo "TEST101" > $payload
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic1/msg 200  $payload
+
+echo "=== send two msgs on topic2 ==="
+echo "TEST201" > $payload
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic2/msg 200  $payload
+echo "TEST202" > $payload
+do_curl POST /topics/topic2/msg 200  $payload
+
+echo "=== send three msgs on topic3 ==="
+echo "{\"a\":\"msg301\"}" > $payload
+REQ_CONTENT="application/json"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic3/msg 200  $payload
+echo "{\"a\":\"msg302\"}" > $payload
+do_curl POST /topics/topic3/msg 200  $payload
+echo "{\"a\":\"msg303\"}" > $payload
+do_curl POST /topics/topic3/msg 200  $payload
+
+
+echo "=== send four msgs on topic4 ==="
+echo "{\"a\":\"msg401\"}" > $payload
+REQ_CONTENT="application/json"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic4/msg 200  $payload
+echo "{\"a\":\"msg402\"}" > $payload
+do_curl POST /topics/topic4/msg 200  $payload
+echo "{\"a\":\"msg403\"}" > $payload
+do_curl POST /topics/topic4/msg 200  $payload
+echo "{\"a\":\"msg404\"}" > $payload
+do_curl POST /topics/topic4/msg 200  $payload
+
+echo "sleep 10 to allow kafka to process msg"
+sleep 10
+
+echo "=== get global counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="11"
+do_curl GET /counters/sent 200
+
+echo "=== get global counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="11"
+do_curl GET /counters/received 200
+
+
+echo "=== get topic1 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="2"
+do_curl GET /topics/topic1/counters/sent 200
+
+echo "=== get topic1 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="2"
+do_curl GET /topics/topic1/counters/received 200
+
+
+echo "=== get topic2 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="2"
+do_curl GET /topics/topic2/counters/sent 200
+
+echo "=== get topic2 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="2"
+do_curl GET /topics/topic2/counters/received 200
+
+
+echo "=== get topic3 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="3"
+do_curl GET /topics/topic3/counters/sent 200
+
+echo "=== get topic3 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="3"
+do_curl GET /topics/topic3/counters/received 200
+
+
+echo "=== get topic4 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="4"
+do_curl GET /topics/topic4/counters/sent 200
+
+echo "=== get topic4 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="4"
+do_curl GET /topics/topic4/counters/received 200
+
+
+echo "=== get a msg from topic1 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="TEST101"
+do_curl GET /topics/topic1/msg 200
+
+
+echo "=== attempt to receive a msg from topic1 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT=""
+RESULT="*"
+do_curl GET /topics/topic1/msg 204
+
+echo "=== get two msgs from topic2 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="TEST201"
+do_curl GET /topics/topic2/msg 200
+RESULT="TEST202"
+do_curl GET /topics/topic2/msg 200
+
+
+echo "=== attempt to receive a msg from topic2 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT=""
+RESULT="*"
+do_curl GET /topics/topic2/msg 204
+
+echo "=== get three msgs from topic3 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT="application/json"
+RESULT="json:{\"a\":\"msg301\"}"
+do_curl GET /topics/topic3/msg 200
+RESULT="json:{\"a\":\"msg302\"}"
+do_curl GET /topics/topic3/msg 200
+RESULT="json:{\"a\":\"msg303\"}"
+do_curl GET /topics/topic3/msg 200
+
+echo "=== attempt to receive a msg from topic3 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT=""
+RESULT="*"
+do_curl GET /topics/topic3/msg 204
+
+echo "=== get four msgs from topic4 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT="application/json"
+RESULT="json:{\"a\":\"msg401\"}"
+do_curl GET /topics/topic4/msg 200
+RESULT="json:{\"a\":\"msg402\"}"
+do_curl GET /topics/topic4/msg 200
+RESULT="json:{\"a\":\"msg403\"}"
+do_curl GET /topics/topic4/msg 200
+RESULT="json:{\"a\":\"msg404\"}"
+do_curl GET /topics/topic4/msg 200
+
+echo "=== attempt to receive a msg from topic4 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT=""
+RESULT="*"
+do_curl GET /topics/topic4/msg 204
+
+echo "********************"
+echo "*** All tests ok ***"
+echo "********************"
+
diff --git a/test/kafka-procon/build-and-start.sh b/test/kafka-procon/build-and-start.sh
new file mode 100755 (executable)
index 0000000..4e4a550
--- /dev/null
@@ -0,0 +1,40 @@
+#!/bin/bash
+
+#  ============LICENSE_START===============================================
+#  Copyright (C) 2021 Nordix Foundation. All rights reserved.
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+#
+
+echo "This script requires a running kafka instance in a docker private network"
+
+# Script to build and start the container
+if [ $# -ne 2 ]; then
+    echo "usage: ./build-and-start.sh <docker-network> <kafka-bootstrap-server-host>:<kafka-bootstrap-server-port>"
+    echo "example: ./build-and-start.sh nonrtric-docker-net message-router-kafka:9092"
+    exit 1
+fi
+IMAGE="kafka-procon:latest"
+#Build the image
+docker build -t $IMAGE .
+
+if [ $? -ne 0 ]; then
+    echo "Build failed, exiting..."
+    exit 1
+fi
+
+echo "Starting kafka-procon"
+#Run the container in interactive mode on port 8090.
+docker run --rm -it -p "8090:8090" --network $1 -e KAFKA_BOOTSTRAP_SERVER=$2 --name kafka-procon $IMAGE
+
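+# Example sanity check once the container is up, from another shell since the
+# container above runs in the foreground:
+#   curl localhost:8090/          # expect: OK
+#   curl localhost:8090/topics    # expect: []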
diff --git a/test/kafka-procon/go.mod b/test/kafka-procon/go.mod
new file mode 100644 (file)
index 0000000..31ccc7c
--- /dev/null
@@ -0,0 +1,9 @@
+module kafkaprocon
+
+go 1.17
+
+require (
+       github.com/confluentinc/confluent-kafka-go v1.7.0
+       github.com/enriquebris/goconcurrentqueue v0.6.0
+       github.com/gorilla/mux v1.8.0
+)
diff --git a/test/kafka-procon/go.sum b/test/kafka-procon/go.sum
new file mode 100644 (file)
index 0000000..34a6358
--- /dev/null
@@ -0,0 +1,6 @@
+github.com/confluentinc/confluent-kafka-go v1.7.0 h1:tXh3LWb2Ne0WiU3ng4h5qiGA9XV61rz46w60O+cq8bM=
+github.com/confluentinc/confluent-kafka-go v1.7.0/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
+github.com/enriquebris/goconcurrentqueue v0.6.0 h1:DJ97cgoPVoqlC4tTGBokn/omaB3o16yIs5QdAm6YEjc=
+github.com/enriquebris/goconcurrentqueue v0.6.0/go.mod h1:wGJhQNFI4wLNHleZLo5ehk1puj8M6OIl0tOjs3kwJus=
+github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
+github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
diff --git a/test/kafka-procon/main.go b/test/kafka-procon/main.go
new file mode 100644 (file)
index 0000000..6f8bad2
--- /dev/null
@@ -0,0 +1,595 @@
+// Kafka message producer/consumer interface stub.
+// Exposes a REST API (routes registered in main) to create kafka topics and to
+// send and receive messages on them, with counters for test verification.
+package main
+package main
+
+import (
+       "context"
+       "encoding/json"
+       "fmt"
+       "io/ioutil"
+       "net/http"
+       "os"
+       "strconv"
+       "sync/atomic"
+       "time"
+
+       "github.com/confluentinc/confluent-kafka-go/kafka"
+       "github.com/enriquebris/goconcurrentqueue"
+       "github.com/gorilla/mux"
+)
+
+// Note: consumer 'group' and consumer 'user' both use hardcoded values specific to this interface
+//    globalCounters      var holding the "global counters"
+//      received          number of received messages from all topics                             (int)
+//      sent              number of sent messages to all topics                                   (int)
+//    topics              var holding all topic related info
+//      <topic-name>      name of a topic (present after topic is created)
+//        content-type    data type of the topic                                                  (string)
+//        counters
+//          received      number of received messages from the topic                              (int)
+//          sent          number of sent messages to the topic                                    (int)
+//        messages
+//          send          messages waiting to be sent (set when sending is started)               (fifo)
+//          received      received messages waiting to be fetched (set when reception is started) (fifo)
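+//
+// Illustrative call flow (matching the routes registered in main), with a
+// hypothetical topic name 'mytopic':
+//   PUT  /topics/mytopic?type=text/plain    create the topic, also in kafka
+//   POST /topics/mytopic/startsend          start the producer loop for the topic
+//   POST /topics/mytopic/msg                queue one message for sending
+//   POST /topics/mytopic/startreceive       start the consumer loop for the topic
+//   GET  /topics/mytopic/msg                fetch one received message, if any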
+
+type counter struct {
+       c uint64
+}
+
+func (c *counter) step() {
+       atomic.AddUint64(&c.c, 1)
+}
+
+func (c *counter) get() uint64 {
+       return atomic.LoadUint64(&c.c)
+}
+
+type counters struct {
+       received counter
+       sent     counter
+}
+
+func newCounters() counters {
+       return counters{
+               received: counter{},
+               sent:     counter{},
+       }
+}
+
+type messages struct {
+       send     *goconcurrentqueue.FIFO
+       received *goconcurrentqueue.FIFO
+}
+
+func (m *messages) startSend() bool {
+       if m.send == nil {
+               m.send = goconcurrentqueue.NewFIFO()
+               return true
+       }
+       return false
+}
+
+func (m *messages) stopSend() {
+       m.send = nil
+}
+
+func (m *messages) addToSend(msg string) error {
+       if m.send == nil {
+               return fmt.Errorf("sending not started")
+       }
+       m.send.Lock()
+       defer m.send.Unlock()
+       return m.send.Enqueue(msg)
+}
+
+func (m *messages) getToSend() (interface{}, error) {
+       if m.send == nil {
+               return "", fmt.Errorf("sending not started")
+       }
+       m.send.Lock()
+       defer m.send.Unlock()
+       return m.send.Dequeue()
+}
+
+func (m *messages) startReceive() bool {
+       if m.received == nil {
+               m.received = goconcurrentqueue.NewFIFO()
+               return true
+       }
+       return false
+}
+
+func (m *messages) stopReceive() {
+       m.received = nil
+}
+
+type topic struct {
+       contentType string
+       counters    counters
+       messages    messages
+}
+
+func newTopic(ct string) *topic {
+       return &topic{
+               contentType: ct,
+               counters:    counters{},
+               messages:    messages{},
+       }
+}
+
+var globalCounters counters
+var topics map[string]*topic = make(map[string]*topic)
+
+var bootstrapserver = ""
+
+func initApp() {
+       bootstrapserver = os.Getenv("KAFKA_BOOTSTRAP_SERVER")
+       if len(bootstrapserver) == 0 {
+               fmt.Println("Fatal error: env var KAFKA_BOOTSTRAP_SERVER not set")
+               fmt.Println("Exiting... ")
+               os.Exit(1)
+       }
+       fmt.Println("Using KAFKA_BOOTSTRAP_SERVER=" + bootstrapserver)
+}
+
+//Helper function to get a created topic, if it exists
+func getTopicFromRequest(w http.ResponseWriter, req *http.Request) (*topic, string, bool) {
+       topicId := mux.Vars(req)["topic"]
+       t, exist := topics[topicId]
+       if !exist {
+               w.WriteHeader(http.StatusNotFound)
+               fmt.Fprintf(w, "Topic %v does not exist", topicId)
+               return nil, "", false
+       }
+       return t, topicId, true
+}
+
+// Alive check
+// GET on /
+func healthCheck(w http.ResponseWriter, req *http.Request) {
+       fmt.Fprintf(w, "OK")
+}
+
+// Deep reset of this interface stub - no removal of msgs or topics in kafka
+// POST on /reset
+func allreset(w http.ResponseWriter, req *http.Request) {
+       for _, v := range topics {
+               v.messages.stopSend()
+               v.messages.stopReceive()
+       }
+       time.Sleep(5 * time.Second) //Allow producers/consumers to shut down
+       globalCounters = newCounters()
+       topics = make(map[string]*topic)
+       fmt.Fprintf(w, "OK")
+}
+
+// Get topics, return json array of strings of topics created by this interface stub
+// Returns json array of strings, array is empty if no topics exist
+// GET on /topics
+
+func getTopics(w http.ResponseWriter, req *http.Request) {
+       topicKeys := make([]string, 0, len(topics))
+       fmt.Printf("len topics: %v\n", len(topics))
+       for k := range topics {
+               topicKeys = append(topicKeys, k)
+       }
+       var j, err = json.Marshal(topicKeys)
+       if err != nil {
+               w.WriteHeader(http.StatusInternalServerError)
+               fmt.Fprintf(w, "Cannot convert list of topics to json, error details: %v", err)
+               return
+       } else {
+               w.Header().Set("Content-Type", "application/json")
+               w.WriteHeader(http.StatusOK)
+               w.Write(j)
+       }
+}
+
+func writeOkResponse(w http.ResponseWriter, httpStatus int, msg string) {
+       // The content type header must be set before the status is written
+       w.Header().Set("Content-Type", "text/plain")
+       w.WriteHeader(httpStatus)
+       fmt.Fprint(w, msg)
+}
+
+// Get a counter value
+// GET /topics/counters/{counter}
+func getCounter(w http.ResponseWriter, req *http.Request) {
+       ctr := mux.Vars(req)["counter"]
+       var ctrvalue = -1
+       if ctr == "received" {
+               ctrvalue = int(globalCounters.received.get())
+       } else if ctr == "sent" {
+               ctrvalue = int(globalCounters.sent.get())
+       }
+
+       if ctrvalue == -1 {
+               w.WriteHeader(http.StatusBadRequest)
+               fmt.Fprintf(w, "Counter %v does not exist", ctr)
+               return
+       }
+       writeOkResponse(w, http.StatusOK, strconv.Itoa(ctrvalue))
+}
+
+// Create a topic
+// PUT on /topics/<topic>?type=<type>    type shall be 'application/json' or 'text/plain'
+func createTopic(w http.ResponseWriter, req *http.Request) {
+       topicId := mux.Vars(req)["topic"]
+       topicType := mux.Vars(req)["type"]
+
+       fmt.Printf("Creating topic: %v, content type: %v\n", topicId, topicType)
+
+       if len(topicType) == 0 {
+               w.WriteHeader(http.StatusBadRequest)
+               fmt.Fprintf(w, "Type not specified")
+               return
+       }
+
+       tid, exist := topics[topicId]
+       if exist {
+               if tid.contentType != topicType {
+                       w.WriteHeader(http.StatusBadRequest)
+                       fmt.Fprintf(w, "Topic exists but with a different content type, queue content type: %v, requested content type: %v", tid.contentType, topicType)
+                       return
+               }
+               writeOkResponse(w, http.StatusOK, "Topic exists")
+               return
+       }
+
+       t := newTopic(topicType)
+
+       a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapserver})
+       if err != nil {
+               w.WriteHeader(http.StatusInternalServerError)
+               fmt.Fprintf(w, "Cannot create client to bootstrap server: %v, error details: %v", bootstrapserver, err)
+               return
+       }
+       defer a.Close()
+
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+
+       maxDur := 10 * time.Second
+
+       _, err = a.CreateTopics(
+               ctx,
+               []kafka.TopicSpecification{{
+                       Topic:             topicId,
+                       NumPartitions:     1,
+                       ReplicationFactor: 1}},
+               kafka.SetAdminOperationTimeout(maxDur))
+
+       if err != nil {
+               w.WriteHeader(http.StatusInternalServerError)
+               fmt.Fprintf(w, "Failed to create topic: %v, error details: %v", topicId, err)
+               return
+       }
+       topics[topicId] = t
+       w.WriteHeader(http.StatusCreated)
+       fmt.Fprintf(w, "Topic created")
+}
+
+// Get topic type
+// GET on /topics/<topic>
+func getTopic(w http.ResponseWriter, req *http.Request) {
+       t, _, exist := getTopicFromRequest(w, req)
+       if !exist {
+               return
+       }
+       w.WriteHeader(http.StatusOK)
+       fmt.Fprint(w, t.contentType)
+}
+
+// Get a topics counter value
+// GET /topics/{topic}/counters/{counter}
+func getTopicCounter(w http.ResponseWriter, req *http.Request) {
+       t, _, exist := getTopicFromRequest(w, req)
+       if !exist {
+               return
+       }
+       ctr := mux.Vars(req)["counter"]
+
+       var ctrvalue = -1
+       if ctr == "received" {
+               ctrvalue = int(t.counters.received.get())
+       } else if ctr == "sent" {
+               ctrvalue = int(t.counters.sent.get())
+       }
+
+       if ctrvalue == -1 {
+               w.WriteHeader(http.StatusBadRequest)
+               fmt.Fprintf(w, "Counter %v does not exist", ctr)
+               return
+       }
+       w.WriteHeader(http.StatusOK)
+       fmt.Fprint(w, strconv.Itoa(ctrvalue))
+}
+
+func startToSend(w http.ResponseWriter, req *http.Request) {
+       t, topicId, exist := getTopicFromRequest(w, req)
+       if !exist {
+               return
+       }
+
+       if !t.messages.startSend() {
+               w.WriteHeader(http.StatusOK)
+               fmt.Fprintf(w, "Already started sending")
+               return
+       }
+       go func() {
+               p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": bootstrapserver})
+               if err != nil {
+                       fmt.Printf("Cannot create producer for topic: %v, error details: %v\n", topicId, err)
+                       return
+               }
+               defer func() { p.Close() }()
+               for {
+                       q := t.messages.send
+                       if q == nil {
+                               return
+                       }
+                       m, err := q.Get(0)
+                       if err == nil {
+                               err = p.Produce(&kafka.Message{
+                                       TopicPartition: kafka.TopicPartition{Topic: &topicId, Partition: kafka.PartitionAny},
+                                       Value:          []byte(fmt.Sprintf("%v", m)),
+                               }, nil)
+                               if err == nil {
+                                       q.Remove(0)
+                                       t.counters.sent.step()
+                                       globalCounters.sent.step()
+                                       msg := fmt.Sprintf("%v", m)
+                                       if len(msg) < 500 {
+                                               fmt.Printf("Message sent on topic: %v, len: %v, msg: %v\n", topicId, len(msg), msg)
+                                       } else {
+                                               fmt.Printf("Message sent on topic: %v, len: %v, is larger than 500...not printed\n", topicId, len(msg))
+                                       }
+                               } else {
+                                       fmt.Printf("Failed to send message on topic: %v. Discarded. Error details: %v\n", topicId, err)
+                                       q.Remove(0)
+                               }
+                       } else {
+                               time.Sleep(10 * time.Millisecond)
+                       }
+               }
+       }()
+
+       w.WriteHeader(http.StatusOK)
+       fmt.Fprintf(w, "Sending started")
+}
+
+func startToReceive(w http.ResponseWriter, req *http.Request) {
+       t, topicId, exist := getTopicFromRequest(w, req)
+       if !exist {
+               return
+       }
+
+       if !t.messages.startReceive() {
+               w.WriteHeader(http.StatusOK)
+               fmt.Fprintf(w, "Already started receiving")
+               return
+       }
+
+       go func() {
+
+               defer func() { t.messages.stopReceive() }()
+
+               groupId := "kafkaprocon"
+
+               c, err := kafka.NewConsumer(&kafka.ConfigMap{
+                       "bootstrap.servers":       bootstrapserver,
+                       "group.id":                groupId,
+                       "auto.offset.reset":       "earliest",
+                       "enable.auto.commit":      true,
+                       "auto.commit.interval.ms": 5000,
+               })
+               if err != nil {
+                       fmt.Printf("Cannot create consumer for topic: %v, error details: %v\n", topicId, err)
+                       t.messages.stopReceive()
+                       return
+               }
+               c.Commit()
+               defer func() { c.Close() }()
+               for {
+                       que := t.messages.received
+                       if que == nil {
+                               fmt.Printf("Cannot start receiving on topic: %v, queue does not exist\n", topicId)
+                               return
+                       }
+                       fmt.Printf("Start subscribing on topic: %v\n", topicId)
+                       err = c.SubscribeTopics([]string{topicId}, nil)
+                       if err != nil {
+                               fmt.Printf("Cannot start subscribing on topic: %v, error details: %v\n", topicId, err)
+                               return
+                       }
+                       maxDur := 1 * time.Second
+                       for {
+                               msg, err := c.ReadMessage(maxDur)
+                               if err == nil {
+                                       if len(msg.Value) < 500 {
+                                               fmt.Printf("Message received on topic: %v, partition: %v, len: %v, msg: %v\n", topicId, msg.TopicPartition, len(msg.Value), string(msg.Value))
+                                       } else {
+                                               fmt.Printf("Message received on topic: %v, partition: %v, len: %v is larger than 500...not printed\n", topicId, msg.TopicPartition, len(msg.Value))
+                                       }
+                                       err = t.messages.received.Enqueue(string(msg.Value))
+                                       if err != nil {
+                                               // Cannot write to the HTTP response here; the handler returned long ago. Log and stop.
+                                               fmt.Printf("Received message on topic: %v cannot be put in queue, %v\n", topicId, err)
+                                               return
+                                       }
+                                       t.counters.received.step()
+                                       globalCounters.received.step()
+                               } else {
+                                       fmt.Printf("Nothing to consume on topic: %v, reason: %v\n", topicId, err)
+                               }
+                       }
+               }
+       }()
+
+       w.WriteHeader(http.StatusOK)
+       fmt.Fprintf(w, "Receiving started")
+}
+
+// Post a message to a topic
+// POST /send    content type is specified in content type
+func sendToTopic(w http.ResponseWriter, req *http.Request) {
+       t, topicId, exist := getTopicFromRequest(w, req)
+       if !exist {
+               return
+       }
+       q := t.messages.send
+       if q == nil {
+               w.WriteHeader(http.StatusBadRequest)
+               fmt.Fprintf(w, "Sending not initiated on topic: %v", topicId)
+               return
+       }
+       ct := req.Header.Get("Content-Type")
+       if ct != t.contentType {
+               w.WriteHeader(http.StatusBadRequest)
+               fmt.Fprintf(w, "Message to send content type: %v on topic: %v does not match queue content type: %v", ct, topicId, t.contentType)
+               return
+       }
+
+       if ct == "application/json" {
+               if err := req.ParseForm(); err != nil {
+                       w.WriteHeader(http.StatusBadRequest)
+                       fmt.Fprintf(w, "Json payload cannot be decoded on topic: %v, error details %v", topicId, err)
+                       return
+               }
+               b, err := ioutil.ReadAll(req.Body)
+               if err == nil {
+                       if len(b) < 500 {
+                               fmt.Printf("Json payload to send on topic: %v, msg: %v\n", topicId, string(b))
+                       } else {
+                               fmt.Printf("Json payload to send on topic: %v larger than 500 bytes, not printed...\n", topicId)
+                       }
+               }
+               err = q.Enqueue(string(b))
+               if err != nil {
+                       w.WriteHeader(http.StatusInternalServerError)
+                       fmt.Fprintf(w, "Json message to send cannot be put in queue")
+                       return
+               }
+       } else if ct == "text/plain" {
+               if err := req.ParseForm(); err != nil {
+                       w.WriteHeader(http.StatusBadRequest)
+                       fmt.Fprintf(w, "Text payload to send on topic: %v cannot be decoded, error details %v\n", topicId, err)
+                       return
+               }
+               b, err := ioutil.ReadAll(req.Body)
+               if err == nil {
+                       if len(b) < 500 {
+                               fmt.Printf("Text payload to send on topic: %v, msg: %v\n", topicId, string(b))
+                       } else {
+                               fmt.Printf("Text payload to send on topic: %v larger than 500 bytes, not printed...\n", topicId)
+                       }
+               }
+               err = q.Enqueue(string(b))
+               if err != nil {
+                       w.WriteHeader(http.StatusInternalServerError)
+                       fmt.Fprintf(w, "Text message to send cannot be put in queue")
+                       return
+               }
+       } else {
+               w.WriteHeader(http.StatusBadRequest)
+               fmt.Fprintf(w, "Message to send, unknown content type %v", ct)
+               return
+       }
+
+       w.Header().Set("Content-Type", "text/plain")
+       w.WriteHeader(http.StatusOK)
+       fmt.Fprintf(w, "Message to send put in queue")
+}
+
+// Get zero or one message from a topic
+// GET /receive
+func receiveFromTopic(w http.ResponseWriter, req *http.Request) {
+       t, topicId, exist := getTopicFromRequest(w, req)
+       if !exist {
+               return
+       }
+       if t.messages.received == nil {
+               w.WriteHeader(http.StatusBadRequest)
+               fmt.Fprintf(w, "Receiving not initiated on topic %v", topicId)
+               return
+       }
+
+       m, err := t.messages.received.Dequeue()
+       if err != nil {
+               w.WriteHeader(http.StatusNoContent)
+               return
+       }
+
+       w.Header().Set("Content-Type", t.contentType)
+       w.WriteHeader(http.StatusOK)
+       fmt.Fprintf(w, "%v", m)
+}
+
+// Remove the send queue to stop sending
+func stopToSend(w http.ResponseWriter, req *http.Request) {
+       fmt.Printf("Stop sending\n")
+       t, _, exist := getTopicFromRequest(w, req)
+       if !exist {
+               return
+       }
+       t.messages.stopSend()
+       w.WriteHeader(http.StatusNoContent)
+}
+
+// Remove the receive queue to stop receiving
+func stopToReceive(w http.ResponseWriter, req *http.Request) {
+       fmt.Printf("Stop receiving\n")
+       t, _, exist := getTopicFromRequest(w, req)
+       if !exist {
+               return
+       }
+       t.messages.stopReceive()
+       w.WriteHeader(http.StatusNoContent)
+}
+
+func main() {
+
+       initApp()
+
+       r := mux.NewRouter()
+
+       r.HandleFunc("/", healthCheck).Methods("GET")
+       r.HandleFunc("/reset", allreset).Methods("POST")
+       r.HandleFunc("/counters/{counter}", getCounter).Methods("GET")
+       r.HandleFunc("/topics", getTopics).Methods("GET")
+       r.HandleFunc("/topics/{topic}/counters/{counter}", getTopicCounter).Methods("GET")
+       r.HandleFunc("/topics/{topic}", createTopic).Methods("PUT").Queries("type", "{type}")
+       r.HandleFunc("/topics/{topic}", getTopic).Methods("GET")
+       r.HandleFunc("/topics/{topic}/startsend", startToSend).Methods("POST")
+       r.HandleFunc("/topics/{topic}/startreceive", startToReceive).Methods("POST")
+       r.HandleFunc("/topics/{topic}/stopsend", stopToSend).Methods("POST")
+       r.HandleFunc("/topics/{topic}/stopreceive", stopToReceive).Methods("POST")
+       r.HandleFunc("/topics/{topic}/msg", sendToTopic).Methods("POST")
+       r.HandleFunc("/topics/{topic}/msg", receiveFromTopic).Methods("GET")
+
+       port := "8090"
+       srv := &http.Server{
+               Handler:      r,
+               Addr:         ":" + port,
+               WriteTimeout: 15 * time.Second,
+               ReadTimeout:  15 * time.Second,
+       }
+       fmt.Println("Running on port: " + port)
+       fmt.Println(srv.ListenAndServe().Error())
+}
diff --git a/test/kafka-procon/start_local.sh b/test/kafka-procon/start_local.sh
new file mode 100755 (executable)
index 0000000..bfc1a1b
--- /dev/null
@@ -0,0 +1,32 @@
+#!/bin/bash
+
+#  ============LICENSE_START===============================================
+#  Copyright (C) 2021 Nordix Foundation. All rights reserved.
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+#
+
+echo "This script requires golang to be installed and a running kafka instance on (or available to) localhost"
+
+# Script to build and start app locally
+if [ $# -ne 1 ]; then
+    echo "usage: ./start_local.sh <kafka-bootstrap-server-port>"
+    echo "example: ./start_local.sh 30098"
+    exit 1
+fi
+
+export KAFKA_BOOTSTRAP_SERVER=localhost:$1
+
+echo "Starting kafka-procon on local machine"
+go run main.go
index 9547dea..ceb74c7 100644 (file)
@@ -27,9 +27,9 @@ spec:
         - name: https
           containerPort: $ICS_INTERNAL_SECURE_PORT
         volumeMounts:
-        - mountPath: $ICS_CONFIG_MOUNT_PATH
+        - mountPath: $ICS_CONFIG_MOUNT_PATH/$ICS_CONFIG_FILE
+          subPath: $ICS_CONFIG_FILE
           name: ics-conf-name
-        volumeMounts:
         - mountPath: $ICS_CONTAINER_MNT_DIR
           name: ics-data-name
       volumes:
index ee5a48d..3f15aff 100644 (file)
@@ -1,5 +1,5 @@
 ################################################################################
-#   Copyright (c) 2020 Nordix Foundation.                                      #
+#   Copyright (c) 2021 Nordix Foundation.                                      #
 #                                                                              #
 #   Licensed under the Apache License, Version 2.0 (the "License");            #
 #   you may not use this file except in compliance with the License.           #
@@ -13,7 +13,6 @@
 #   See the License for the specific language governing permissions and        #
 #   limitations under the License.                                             #
 ################################################################################
-
 spring:
   profiles:
     active: prod
@@ -21,24 +20,30 @@ spring:
     allow-bean-definition-overriding: true
   aop:
     auto: false
+springdoc:
+  show-actuator: true
 management:
   endpoints:
     web:
       exposure:
+        # Enabling of springboot actuator features. See springboot documentation.
         include: "loggers,logfile,health,info,metrics,threaddump,heapdump"
 
 logging:
+  # Configuration of logging
   level:
     ROOT: ERROR
     org.springframework: ERROR
     org.springframework.data: ERROR
     org.springframework.web.reactive.function.client.ExchangeFunctions: ERROR
-    org.oransc.information: INFO
+    org.oransc.ics: INFO
   file:
-    name: /var/log/information-coordinator-service/application.log
+    name: $ICS_LOGPATH
 server:
-   port : 8434
-   http-port: 8083
+   # Configuration of the HTTP/REST server. The parameters are defined and handled by the springboot framework.
+   # See springboot documentation.
+   port : $ICS_INTERNAL_SECURE_PORT
+   http-port: $ICS_INTERNAL_PORT
    ssl:
       key-store-type: JKS
       key-store-password: policy_agent
@@ -46,11 +51,15 @@ server:
       key-password: policy_agent
       key-alias: policy_agent
 app:
-  filepath: /opt/app/information-coordinator-service/data/application_configuration.json
   webclient:
+    # Configuration of the trust store used for the HTTP client (outgoing requests)
+    # The file location and the password for the truststore are only relevant if trust-store-used == true
+    # Note that the same keystore as for the server is used.
     trust-store-used: false
     trust-store-password: policy_agent
     trust-store: /opt/app/information-coordinator-service/etc/cert/truststore.jks
+    # Configuration of usage of HTTP Proxy for the southbound accesses.
+    # The HTTP proxy (if configured) will only be used for accessing near-RT RICs
     http.proxy-host: $ICS_HTTP_PROXY_CONFIG_HOST_NAME
     http.proxy-port: $ICS_HTTP_PROXY_CONFIG_PORT
-  vardata-directory: /var/information-coordinator-service
\ No newline at end of file
+  vardata-directory: $ICS_CONTAINER_MNT_DIR
diff --git a/test/simulator-group/kafka-procon/.gitignore b/test/simulator-group/kafka-procon/.gitignore
new file mode 100644 (file)
index 0000000..7dc00c5
--- /dev/null
@@ -0,0 +1,3 @@
+.tmp.json
+.dockererr
+gen_docker-compose*
\ No newline at end of file
diff --git a/test/simulator-group/kafka-procon/app.yaml b/test/simulator-group/kafka-procon/app.yaml
new file mode 100644 (file)
index 0000000..fd2bc45
--- /dev/null
@@ -0,0 +1,33 @@
+apiVersion: apps/v1
+kind: StatefulSet
+metadata:
+  name: $KAFKAPC_APP_NAME
+  namespace: $KUBE_SIM_NAMESPACE
+  labels:
+    run: $KAFKAPC_APP_NAME
+    autotest: KAFKAPC
+spec:
+  replicas: 1
+  serviceName: $KAFKAPC_APP_NAME
+  selector:
+    matchLabels:
+      run: $KAFKAPC_APP_NAME
+  template:
+    metadata:
+      labels:
+        run: $KAFKAPC_APP_NAME
+        autotest: KAFKAPC
+    spec:
+      containers:
+      - name: $KAFKAPC_APP_NAME
+        image: $KAFKAPC_IMAGE
+        imagePullPolicy: $KUBE_IMAGE_PULL_POLICY
+        ports:
+        - name: http
+          containerPort: $KAFKAPC_INTERNAL_PORT
+        - name: https
+          containerPort: $KAFKAPC_INTERNAL_SECURE_PORT
+        env:
+        - name: KAFKA_BOOTSTRAP_SERVER
+          value: $MR_KAFKA_SERVICE_PATH
+
diff --git a/test/simulator-group/kafka-procon/docker-compose.yml b/test/simulator-group/kafka-procon/docker-compose.yml
new file mode 100644 (file)
index 0000000..bcb35d9
--- /dev/null
@@ -0,0 +1,36 @@
+#  ============LICENSE_START===============================================
+#  Copyright (C) 2020 Nordix Foundation. All rights reserved.
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+#
+
+version: '3.0'
+networks:
+  default:
+    external: true
+    name: ${DOCKER_SIM_NWNAME}
+services:
+  kafka-procon:
+    networks:
+      - default
+    image: ${KAFKAPC_IMAGE}
+    container_name: ${KAFKAPC_APP_NAME}
+    ports:
+      - ${KAFKAPC_EXTERNAL_PORT}:${KAFKAPC_INTERNAL_PORT}
+      - ${KAFKAPC_EXTERNAL_SECURE_PORT}:${KAFKAPC_INTERNAL_SECURE_PORT}
+    environment:
+      KAFKA_BOOTSTRAP_SERVER: $MR_KAFKA_SERVICE_PATH
+    labels:
+      - "nrttest_app=KAFKAPC"
+      - "nrttest_dp=${KAFKAPC_DISPLAY_NAME}"
diff --git a/test/simulator-group/kafka-procon/svc.yaml b/test/simulator-group/kafka-procon/svc.yaml
new file mode 100644 (file)
index 0000000..312e239
--- /dev/null
@@ -0,0 +1,21 @@
+apiVersion: v1
+kind: Service
+metadata:
+  name: $KAFKAPC_APP_NAME
+  namespace: $KUBE_SIM_NAMESPACE
+  labels:
+    run: $KAFKAPC_APP_NAME
+    autotest: KAFKAPC
+spec:
+  type: ClusterIP
+  ports:
+  - port: $KAFKAPC_EXTERNAL_PORT
+    targetPort: $KAFKAPC_INTERNAL_PORT
+    protocol: TCP
+    name: http
+  - port: $KAFKAPC_EXTERNAL_SECURE_PORT
+    targetPort: $KAFKAPC_INTERNAL_SECURE_PORT
+    protocol: TCP
+    name: https
+  selector:
+    run: $KAFKAPC_APP_NAME
\ No newline at end of file
index 269d9ca..3f1fcfe 100644 (file)
@@ -32,10 +32,8 @@ spec:
         volumeMounts:
         - mountPath: $POLICY_AGENT_CONFIG_MOUNT_PATH
           name: pa-conf-name
-#        volumeMounts:
         - mountPath: $POLICY_AGENT_CONTAINER_MNT_DIR
           name: pa-pv-data-name
-#        volumeMounts:
         - mountPath: $POLICY_AGENT_DATA_MOUNT_PATH
           name: pa-data-name
       volumes: