From 1ef05f94c2b4cc34f4b4b7e891b41548ad9bf446 Mon Sep 17 00:00:00 2001 From: "halil.cakal" Date: Tue, 21 Jun 2022 12:05:14 +0100 Subject: [PATCH] Kafka dispatcher module backbone Patch set 2: Creates a Python-adapter dispatching HTTP/HTTPS requests to a Kafka broker using kafka-python library. Patch set 3: Develops a consumer capable of deserializing JSON messages. Clean up redundant functions and parameters. Patch set 4: Develops a feature that publishes and consumes (to/from) the Kafka cluster in two-way synch. Patch set 5: Develops unit tests cases. Patch set 6: Develops a feature capable of handling requestid sent in headers for error testing. Patch set 7: Develops asynch error test cases. Patch set 8: Develops configurable look-up talbe that maps policy-type to kafka-topic. Supports multi topic including unit tests cases with error based. Patch set 9: Develops publish-consume kafka message for get-policy-status operation. Enriches unit test cases. Patch set 10: Validate open API yaml and fixes review. Patch set 11: Develops an actual kafka response processing including unit test-cases with error based. Patch set 12: Develops configurable response topic to be consumed from, apply changes to policytypetotopicmapping API. Adds unit test case. Patch set 13: Plugs in kafka dispatcher module to A1 Sim to realize e2e sequence. Patch set 14: Develops error testing script in which timeout value is being parametric. Patch set 15: Calculates poll cycle treshold and develops more validation on time out param. Issue-ID: NONRTRIC-757 Change-Id: I9bd257691d8a50e81492c55a5ec17a4409d9da6c Signed-off-by: halil.cakal --- near-rt-ric-simulator/src/STD_2.0.0/a1.py | 53 +++- .../test/KAFKA_DISPATCHER/Dockerfile | 49 +++ .../KAFKA_DISPATCHER/api/KAFKA_DISPATCHER_api.yaml | 241 +++++++++++++++ .../test/KAFKA_DISPATCHER/certificate/cert.crt | 24 ++ .../certificate/generate_cert_and_key.sh | 26 ++ .../test/KAFKA_DISPATCHER/certificate/key.crt | 30 ++ .../test/KAFKA_DISPATCHER/certificate/pass | 1 + .../test/KAFKA_DISPATCHER/nginx.conf | 93 ++++++ .../resources/policytype_to_topicmap.json | 14 + .../test/KAFKA_DISPATCHER/src/dispatcher.py | 335 +++++++++++++++++++++ .../test/KAFKA_DISPATCHER/src/main.py | 77 +++++ .../test/KAFKA_DISPATCHER/src/maincommon.py | 119 ++++++++ .../test/KAFKA_DISPATCHER/src/payload_logging.py | 60 ++++ .../test/KAFKA_DISPATCHER/src/start.sh | 31 ++ .../test/KAFKA_DISPATCHER/src/var_declaration.py | 26 ++ .../test/KAFKA_DISPATCHER_TEST/basic_test.sh | 126 ++++++++ .../basic_test_with_cust_header.sh | 139 +++++++++ .../test/KAFKA_DISPATCHER_TEST/build_and_start.sh | 45 +++ .../jsonfiles/ANR_to_topic_map.json | 4 + .../jsonfiles/alpha_policy.json | 11 + .../jsonfiles/beta_policy.json | 11 + .../jsonfiles/forced_response.json | 5 + .../jsonfiles/timeout_response.json | 5 + .../test/STD_2.0.0/build_and_start_with_kafka.sh | 96 ++++++ .../common/publish_response_event_to_kafka_bus.py | 87 ++++++ near-rt-ric-simulator/test/common/test_common.sh | 35 ++- 26 files changed, 1740 insertions(+), 3 deletions(-) mode change 100644 => 100755 near-rt-ric-simulator/src/STD_2.0.0/a1.py create mode 100644 near-rt-ric-simulator/test/KAFKA_DISPATCHER/Dockerfile create mode 100644 near-rt-ric-simulator/test/KAFKA_DISPATCHER/api/KAFKA_DISPATCHER_api.yaml create mode 100644 near-rt-ric-simulator/test/KAFKA_DISPATCHER/certificate/cert.crt create mode 100755 near-rt-ric-simulator/test/KAFKA_DISPATCHER/certificate/generate_cert_and_key.sh create mode 100644 near-rt-ric-simulator/test/KAFKA_DISPATCHER/certificate/key.crt create mode 100644 near-rt-ric-simulator/test/KAFKA_DISPATCHER/certificate/pass create mode 100644 near-rt-ric-simulator/test/KAFKA_DISPATCHER/nginx.conf create mode 100644 near-rt-ric-simulator/test/KAFKA_DISPATCHER/resources/policytype_to_topicmap.json create mode 100644 near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/dispatcher.py create mode 100644 near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/main.py create mode 100644 near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/maincommon.py create mode 100644 near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/payload_logging.py create mode 100644 near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/start.sh create mode 100644 near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/var_declaration.py create mode 100755 near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/basic_test.sh create mode 100755 near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/basic_test_with_cust_header.sh create mode 100755 near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/build_and_start.sh create mode 100644 near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/jsonfiles/ANR_to_topic_map.json create mode 100644 near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/jsonfiles/alpha_policy.json create mode 100644 near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/jsonfiles/beta_policy.json create mode 100644 near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/jsonfiles/forced_response.json create mode 100644 near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/jsonfiles/timeout_response.json create mode 100755 near-rt-ric-simulator/test/STD_2.0.0/build_and_start_with_kafka.sh create mode 100644 near-rt-ric-simulator/test/common/publish_response_event_to_kafka_bus.py diff --git a/near-rt-ric-simulator/src/STD_2.0.0/a1.py b/near-rt-ric-simulator/src/STD_2.0.0/a1.py old mode 100644 new mode 100755 index 28eccc6..e70a8ed --- a/near-rt-ric-simulator/src/STD_2.0.0/a1.py +++ b/near-rt-ric-simulator/src/STD_2.0.0/a1.py @@ -36,7 +36,7 @@ APPL_JSON='application/json' APPL_PROB_JSON='application/problem+json' EXT_SRV_URL=os.getenv('EXT_SRV_URL') - +KAFKA_DISPATCHER_URL=os.getenv('KAFKA_DISPATCHER_URL') # API Function: Get all policy type ids def get_all_policy_types(): @@ -132,6 +132,13 @@ def put_policy(policyTypeId, policyId): pjson=create_problem_json(None, "Duplicate, the policy json already exists.", 400, None, policy_id) return Response(json.dumps(pjson), 400, mimetype=APPL_PROB_JSON) + #Callout hooks for kafka dispatcher + if (KAFKA_DISPATCHER_URL is not None): + resp = callout_kafka_dispatcher(policy_type_id, policy_id, data, retcode) + if (resp != 200): + pjson=create_error_response(resp) + return Response(json.dumps(pjson), 500, mimetype=APPL_PROB_JSON) + #Callout hooks for external server #When it fails, break and return 419 HTTP status code if (EXT_SRV_URL is not None): @@ -203,6 +210,13 @@ def delete_policy(policyTypeId, policyId): pjson=create_problem_json(None, "The requested policy does not exist.", 404, None, policy_id) return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON) + #Callout hooks for kafka dispatcher + if (KAFKA_DISPATCHER_URL is not None): + resp = callout_kafka_dispatcher(policy_type_id, policy_id, None, 204) + if (resp != 200): + pjson=create_error_response(resp) + return Response(json.dumps(pjson), 500, mimetype=APPL_PROB_JSON) + #Callout hooks for external server #When it fails, break and return 419 HTTP status code if (EXT_SRV_URL is not None): @@ -241,8 +255,45 @@ def get_policy_status(policyTypeId, policyId): pjson=create_problem_json(None, "The requested policy does not exist.", 404, None, policy_id) return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON) + #Callout hooks for kafka dispatcher + if (KAFKA_DISPATCHER_URL is not None): + resp = callout_kafka_dispatcher(policy_type_id, policy_id, None, 202) + if (resp != 200): + pjson=create_error_response(resp) + return Response(json.dumps(pjson), 500, mimetype=APPL_PROB_JSON) + return Response(json.dumps(policy_status[policy_id]), status=200, mimetype=APPL_JSON) + +# Helper: Callout kafka dispatcher server to notify it for policy operations +def callout_kafka_dispatcher(policy_type_id, policy_id, payload, retcode): + + target_url = KAFKA_DISPATCHER_URL + "/policytypes/" + policy_type_id + "/kafkadispatcher/" + policy_id + try: + # create operation, publish with payload + if (retcode == 201): + resp=requests.put(target_url, json=payload, timeout=30, verify=False) + return resp.status_code + # update operation, publish with payload + elif (retcode == 200): + # add headers an update-flag + headers = {'updateoper' : 'yes'} + resp=requests.put(target_url, json=payload, headers=headers, timeout=30, verify=False) + return resp.status_code + # delete operation, publish without payload + elif (retcode == 204): + resp=requests.delete(target_url, timeout=30, verify=False) + return resp.status_code + # get policy status operation, publish without payload + elif (retcode == 202): + # update endpoint + target_url = target_url + "/status" + resp=requests.get(target_url, timeout=30, verify=False) + return resp.status_code + except Exception: + return 419 + + # Helper: Callout external server to notify it for policy operations # Returns 200, 201 and 204 for the success callout hooks, for the others returns 419 def callout_external_server(policy_id, payload, operation): diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER/Dockerfile b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/Dockerfile new file mode 100644 index 0000000..bc5d815 --- /dev/null +++ b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/Dockerfile @@ -0,0 +1,49 @@ +# ============LICENSE_START=============================================== +# Copyright (C) 2022 Nordix Foundation. All rights reserved. +# ======================================================================== +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END================================================= +# + +FROM python:3.8-slim-buster + +RUN pip install connexion[swagger-ui] +RUN pip install kafka-python + +#install nginx and curl +RUN apt-get update && apt-get install -y nginx=1.14.* nginx-extras curl + +WORKDIR /usr/src/app + +COPY api api +COPY nginx.conf nginx.conf +COPY certificate /usr/src/app/cert +COPY src src +COPY resources resources + +ARG user=nonrtric +ARG group=nonrtric + +RUN groupadd $user && \ + useradd -r -g $group $user +RUN chown -R $user:$group /usr/src/app +RUN chown -R $user:$group /var/log/nginx +RUN chown -R $user:$group /var/lib/nginx +RUN chown -R $user:$group /etc/nginx/conf.d +RUN touch /var/run/nginx.pid +RUN chown -R $user:$group /var/run/nginx.pid + +USER ${user} + +RUN chmod +x src/start.sh +CMD src/start.sh diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER/api/KAFKA_DISPATCHER_api.yaml b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/api/KAFKA_DISPATCHER_api.yaml new file mode 100644 index 0000000..5480fbe --- /dev/null +++ b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/api/KAFKA_DISPATCHER_api.yaml @@ -0,0 +1,241 @@ +openapi: 3.0.0 +info: + title: 'Kafka message dispatcher for A1 interface' + version: 2.0.0 + description: | + Kafka message dispatcher server. + © 2022, O-RAN Alliance. + All rights reserved. +externalDocs: + description: 'RestFUL APIs that create and dispatch Kafka messages to Kafka brokers' + url: 'https://docs.o-ran-sc.org/projects/o-ran-sc-sim-a1-interface/en/latest/index.html' +servers: + - url: '{apiRoot}' + variables: + apiRoot: + default: 'https://example.com' +paths: + '/policytypetotopicmapping/{policyTypeId}': + parameters: + - name: policyTypeId + in: path + required: true + schema: + "$ref": "#/components/schemas/PolicyTypeId" + get: + operationId: dispatcher.get_policy_type_to_topic_mapping + description: 'Get the kafka request and response topic map corresponding to policy type' + tags: + - The mapping from policy type to kafka topic request and response object + responses: + 200: + description: 'The policy type to topic map schemas' + content: + application/json: + schema: + "$ref": "#/components/schemas/PolicyTypeToTopicMap" + 404: + "$ref": "#/components/responses/404-NotFound" + 429: + "$ref": "#/components/responses/429-TooManyRequests" + 503: + "$ref": "#/components/responses/503-ServiceUnavailable" + + '/policytypes/{policyTypeId}/kafkadispatcher/{policyId}': + parameters: + - name: policyTypeId + in: path + required: true + schema: + "$ref": "#/components/schemas/PolicyTypeId" + - name: policyId + in: path + required: true + schema: + "$ref": "#/components/schemas/A1PolicyId" + put: + operationId: dispatcher.put_policy + description: 'Dispatch create and update operation as kafka message to kafka cluster' + tags: + - Individual policy Object + requestBody: + required: true + content: + application/json: + schema: + "$ref": "#/components/schemas/A1PolicyObject" + responses: + 200: + description: 'Create or update operation dispatched' + 400: + "$ref": "#/components/responses/400-BadRequest" + 408: + "$ref": "#/components/responses/408-RequestTimeout" + 419: + "$ref": "#/components/responses/419-KafkaMessagePublishFailed" + 429: + "$ref": "#/components/responses/429-TooManyRequests" + 503: + "$ref": "#/components/responses/503-ServiceUnavailable" + 507: + "$ref": "#/components/responses/507-InsufficientStorage" + delete: + operationId: dispatcher.delete_policy + description: 'Dispatch policy delete opertion as kafka message to kafka cluster' + responses: + 200: + description: 'Delete operation dispatched' + 408: + "$ref": "#/components/responses/408-RequestTimeout" + 419: + "$ref": "#/components/responses/419-KafkaMessagePublishFailed" + 429: + "$ref": "#/components/responses/429-TooManyRequests" + 503: + "$ref": "#/components/responses/503-ServiceUnavailable" + + '/policytypes/{policyTypeId}/kafkadispatcher/{policyId}/status': + parameters: + - name: policyTypeId + in: path + required: true + schema: + "$ref": "#/components/schemas/PolicyTypeId" + - name: policyId + in: path + required: true + schema: + "$ref": "#/components/schemas/A1PolicyId" + get: + operationId: dispatcher.get_policy_status + description: 'Dispatch policy status query opertion as kafka message to kafka cluster' + tags: + - Individual A1 Policy Status Object + responses: + 200: + description: 'Query policy status operation dispatched' + 429: + "$ref": "#/components/responses/429-TooManyRequests" + 503: + "$ref": "#/components/responses/503-ServiceUnavailable" + +components: + schemas: + # + # Representation objects + # + A1PolicyObject: + description: 'A generic policy object' + type: object + + A1Policy: + description: 'A generic policy string' + type: string + + PolicyTypeToTopicMap: + description: 'Request and response topic map for each policy type' + type: object + properties: + policy_type: + type: object + properties: + request_topic: + type: string + example: kafkatopicreq + response_topic: + type: string + example: kafkatopicres + + ProblemDetails: + description: 'A problem detail to carry details in a HTTP response according to RFC 7807' + type: object + properties: + type: + type: string + title: + type: string + status: + type: number + detail: + type: string + instance: + type: string + + # + # Simple data types + # + JsonSchema: + description: 'A JSON schema following http://json-schema.org/draft-07/schema' + type: object + + A1PolicyId: + description: 'A1 policy identifier.' + type: string + + PolicyTypeId: + description: 'Policy type identifier assigned by the A1-P Provider' + type: string + + responses: + 400-BadRequest: + description: 'A1 policy not properly formulated or not related to the method' + content: + application/problem+json: + schema: + "$ref": "#/components/schemas/ProblemDetails" + + 404-NotFound: + description: 'No resource found at the URI' + content: + application/problem+json: + schema: + "$ref": "#/components/schemas/ProblemDetails" + + 405-MethodNotAllowed: + description: 'Method not allowed for the URI' + content: + application/problem+json: + schema: + "$ref": "#/components/schemas/ProblemDetails" + + 408-RequestTimeout: + description: 'Request could not be processed in given amount of time' + content: + application/problem+json: + schema: + "$ref": "#/components/schemas/ProblemDetails" + + 409-Conflict: + description: 'Request could not be processed in the current state of the resource' + content: + application/problem+json: + schema: + "$ref": "#/components/schemas/ProblemDetails" + + 419-KafkaMessagePublishFailed: + description: 'Publishing the kafka message to the broker gets fail' + content: + application/problem+json: + schema: + "$ref": "#/components/schemas/ProblemDetails" + + 429-TooManyRequests: + description: 'Too many requests have been sent in a given amount of time' + content: + application/problem+json: + schema: + "$ref": "#/components/schemas/ProblemDetails" + + 503-ServiceUnavailable: + description: 'The provider is currently unable to handle the request due to a temporary overload' + content: + application/problem+json: + schema: + "$ref": "#/components/schemas/ProblemDetails" + + 507-InsufficientStorage: + description: 'The method could not be performed on the resource because the provider is unable to store the representation needed to successfully complete the request' + content: + application/problem+json: + schema: + "$ref": "#/components/schemas/ProblemDetails" diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER/certificate/cert.crt b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/certificate/cert.crt new file mode 100644 index 0000000..6408f33 --- /dev/null +++ b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/certificate/cert.crt @@ -0,0 +1,24 @@ +-----BEGIN CERTIFICATE----- +MIID+zCCAuOgAwIBAgIUWy7JHRvA2GfOU5op4Xcc/8wQF18wDQYJKoZIhvcNAQEL +BQAwgYwxCzAJBgNVBAYTAklFMRIwEAYDVQQIDAlXRVNUTUVBVEgxEDAOBgNVBAcM +B0FUSExPTkUxETAPBgNVBAoMCEVyaWNzc29uMQwwCgYDVQQLDANFU1QxETAPBgNV +BAMMCGVzdC50ZWNoMSMwIQYJKoZIhvcNAQkBFhRoYWxpbC5jYWthbEBlc3QudGVj +aDAeFw0yMjA2MTcwOTEwMDNaFw00OTExMDEwOTEwMDNaMIGMMQswCQYDVQQGEwJJ +RTESMBAGA1UECAwJV0VTVE1FQVRIMRAwDgYDVQQHDAdBVEhMT05FMREwDwYDVQQK +DAhFcmljc3NvbjEMMAoGA1UECwwDRVNUMREwDwYDVQQDDAhlc3QudGVjaDEjMCEG +CSqGSIb3DQEJARYUaGFsaWwuY2FrYWxAZXN0LnRlY2gwggEiMA0GCSqGSIb3DQEB +AQUAA4IBDwAwggEKAoIBAQCqwDVZ7txWX/FaiRiSVa2jnBcV7KN6eqwcKtP3cNP+ +3VTm4YtcY6yp/dPXTYqkAX1qmp5i8USFPnbCstAijI5Uy8kl63dYbirHMPwt9AOL +TXrFRrJ/sev3ULJWKB1IOGt2rFhoUXA23Hv1hagyvjx2upbnVmhrz5qBOT1wuzwN +U2PjFaCFHBs0XphFS/UDEQlvpbNz/jxwHVrEdO8Jr951OFlUBczDDGk0jJ3hRc0p +iM5LNGH02yDvE6pCqqY5Fo5aaj9Vi0Kztv1D/NClWcr3Yh3IuMyZkfS+S8nPd7Nu +VHKWo7cPv9QpeziWiqx0fZcSAh6tUF52hrwGrMONuEojAgMBAAGjUzBRMB0GA1Ud +DgQWBBRiX8NchgUa825PcmhA0b+BOnkx5TAfBgNVHSMEGDAWgBRiX8NchgUa825P +cmhA0b+BOnkx5TAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQAi +p6UMfLeTROQJNS4ZAkpQKEtrhGaE8DpnZW+tCLWaxkTk3Mo3ybE3rBW2zugmXFIU +K2PRKivxHmVxY/YQL3sxzQRnzRPl0wuEmm+r0LpGR6VXfTaPrT6YzKM0TU/xKqJI +INk6JiBx8v9dp5LWGoJEs0e4wvoV8Bd05u+73cbhIUisO7KmCp/u1D1BHYtBNDCp +sVH6y9mAlvMIAOzy4wOWqoAxcW2ES8JbesbLOxt/IaQO9DQFPUIjTZURG+62lNCS ++2+lb1ihNqzEECuyw1GQRt88HrSWuj99bCBRRiBij900anadoKIDHdWSEkzhBa0K +zJ5KoQK/o4szZ3VPByfh +-----END CERTIFICATE----- diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER/certificate/generate_cert_and_key.sh b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/certificate/generate_cert_and_key.sh new file mode 100755 index 0000000..7e6d29c --- /dev/null +++ b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/certificate/generate_cert_and_key.sh @@ -0,0 +1,26 @@ +#!/bin/bash + +# ============LICENSE_START=============================================== +# Copyright (C) 2022 Nordix Foundation. All rights reserved. +# ======================================================================== +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END================================================= +# + +# This will generate a self-signed certificate with password 'test' + +SUBJECT="/C=IE/ST=WESTMEATH/L=ATHLONE/O=Ericsson/OU=EST/CN=est.tech/emailAddress=halil.cakal@est.tech" +PW=test +echo $PW > pass + +openssl req -x509 -passout file:pass -newkey rsa:2048 -keyout key.crt -subj "$SUBJECT" -out cert.crt -days 9999 diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER/certificate/key.crt b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/certificate/key.crt new file mode 100644 index 0000000..9f81115 --- /dev/null +++ b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/certificate/key.crt @@ -0,0 +1,30 @@ +-----BEGIN ENCRYPTED PRIVATE KEY----- +MIIFHDBOBgkqhkiG9w0BBQ0wQTApBgkqhkiG9w0BBQwwHAQI/PWiKXnGNAMCAggA +MAwGCCqGSIb3DQIJBQAwFAYIKoZIhvcNAwcECPdSVeuyJFIwBIIEyGmpW5lUwpKF +tlcwBj2WvKF/7GJFw9LEuIBnkm0Ya8LV5Fu5XS9OGg877OkLVr1P152YjDQ+h1nD +ZZSOEqjAGGSlEshie2DSr9bdJdBlr8MTog7SlHQsrU0QLxPRBBwqseI5cyu1Vb41 +4V5+l0iWUoDvXH3mUeH460A43GU4ZUTimkWpP9M9LHyrofjMg7sLzwvOns207DXK +0hffc5293t85CQPau1WenHsfXn65tkPFblYILpJyU4sa1kLa4f5Ynt7RH2mNztfI +26kViIuOBDXcmLAxL3WGZaR9u71qpl0wC1umXWxWNt69DRVyOf23mHHSuuDSEyM3 ++x0rrbj/QaLNnoEjAFvijEAkYdp/jPKKP3kp6LpSVvVVOGLP7srrIrOe4q6bPFfK +d5u1Vnh3/PVEf8xPbJe8UJ5cfx3mWhT+saZOIpEpQvom5GwSYt869P1xyAaa59cx +TcT9KC/Ytg7fSHwXwTclIvucD+cJvbEZNFAwxMkL94a60LNfZ/odq1Bu304Shm/V +DSNeDi1HjfoC3aca7bjsXE8Xj82JQLaSGt7+AuY3gICA5cnJxxWv5VoyRZVPsiRj +Z6ykP2ikjkaLQaqDJmpbnx2ZK/lfrkJI1yI0kYK0xApUQdx9ks8c/AeEcUby2z+H +qPJZuuh1NlEv8jSFn1CO3a5Bpq53EtQlxonKzJYHdoKm1oIEbIUE60K+1oxyXt9S ++l8PgH0T2QlM2lvipy5XegPrTuMYqDywEt4cf1Yk/8RSYEeLzcfKzSuNuy556YRd +Of9nJOKPkVr1cp5si9Vyt+t4cD826WWV1ZgEcIK0i8uhEQxHb+y88DDWAV1DJGog +M0qPGm95lWj4ESiv7A/+AbXY3rJRMp/JB1jmGTNfa3jQ9P29cTwhlpvPnhqviLRH +1YfIOJfghrF55e83bLmJcfwVktMwO8Wzovw7U/1Rzy1j/fyUNkhOEbziIu97ScD+ +7AQDfRqBZZXWREOylAmfWMqZxwVn+CytAFyTLP5CpjSKgBV+qJ5xc3GqNJxaKGK5 +ULXfJKtTheJV3YYnvCuDySFuCv/dkUVjaA3/TqTikKm4THxS+DjtFJey+KWTx57p +X+8ky/E3zAuZzP7r6Dszhp+CAfvXp4CwkLitqfbwja2lcxQ+hbpHYjLN6zvHmmOZ +o0ZeoNpsWj00jAc9NrJt08DfcNomlUz9CZgpvE64gXX7wPCw5zG/c5dbFuoImEeI +h+fKwGh2KdJwazvZ/B20/TRUn1WaQzyRiCJMIgc3aO3AiuffeurliO93iTeVCjJD +d+Bdt4Ub0zPRikTqY4PUDiyfM+vRHOkJ39dY39XaZEsLLXB6XeACWcWT6iBF56Pe +UAi+C4IwPLBHMsIVVSvH8/Rg0IFBHVmkGIi0q7gIyVTcR7FQeCzDZVAjJn4aBgXo +S57v2AbMdD3pwie4ou91NkiM/SnSimLA0BxEh1UEhZk2BEW2Yy6OuvcLn3EfeZVu +M9UNmwIFaq9jWM/qcRQa7MbXTZUTUIPsOvMOsZwPIurqtXWZZTWZ8D0eu2Hu6Vui +FGyY1xIVcIpGXesADsYwy+CSW/EjpV1s+/LGDsUcqZpMeWmj84Zh1Tjt+Fet+967 +dSViTwISZ+O8F2uq0MaiPg== +-----END ENCRYPTED PRIVATE KEY----- diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER/certificate/pass b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/certificate/pass new file mode 100644 index 0000000..9daeafb --- /dev/null +++ b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/certificate/pass @@ -0,0 +1 @@ +test diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER/nginx.conf b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/nginx.conf new file mode 100644 index 0000000..8de906d --- /dev/null +++ b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/nginx.conf @@ -0,0 +1,93 @@ +# user www-data; +worker_processes auto; +pid /run/nginx.pid; +include /etc/nginx/modules-enabled/*.conf; + +env ALLOW_HTTP; + +events { + worker_connections 768; + # multi_accept on; +} + +http { + + ## + # Basic Settings + ## + + sendfile on; + tcp_nopush on; + tcp_nodelay on; + keepalive_timeout 65; + types_hash_max_size 2048; + # server_tokens off; + + # server_names_hash_bucket_size 64; + # server_name_in_redirect off; + + include /etc/nginx/mime.types; + default_type application/octet-stream; + + perl_set $allow_http 'sub { return $ENV{"ALLOW_HTTP"}; }'; + + server { # simple reverse-proxy + listen 7075; + listen [::]:7075; + server_name localhost; + if ($allow_http != true) { + return 444; + } + + # serve dynamic requests + location / { + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_pass http://localhost:7777; + } + } + + server { # simple reverse-proxy + listen 7175 ssl; + listen [::]:7175 ssl; + server_name localhost; + ssl_certificate /usr/src/app/cert/cert.crt; + ssl_certificate_key /usr/src/app/cert/key.crt; + ssl_password_file /usr/src/app/cert/pass; + + # serve dynamic requests + location / { + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_pass http://localhost:7777; + } + } + ## + # SSL Settings + ## + + ssl_protocols TLSv1 TLSv1.1 TLSv1.2; # Dropping SSLv3, ref: POODLE + ssl_prefer_server_ciphers on; + + ## + # Logging Settings + ## + + access_log /var/log/nginx/access.log; + error_log /var/log/nginx/error.log; + + ## + # Gzip Settings + ## + + gzip on; + + # gzip_vary on; + # gzip_proxied any; + # gzip_comp_level 6; + # gzip_buffers 16 8k; + # gzip_http_version 1.1; + # gzip_types text/plain text/css application/json application/javascript text/xml application/xml application/xml+rss text/javascript; +} diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER/resources/policytype_to_topicmap.json b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/resources/policytype_to_topicmap.json new file mode 100644 index 0000000..254cb4d --- /dev/null +++ b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/resources/policytype_to_topicmap.json @@ -0,0 +1,14 @@ +{ + "ANR": { + "request_topic": "kafkatopicreq", + "response_topic": "kafkatopicres" + }, + "STD_1": { + "request_topic": "kafkatopicreq2", + "response_topic": "kafkatopicres" + }, + "STD_2": { + "request_topic": "kafkatopicreq3", + "response_topic": "kafkatopicres" + } +} diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/dispatcher.py b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/dispatcher.py new file mode 100644 index 0000000..19e2ab6 --- /dev/null +++ b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/dispatcher.py @@ -0,0 +1,335 @@ +# ============LICENSE_START=============================================== +# Copyright (C) 2022 Nordix Foundation. All rights reserved. +# ======================================================================== +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END================================================= +# + +import os +import json +import time +import math + +from flask import request, Response +from datetime import datetime +from kafka.consumer.fetcher import ConsumerRecord +from kafka import TopicPartition +from var_declaration import forced_settings +from maincommon import create_kafka_producer, create_kafka_consumer, create_kafka_event, create_kafka_response_event, byte_to_str, get_random_string + + +MSG_BROKER_URL=os.getenv('MSG_BROKER_URL') + +TIME_OUT=os.getenv('TIME_OUT') + +#Constsants +APPL_JSON='application/json' +TEXT_PLAIN='text/plain' +APPL_PROB_JSON='application/problem+json' + +# API Function: Dispatch create or update events to Kafka cluster +def put_policy(policyTypeId, policyId): + + if ((r := check_modified_response()) is not None): + return r + + policy_type_id = str(policyTypeId) + policy_id = str(policyId) + + try: + # Error based unit test rel only, for more info please check basic_test_with_cust_header + req_id_from_header = request.headers.get('requestid') + # Differentiate if the PUT is update or create operation since the endpoint is the same + update_oper_from_header = request.headers.get('updateoper') + data = request.data + data = json.loads(data) + except Exception: + pjson=create_problem_json(None, "The a1policy is corrupt or missing.", 400, None, policy_id) + return Response(json.dumps(pjson), 400, mimetype=APPL_PROB_JSON) + + # Decide if the operation is update or create + if (update_oper_from_header is not None): + kafka_event = create_kafka_event(policy_type_id, policy_id, data, 'UPDATE') + else: + kafka_event = create_kafka_event(policy_type_id, policy_id, data, 'CREATE') + + # Synch callout hooks towards kafka broker + if (MSG_BROKER_URL is not None): + return publish_and_consume(kafka_event, req_id_from_header, policy_type_id) + + return Response('', 200, mimetype=TEXT_PLAIN) + + +# API Function: Dispatch delete events to south Kafka cluster +def delete_policy(policyTypeId, policyId): + + if ((r := check_modified_response()) is not None): + return r + + policy_type_id = str(policyTypeId) + policy_id = str(policyId) + + req_id_from_header = request.headers.get('requestid') + print('req_id_from_header', req_id_from_header) + + # Synch callout hooks towards kafka broker + kafka_event = create_kafka_event(policy_type_id, policy_id, None, 'DELETE') + if (MSG_BROKER_URL is not None): + return publish_and_consume(kafka_event, req_id_from_header, policy_type_id) + + return Response('', 200, mimetype=TEXT_PLAIN) + + +# API Function: Get status for a policy +def get_policy_status(policyTypeId, policyId): + + if ((r := check_modified_response()) is not None): + return r + + policy_type_id=str(policyTypeId) + policy_id=str(policyId) + + req_id_from_header = request.headers.get('requestid') + print('req_id_from_header', req_id_from_header) + + # Synch callout hooks towards kafka broker + kafka_event = create_kafka_event(policy_type_id, policy_id, None, 'GET') + if (MSG_BROKER_URL is not None): + return publish_and_consume(kafka_event, req_id_from_header, policy_type_id) + + return Response('', 200, mimetype=TEXT_PLAIN) + + +def get_policy_type_to_topic_mapping(policyTypeId): + + if ((r := check_modified_response()) is not None): + return r + + policy_type_id = str(policyTypeId) + + m_file = open('../resources/policytype_to_topicmap.json') + map_in_dict = json.load(m_file) + + if policy_type_id in map_in_dict.keys(): + topic_address = map_in_dict[policy_type_id] + return Response(json.dumps(topic_address), 200, mimetype=APPL_JSON) + else: + pjson=create_problem_json(None, "The policy type to topic mapping does not exist.", 404, None, policy_type_id) + return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON) + + +# Helper: Publishes and consumes (to/from) the target broker and the topic in two-way synch +def publish_and_consume(kafka_event, req_id_from_header, pol_type_id): + + # Instantiate KafkaProducer with keyword arguments + producer = create_kafka_producer() + + # Assigns an id to each request that is supposed to get a result + # if a req_id already exists in req headers, it means that test generated req_id is in use for testing only + if (req_id_from_header is None): + req_id = get_random_string(16) + else: + req_id = req_id_from_header + + try: + + resp = get_policy_type_to_topic_mapping(pol_type_id) + # if the policy type to topic mapping could not be found, then returns 404 + # else gets target topic to publish the message to + if (resp.status_code == 404): + return resp + else: + data = json.loads(resp.data) + target_topic_req = data['request_topic'] + target_topic_res = data['response_topic'] + + # synch-publish + # KafkaProducer.send(topicname, value=broker_message, key=req_id, headers=None, partition=None, timestamp_ms=None) + fut_rec_metadata = producer.send(target_topic_req, kafka_event, req_id) + record_metadata = fut_rec_metadata.get() + print('Future:', record_metadata) + publish_time_in_ms = record_metadata.timestamp + + # For test purposes only, publish the success response event with no error-info to response topic + # if basic_test_with_cust_header.sh is being used, then comment this line + # else comment out this line for the basic_test.sh + kafka_response_event = create_kafka_response_event(200, "") + producer.send(target_topic_res, kafka_response_event, req_id) + + # synch-consume + consumer_record = consume_record_for(req_id, target_topic_res) + if (isinstance(consumer_record, ConsumerRecord)): + + print("Consumer Record:", consumer_record) + cons_rec_value = consumer_record.value + cons_rec_val_in_dict = json.loads(cons_rec_value) + resp_code = cons_rec_val_in_dict['response-code'] + + # if response code success, then check for time-out + if (int(resp_code) == 200): + # time-out control block, default time-out duration is thirty seconds + consume_time_in_ms = consumer_record.timestamp + elapsed_time_in_ms = consume_time_in_ms - publish_time_in_ms + print('Elapsed time in ms:', elapsed_time_in_ms) + if (elapsed_time_in_ms < int(TIME_OUT) * 1000): + return Response('', 200, mimetype=APPL_JSON) + else: + # returns time-out response code + pjson=create_error_response(408) + return Response(json.dumps(pjson), 408, mimetype=APPL_PROB_JSON) + else: + # for all other responses returns special error of this module by wrapping actual resp code + pjson=create_error_response(419) + return Response(json.dumps(pjson), 419, mimetype=APPL_PROB_JSON) + + elif (isinstance(consumer_record, Response)): + # Returns time-out response + return consumer_record + else: + # returns special error of this module + pjson=create_error_response(419) + return Response(json.dumps(pjson), 419, mimetype=APPL_PROB_JSON) + + except Exception as err: + print('Error while publish and consume', err) + pjson=create_error_response(419) + return Response(json.dumps(pjson), 419, mimetype=APPL_PROB_JSON) + finally: + producer.close() + + +# Helper: Searches for req_id by seeking every five seconds up to thirty seconds +# Helper: If the req_id is found, then ConsumerRecord will be returned +# Helper: If the req_id is not found, then Response Request Timeout will be returned +def consume_record_for(req_id, target_topic_res): + + try: + print ('req_id looking for in consumer:', req_id) + consumer = create_kafka_consumer() + topic_partition = TopicPartition(target_topic_res, 0) + consumer.assign([topic_partition]) + + # calculates poll cycle threshold + sleep_period_in_sec = 5 + poll_cycle_threshold = int(TIME_OUT) / sleep_period_in_sec + poll_cycle_threshold = math.floor(poll_cycle_threshold) + print('poll_cycle_threshold', poll_cycle_threshold) + + poll_retries = 0 + starting_offset = 0 + prev_last_offset = 0 + while (poll_retries < poll_cycle_threshold): + # Manually specify the fetch offset for a TopicPartition + consumer.seek(topic_partition, starting_offset) + # Get the last offset for the given partitions + last_offset = consumer.end_offsets([topic_partition])[topic_partition] + print('last_offset',last_offset) + + if (last_offset != prev_last_offset): + for consumer_record in consumer: + # Get req_id as msg_key and converts it from byte to str for each consumer record + msg_key = byte_to_str(consumer_record.key) + print ('msg_key in a consumer_record:', msg_key) + if (req_id == msg_key): + print ('req_id is found in consumer records', req_id) + return consumer_record + elif (consumer_record.offset == last_offset - 1): + break + + print('Sleeping for ' + str(sleep_period_in_sec) + ' seconds...') + time.sleep(sleep_period_in_sec) + + poll_retries += 1 + prev_last_offset = last_offset + starting_offset += last_offset + + # Returns time-out response + pjson=create_error_response(408) + return Response(json.dumps(pjson), 408, mimetype=APPL_PROB_JSON) + + except Exception as err: + print('Error while consume record for req_id', err) + pjson=create_error_response(419) + return Response(json.dumps(pjson), 419, mimetype=APPL_PROB_JSON) + finally: + consumer.close() + + +# Helper: Create a response object if forced http response code is set +def get_forced_response(): + + if (forced_settings['code'] is not None): + resp_code=forced_settings['code'] + pjson=create_error_response(int(resp_code)) + return Response(json.dumps(pjson), pjson['status'], mimetype=APPL_PROB_JSON) + return None + + +# Helper: Delay if delayed response code is set +def do_delay(): + + if (forced_settings['delay'] is not None): + try: + val=int(forced_settings['delay']) + time.sleep(val) + except Exception: + return + + +# Helper: Check if response shall be delayed or a forced response shall be sent +def check_modified_response(): + + do_delay() + return get_forced_response() + + +# Helper: Create a problem json object +def create_problem_json(type_of, title, status, detail, instance): + + error = {} + if type_of is not None: + error["type"] = type_of + if title is not None: + error["title"] = title + if status is not None: + error["status"] = status + if detail is not None: + error["detail"] = detail + if instance is not None: + error["instance"] = instance + return error + + +# Helper: Create a problem json based on a generic http response code +def create_error_response(code): + + if code == 400: + return(create_problem_json(None, "Bad request", 400, "Object in payload not properly formulated or not related to the method", None)) + elif code == 404: + return(create_problem_json(None, "Not found", 404, "No resource found at the URI", None)) + elif code == 405: + return(create_problem_json(None, "Method not allowed", 405, "Method not allowed for the URI", None)) + elif code == 408: + return(create_problem_json(None, "Request timeout", 408, "Request timeout", None)) + elif code == 409: + return(create_problem_json(None, "Conflict", 409, "Request could not be processed in the current state of the resource", None)) + elif (code == 419): + return(create_problem_json(None, "Kafka message publish failed", 419, "Publishing the event could not be processed on the Kafka cluster", None)) + elif code == 429: + return(create_problem_json(None, "Too many requests", 429, "Too many requests have been sent in a given amount of time", None)) + elif code == 507: + return(create_problem_json(None, "Insufficient storage", 507, "The method could not be performed on the resource because the provider is unable to store the representation needed to successfully complete the request", None)) + elif code == 503: + return(create_problem_json(None, "Service unavailable", 503, "The provider is currently unable to handle the request due to a temporary overload", None)) + else: + return(create_problem_json(None, "Unknown", code, "Not implemented response code", None)) diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/main.py b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/main.py new file mode 100644 index 0000000..7816e7f --- /dev/null +++ b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/main.py @@ -0,0 +1,77 @@ +# ============LICENSE_START=============================================== +# Copyright (C) 2022 Nordix Foundation. All rights reserved. +# ======================================================================== +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END================================================= +# + +import json +import sys +import requests + + +from flask import request, Response, Flask, json +from var_declaration import forced_settings, app +from maincommon import check_timeout, check_apipath + +#Constants +TEXT_PLAIN='text/plain' + +check_apipath() +check_timeout() + +# app is created in var_declarations + +import payload_logging # app var need to be initialized + +#Check alive function +@app.route('/', methods=['GET']) +def test(): + return Response("OK", 200, mimetype=TEXT_PLAIN) + +#Set|Reset force response to be returned from dispatcher +#/dispatcheradmin/forceresponse?code= +@app.route('/dispatcheradmin/forceresponse', methods=['POST']) +def forceresponse(): + + query_param=request.args.get('code') + forced_settings['code']=query_param + + if (query_param is None): + return Response("Force response code has been resetted for dispatcher responses", 200, mimetype=TEXT_PLAIN) + else: + return Response("Force response code: " + str(forced_settings['code']) + " set for all dispatcher response until it is resetted", 200, mimetype=TEXT_PLAIN) + +#Set|Reset force delay response, in seconds, for all external server responses +#/a1policy/forcedelay?delay= +@app.route('/dispatcheradmin/forcedelay', methods=['POST']) +def forcedelay(): + + query_param=request.args.get('delay') + forced_settings['delay']=query_param + + if (query_param is None): + return Response("Force delay has been resetted for all dispatcher responses ", 200, mimetype=TEXT_PLAIN) + else: + return Response("Force delay: " + str(forced_settings['delay']) + " sec set for all dispatcher responses until it is resetted ", 200, mimetype=TEXT_PLAIN) + +port_number = 7777 +if len(sys.argv) >= 2: + if isinstance(sys.argv[1], int): + port_number = sys.argv[1] + +#Import base RestFUL API functions from Open API +app.add_api('KAFKA_DISPATCHER_api.yaml') + +if __name__ == '__main__': + app.run(port=port_number, host="127.0.0.1", threaded=False) diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/maincommon.py b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/maincommon.py new file mode 100644 index 0000000..c534acb --- /dev/null +++ b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/maincommon.py @@ -0,0 +1,119 @@ +# ============LICENSE_START=============================================== +# Copyright (C) 2022 Nordix Foundation. All rights reserved. +# ======================================================================== +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END================================================= +# + +import os +import sys +import json +from pathlib import Path +from flask import Response +import socket +import ssl +import random +import string + +from kafka import KafkaProducer, KafkaConsumer + +#Must exist +apipath=os.environ['APIPATH'] +timeout=os.getenv('TIME_OUT') + +MSG_BROKER_URL=os.getenv('MSG_BROKER_URL') + + +# Make sure the api path is set, otherwise exit +def check_apipath(): + if (apipath is None): + print("Env APIPATH not set. Exiting....") + sys.exit(1) + +# Make sure the timeout is set and greater than zero, otherwise exit +def check_timeout(): + if (timeout is None): + print("Env TIME_OUT not set. Exiting....") + sys.exit(1) + elif (int(timeout) < 0): + print("Env TIME_OUT must be greater than zero. Exiting....") + sys.exit(1) + +# Instantiate KafkaProducer with keyword arguments +# https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html +def create_kafka_producer(): + + producer = KafkaProducer( + bootstrap_servers = [MSG_BROKER_URL], + key_serializer = str.encode, + value_serializer = lambda m: json.dumps(m).encode('ascii'), + ) + return producer + + +# Instantiate KafkaConsumer with keyword arguments +# https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html +def create_kafka_consumer(): + consumer = KafkaConsumer( + #KAFKA_TOPIC_RES, + bootstrap_servers = MSG_BROKER_URL, + auto_offset_reset = 'earliest', + value_deserializer = lambda m: json.loads(m.decode('ascii')), + #enable_auto_commit=False + ) + return consumer + + +# Helper: Builds a Kafka event +def create_kafka_event(policy_type_id, policy_id, payload, operation): + + kafka_event_format = {'action': operation_to_action(operation), 'payload': payload, 'policy_type_id': policy_type_id, 'policy_id': policy_id} + # converts dict to str + kafka_event_json = json.dumps(kafka_event_format) + return kafka_event_json + +# Helper: Builds a Kafka event +def create_kafka_response_event(response_code, error_info): + + kafka_response_event_format = {'response-code': response_code, 'error-info': error_info} + # converts dict to str + kafka_response_event_json = json.dumps(kafka_response_event_format) + return kafka_response_event_json + +# Helper: Converts a HTTP operation to an explanation +def operation_to_action(argument): + + switcher = { + 'CREATE': "CreatePolicy", + 'UPDATE': "UpdatePolicy", + 'DELETE': "DeletePolicy", + 'GET': "GetPolicyStatus", + } + return switcher.get(argument, None) + + +# Helper: Converts a byte array to a str +def byte_to_str(byte_arr): + + if (byte_arr is not None): + return byte_arr.decode('utf-8') + else: + return None + + +# Helper: Creates random string +def get_random_string(length): + + characters = string.ascii_letters + string.digits + string.punctuation + password = ''.join(random.choice(characters) for i in range(length)) + return password diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/payload_logging.py b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/payload_logging.py new file mode 100644 index 0000000..9457d04 --- /dev/null +++ b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/payload_logging.py @@ -0,0 +1,60 @@ +# ============LICENSE_START=============================================== +# Copyright (C) 2022 Nordix Foundation. All rights reserved. +# ======================================================================== +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END================================================= +# + +from var_declaration import app +from flask import Flask, request, Response + +#Constants +TEXT_PLAIN='text/plain' + +#Vars +payload_log=True + +#Function to activate/deactivate http header and payload logging +@app.route('/payload_logging/', methods=['POST', 'PUT']) +def set_payload_logging(state): + global payload_log + if (state == "on"): + payload_log=True + elif (state == "off"): + payload_log=False + else: + return Response("Unknown state: "+state+" - use 'on' or 'off'", 400, mimetype=TEXT_PLAIN) + + return Response("Payload and header logging set to: "+state, 200, mimetype=TEXT_PLAIN) + +# Generic function to log http header and payload - called before the request +@app.app.before_request +def log_request_info(): + if (payload_log is True): + print('') + print('-----Request-----') + print('Req Headers: ', request.headers) + print('Req Body: ', request.get_data()) + +# Generic function to log http header and payload - called after the response +@app.app.after_request +def log_response_info(response): + if (payload_log is True): + print('-----Response-----') + print('Resp Headers: ', response.headers) + print('Resp Body: ', response.get_data()) + return response + +# Helper function to check loggin state +def is_payload_logging(): + return payload_log diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/start.sh b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/start.sh new file mode 100644 index 0000000..e4e3510 --- /dev/null +++ b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/start.sh @@ -0,0 +1,31 @@ +#!/bin/bash + +# ============LICENSE_START=============================================== +# Copyright (C) 2022 Nordix Foundation. All rights reserved. +# ======================================================================== +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END================================================= +# + +#Set path to open api +export APIPATH=$PWD/api +echo "APIPATH set to: "$APIPATH + +cd src + +#start nginx +nginx -c /usr/src/app/nginx.conf + +#start Kafka message dispatcher +echo "Path to main.py: "$PWD +python -u main.py diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/var_declaration.py b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/var_declaration.py new file mode 100644 index 0000000..e6063b4 --- /dev/null +++ b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/var_declaration.py @@ -0,0 +1,26 @@ +# ============LICENSE_START=============================================== +# Copyright (C) 2022 Nordix Foundation. All rights reserved. +# ======================================================================== +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END================================================= +# + +from maincommon import apipath +import connexion + +#Main app +app = connexion.App(__name__, specification_dir=apipath) + +forced_settings = {} +forced_settings['code']=None +forced_settings['delay']=None diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/basic_test.sh b/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/basic_test.sh new file mode 100755 index 0000000..ef7014d --- /dev/null +++ b/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/basic_test.sh @@ -0,0 +1,126 @@ +#!/bin/bash + +# ============LICENSE_START=============================================== +# Copyright (C) 2022 Nordix Foundation. All rights reserved. +# ======================================================================== +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END================================================= +# + +# Script for basic test of the Kafka message dispatcher. +# Run the build_and_start with the same arg, except arg 'nonsecure|secure', as this script + +print_usage() { + echo "Usage: ./basic_test.sh nonsecure|secure " + exit 1 +} + +if [ $# -ne 1 ]; then + print_usage +fi +if [ "$1" != "nonsecure" ] && [ "$1" != "secure" ]; then + print_usage +fi + +if [ $1 == "nonsecure" ]; then + #Default http port for the simulator + PORT=7075 + # Set http protocol + HTTPX="http" +else + #Default https port for the simulator + PORT=7175 + # Set https protocol + HTTPX="https" +fi + +. ../common/test_common.sh +. ../common/elapse_time_curl.sh + +echo "=== Kafka message dispatcher hello world ===" +RESULT="OK" +do_curl GET / 200 + +echo "=== Reset force delay ===" +RESULT="Force delay has been resetted for all dispatcher responses" +do_curl POST /dispatcheradmin/forcedelay 200 + +echo "=== API: Get policy type to topic mapping of type: ANR ===" +res=$(cat jsonfiles/ANR_to_topic_map.json) +RESULT="json:$res" +do_curl GET /policytypetotopicmapping/ANR 200 + +echo "=== Put policy: shall publish and consume for put policy operation ===" +RESULT="" +do_curl PUT /policytypes/ANR/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json + +echo "=== Get policy status: shall publish and consume for get policy status operation ===" +RESULT="" +do_curl GET /policytypes/ANR/kafkadispatcher/alpha/status 200 jsonfiles/alpha_policy.json + +echo "=== Put policy: shall publish and consume for put policy operation for alpha ===" +RESULT="" +do_curl PUT /policytypes/STD_1/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json + +echo "=== Delete policy: shall publish and consume for delete policy operation for alpha ===" +RESULT="" +do_curl DELETE /policytypes/STD_1/kafkadispatcher/alpha 200 + +echo "=== Set force delay 5 sec ===" +RESULT="Force delay: 5 sec set for all dispatcher responses until it is resetted" +do_curl POST '/dispatcheradmin/forcedelay?delay=5' 200 + +echo "=== Put policy: shall wait at least sec and then respond while publishing and consuming ===" +RESULT="" +do_elapsetime_curl PUT /policytypes/ANR/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json 5 + +echo "=== Reset force delay ===" +RESULT="Force delay has been resetted for all dispatcher responses" +do_curl POST /dispatcheradmin/forcedelay 200 + +echo "=== Put policy: shall publish and consume for put policy operation for beta ===" +RESULT="" +do_curl PUT /policytypes/STD_1/kafkadispatcher/beta 200 jsonfiles/beta_policy.json + +echo "=== Get policy status: shall publish and consume for get policy status operation ===" +RESULT="" +do_curl GET /policytypes/ANR/kafkadispatcher/alpha/status 200 jsonfiles/beta_policy.json + +echo "=== Put policy: shall publish and consume for put policy operation for alpha ===" +RESULT="" +do_curl PUT /policytypes/STD_2/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json + +echo "=== Set force response code: 500 ===" +RESULT="Force response code: 500 set for all dispatcher response until it is resetted" +do_curl POST '/dispatcheradmin/forceresponse?code=500' 200 + +echo "=== Put policy: shall not publish and consume for put policy operation for alpha ===" +res=$(cat jsonfiles/forced_response.json) +RESULT="json:$res" +do_curl PUT /policytypes/ANR/kafkadispatcher/alpha 500 jsonfiles/alpha_policy.json + +echo "=== Reset force response code ===" +RESULT="Force response code has been resetted for dispatcher responses" +do_curl POST /dispatcheradmin/forceresponse 200 + +echo "=== Get policy status: shall publish and consume for get policy status operation ===" +RESULT="" +do_curl GET /policytypes/ANR/kafkadispatcher/alpha/status 200 jsonfiles/alpha_policy.json + +echo "=== Delete policy: shall publish and consume for delete policy operation for alpha ===" +RESULT="" +do_curl DELETE /policytypes/STD_1/kafkadispatcher/alpha 200 + +echo "********************" +echo "*** All tests ok ***" +echo "********************" diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/basic_test_with_cust_header.sh b/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/basic_test_with_cust_header.sh new file mode 100755 index 0000000..4fa8ac0 --- /dev/null +++ b/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/basic_test_with_cust_header.sh @@ -0,0 +1,139 @@ +#!/bin/bash + +# ============LICENSE_START=============================================== +# Copyright (C) 2022 Nordix Foundation. All rights reserved. +# ======================================================================== +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END================================================= +# + +# Script for error testing of the Kafka message dispatcher +# The timeout value should be equal to TIME_OUT param that exist in the start script +# Run the script with the args: nonsecure|secure timeout=30 + +print_usage() { + echo "Usage: ./basic_test.sh nonsecure|secure timeout=30" + exit 1 +} + +if [ $# -ne 2 ]; then + print_usage +fi +if [ "$1" != "nonsecure" ] && [ "$1" != "secure" ]; then + print_usage +fi + +timeout=$(echo "$2" | cut -d'=' -f2) +regexp_for_number='^[0-9]+$' + +if ! [[ $timeout =~ $regexp_for_number ]] ; then + echo "error:"$timeout" Not a number" + exit 1 +else + if [ $timeout -le 0 ]; then + echo "Timeout value must be greater than zero" + exit 1 + fi +fi + +if [ $1 == "nonsecure" ]; then + # Default http port for the simulator + PORT=7075 + # Set http protocol + HTTPX="http" +else + #Default https port for the simulator + PORT=7175 + # Set https protocol + HTTPX="https" +fi + +. ../common/test_common.sh + +echo "=== Kafka message dispatcher hello world ===" +RESULT="OK" +do_curl GET / 200 + +echo "=== Reset force delay ===" +RESULT="Force delay has been resetted for all dispatcher responses" +do_curl POST /dispatcheradmin/forcedelay 200 + +# asynch error test case +echo "=== Put policy: shall publish and consume time-out ===" +req_id=$(get_random_number) +res=$(cat jsonfiles/timeout_response.json) +RESULT="json:$res" +# asynch callout +do_curl PUT /policytypes/ANR/kafkadispatcher/alpha 408 jsonfiles/alpha_policy.json $req_id & +proc_id=$! +sleep $timeout +# after time out duration, publish the event +publish_response_event $req_id +# wait until the main process to be completed +wait $proc_id + +# asynch success test case after 10s +echo "=== Put policy: shall publish and consume success at least 10 secs later ===" +req_id=$(get_random_number) +RESULT="" +# asynch callout +do_curl PUT /policytypes/STD_1/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json $req_id & +proc_id=$! +sleep 10 +# after 10s, publish the event +publish_response_event $req_id +# wait until the main process to be completed +wait $proc_id + +# asynch error test case +echo "=== Get policy status: shall publish and consume time-out ===" +req_id=$(get_random_number) +res=$(cat jsonfiles/timeout_response.json) +RESULT="json:$res" +# asynch callout +do_curl GET /policytypes/STD_2/kafkadispatcher/alpha/status 408 jsonfiles/alpha_policy.json $req_id & +proc_id=$! +sleep $timeout +# after time out duration, publish the event +publish_response_event $req_id +# wait until the main process to be completed +wait $proc_id + +# asynch success test case after 10s +echo "=== Get policy status: shall publish and consume success at least 15 secs later ===" +req_id=$(get_random_number) +RESULT="" +# asynch callout +do_curl GET /policytypes/ANR/kafkadispatcher/alpha/status 200 jsonfiles/alpha_policy.json $req_id & +proc_id=$! +sleep 15 +# after 15s, publish the event +publish_response_event $req_id +# wait until the main process to be completed +wait $proc_id + +# asynch success test case without any delay +echo "=== Delete policy: shall publish and consume success ===" +req_id=$(get_random_number) +RESULT="" +# asynch callout +do_curl DELETE /policytypes/STD_1/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json $req_id & +proc_id=$! +publish_response_event $req_id +# wait until the main process to be completed +wait $proc_id + + +echo "********************" +echo "*** All tests ok ***" +echo "********************" diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/build_and_start.sh b/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/build_and_start.sh new file mode 100755 index 0000000..dff351e --- /dev/null +++ b/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/build_and_start.sh @@ -0,0 +1,45 @@ +#!/bin/bash + +# ============LICENSE_START=============================================== +# Copyright (C) 2022 Nordix Foundation. All rights reserved. +# ======================================================================== +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END================================================= +# + +# Script to build and start the container +# Make sure to run the simulator with the same arg as this script + +print_usage() { + echo "Usage: ./build_and_start.sh " + exit 1 +} + +if [ $# -ge 1 ]; then + print_usage +fi + +echo "Building Kafka message dispatcher image..." +cd ../KAFKA_DISPATCHER/ + +#Build the image +docker build -t kafka_dispatcher . + +docker stop kafkamessagedispatcher > /dev/null 2>&1 +docker rm -f kafkamessagedispatcher > /dev/null 2>&1 + +echo "Starting Kafka message dispatcher..." +echo "PWD path: "$PWD + +#Run the container in interactive mode with host networking driver which allows docker to access localhost, unsecure port 7075, secure port 7175, TIME_OUT must be in seconds +docker run --network host --rm -it -p 7075:7075 -p 7175:7175 -e ALLOW_HTTP=true -e MSG_BROKER_URL=localhost:9092 -e TIME_OUT=30 --volume "$PWD/certificate:/usr/src/app/cert" --name kafkamessagedispatcher kafka_dispatcher diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/jsonfiles/ANR_to_topic_map.json b/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/jsonfiles/ANR_to_topic_map.json new file mode 100644 index 0000000..ed52462 --- /dev/null +++ b/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/jsonfiles/ANR_to_topic_map.json @@ -0,0 +1,4 @@ +{ + "request_topic": "kafkatopicreq", + "response_topic": "kafkatopicres" +} diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/jsonfiles/alpha_policy.json b/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/jsonfiles/alpha_policy.json new file mode 100644 index 0000000..66c2b63 --- /dev/null +++ b/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/jsonfiles/alpha_policy.json @@ -0,0 +1,11 @@ +{ + + "title": "A1 policy external server", + "description": "A1 policies notifying external server", + "type": "object", + "properties": { + "a1policyType": "alpha_test_policy", + "url" : "http://www.com" + } + +} diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/jsonfiles/beta_policy.json b/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/jsonfiles/beta_policy.json new file mode 100644 index 0000000..a61c7fc --- /dev/null +++ b/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/jsonfiles/beta_policy.json @@ -0,0 +1,11 @@ +{ + + "title": "A1 policy external server", + "description": "A1 policies notifying external server", + "type": "object", + "properties": { + "a1policyType": "beta_test_policy", + "url" : "http://www.com" + } + +} diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/jsonfiles/forced_response.json b/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/jsonfiles/forced_response.json new file mode 100644 index 0000000..4d26325 --- /dev/null +++ b/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/jsonfiles/forced_response.json @@ -0,0 +1,5 @@ +{ + "title": "Unknown", + "status": 500, + "detail": "Not implemented response code" +} diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/jsonfiles/timeout_response.json b/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/jsonfiles/timeout_response.json new file mode 100644 index 0000000..dd034c4 --- /dev/null +++ b/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/jsonfiles/timeout_response.json @@ -0,0 +1,5 @@ +{ + "title": "Request timeout", + "status": 408, + "detail": "Request timeout" +} diff --git a/near-rt-ric-simulator/test/STD_2.0.0/build_and_start_with_kafka.sh b/near-rt-ric-simulator/test/STD_2.0.0/build_and_start_with_kafka.sh new file mode 100755 index 0000000..c017513 --- /dev/null +++ b/near-rt-ric-simulator/test/STD_2.0.0/build_and_start_with_kafka.sh @@ -0,0 +1,96 @@ +#!/bin/bash + +# ============LICENSE_START=============================================== +# Copyright (C) 2022 Nordix Foundation. All rights reserved. +# ======================================================================== +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END================================================= +# + +# Script to build and start the container +# Make sure to run the simulator with the same arg as this script + +print_usage() { + echo "Usage: ./build_and_start.sh duplicate-check|ignore-duplicate kafka-srv|kafka-srv-secure|ignore-kafka-srv" + exit 1 +} + +if [ $# -ne 2 ]; then + print_usage +fi + +if [ $1 == "duplicate-check" ]; then + DUP_CHECK=1 +elif [ $1 == "ignore-duplicate" ]; then + DUP_CHECK=0 +else + print_usage +fi + +if [ $2 == "kafka-srv" ]; then + URL="http://localhost:7075" +elif [ $2 == "kafka-srv-secure" ]; then + URL="https://localhost:7175" +elif [ $2 == "ignore-kafka-srv" ]; then + URL="" +else + print_usage +fi + +URL_FLAG="" +if [ ! -z "$URL" ]; then + URL_FLAG="-e KAFKA_DISPATCHER_URL=$URL" +fi + +# Stop and remove container images if they run + +echo "Stopping A1 simulator image..." +docker stop a1StdSimulator > /dev/null 2>&1 +docker rm -f a1StdSimulator > /dev/null 2>&1 + +echo "Stopping kafka dispatcher server image..." +docker stop kafkamessagedispatcher > /dev/null 2>&1 +docker rm -f kafkamessagedispatcher > /dev/null 2>&1 + +# Initialize path variables for certificate and build operations + +dirstd2=$PWD + +cd ../../ +dirnrtsim=$PWD + +cd test/KAFKA_DISPATCHER/ +dirkafkasrv=$PWD + +# Build containers + +cd $dirnrtsim +echo "Building A1 simulator image..." +docker build -t a1test . + +if [ ! -z "$URL" ]; then + cd $dirkafkasrv + echo "Building kafka server image..." + docker build -t kafka_dispatcher . +fi + +# Run containers + +# Runs kafka server in detached mode +# In order to tail logs use:: docker logs -f kafkamessagedispatcher +if [ ! -z "$URL" ]; then + docker run -d --network host --rm -it -p 7075:7075 -p 7175:7175 -e ALLOW_HTTP=true -e MSG_BROKER_URL=localhost:9092 -e KAFKA_TOPIC_RES=kafkatopicres -e TIME_OUT=30000 --volume "$dirkafkasrv/certificate:/usr/src/app/cert" --name kafkamessagedispatcher kafka_dispatcher +fi + +# Runs A1 simulator +docker run --network host --rm -it -p 8085:8085 -p 8185:8185 -e A1_VERSION=STD_2.0.0 -e ALLOW_HTTP=true -e REMOTE_HOSTS_LOGGING=1 -e DUPLICATE_CHECK=$DUP_CHECK $URL_FLAG --volume "$dirnrtsim/certificate:/usr/src/app/cert" --name a1StdSimulator a1test diff --git a/near-rt-ric-simulator/test/common/publish_response_event_to_kafka_bus.py b/near-rt-ric-simulator/test/common/publish_response_event_to_kafka_bus.py new file mode 100644 index 0000000..635dc6d --- /dev/null +++ b/near-rt-ric-simulator/test/common/publish_response_event_to_kafka_bus.py @@ -0,0 +1,87 @@ +# ============LICENSE_START=============================================== +# Copyright (C) 2022 Nordix Foundation. All rights reserved. +# ======================================================================== +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END================================================= +# + +# This script publishes a response-event to a kafka bus +# In order to use this script, you must have an venv for Python and kafka-python libs has to be installed +# To instal kafka-python please use: pip install kafka-python +# Example of an response-event json +#{ + #"response-code": "400", + #"error-info": "Bad format" +#} + + +import os +import json +import sys + +from kafka import KafkaProducer + +# Response string with JSON format +response_data_JSON = """ +{ + "response-code": 200, + "error-info": "" +} +""" + +# Instantiate KafkaProducer with keyword arguments +# https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html +def create_kafka_producer(): + + producer = KafkaProducer( + bootstrap_servers = ['localhost:9092'], + key_serializer = str.encode, + value_serializer = lambda m: json.dumps(m).encode('ascii'), + ) + return producer + +# Helper: Publishes (to) the target broker and the topic in synch +def publish(kafka_evet, req_id): + + # Instantiate KafkaProducer with keyword arguments + producer = create_kafka_producer() + # Assigns an id to each request that is supposed to get a result + # req_id = 'Hll1EsycKLNRric7' + + try: + + # synch-publish + # KafkaProducer.send(topicname, value=broker_message, key=req_id, headers=None, partition=None, timestamp_ms=None) + fut_rec_metadata = producer.send('kafkatopicres', kafka_evet, req_id) + return fut_rec_metadata.get() + + except Exception as err: + print('Error while publish', err) + finally: + producer.close() + +if __name__ == '__main__': + try: + + requestid = sys.argv[1] + # response_data_JSON is str + future = publish(response_data_JSON, requestid) + + if (future is not None): + print (0) + else: + print (1) + + except Exception: + print (1) + sys.exit() diff --git a/near-rt-ric-simulator/test/common/test_common.sh b/near-rt-ric-simulator/test/common/test_common.sh index 6d2f80e..af4e79b 100755 --- a/near-rt-ric-simulator/test/common/test_common.sh +++ b/near-rt-ric-simulator/test/common/test_common.sh @@ -1,7 +1,7 @@ #!/bin/bash # ============LICENSE_START=============================================== -# Copyright (C) 2020 Nordix Foundation. All rights reserved. +# Copyright (C) 2020-2022 Nordix Foundation. All rights reserved. # ======================================================================== # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -26,6 +26,7 @@ #Expects the env $RESULT to contain the expected RESULT. #If json, the RESULT shall begin with 'json:'. #Any json parameter with unknown value shall be given as "????" to skip checking the value. +#The requestid parameter is being introduced in the fifth order. do_curl() { if [ $# -lt 3 ]; then echo "Need 3 or more parameters, [file]: "$@ @@ -33,9 +34,12 @@ do_curl() { exit 1 fi curlstr="curl -X "$1" -skw %{http_code} $HTTPX://localhost:"${PORT}${2}" -H accept:*/*" - if [ $# -gt 3 ]; then + if [ $# -eq 4 ]; then curlstr=$curlstr" -H Content-Type:application/json --data-binary @"$4 fi + if [ $# -ge 5 ]; then + curlstr=$curlstr" -H Content-Type:application/json --data-binary @"$4" -H requestid:"$5 + fi echo " CMD (${BASH_LINENO[0]}):"$curlstr res=$($curlstr) status=${res:${#res}-3} @@ -75,6 +79,33 @@ do_curl() { fi } +# Triggers publish_event_to_kafka_bus.py script to send msg to Kafka broker +# The aim of this function is to realize error related test cases only +# The request_id for the Kafka msg, should be passed here as a function parameter +publish_response_event() { + if [ $# -lt 1 ]; then + echo "Need 1 parameter, " + echo "Exiting test script....." + exit 1 + fi + res=$(python ../common/publish_response_event_to_kafka_bus.py "$1") + if [ $res -eq 0 ]; then + echo " Result as expected " + else + echo " Result not expected " + echo " Exiting..... " + exit 1 + fi +} + +# Creates 16 digits random number using letters and numbers only +get_random_number() { + r_num=$(tr -dc A-Za-z0-9 < /dev/urandom | head -c 16) + echo $r_num +} + +# It is being used to cross-test-cases in between A1 sim and external server +# The parameter it holds all with regards to External Server relates e.g. HTTPX_EXT_SRV and PORT_EXT_SRV do_curl_ext_srv() { if [ $# -lt 3 ]; then echo "Need 3 or more parameters, [file]: "$@ -- 2.16.6