Patch set 2: Creates a Python-adapter dispatching HTTP/HTTPS requests to a Kafka broker using kafka-python library.
Patch set 3: Develops a consumer capable of deserializing JSON messages. Clean up redundant functions and parameters.
Patch set 4: Develops a feature that publishes and consumes (to/from) the Kafka cluster in two-way synch.
Patch set 5: Develops unit tests cases.
Patch set 6: Develops a feature capable of handling requestid sent in headers for error testing.
Patch set 7: Develops asynch error test cases.
Patch set 8: Develops configurable look-up talbe that maps policy-type to kafka-topic. Supports multi topic including unit tests cases with error based.
Patch set 9: Develops publish-consume kafka message for get-policy-status operation. Enriches unit test cases.
Patch set 10: Validate open API yaml and fixes review.
Patch set 11: Develops an actual kafka response processing including unit test-cases with error based.
Patch set 12: Develops configurable response topic to be consumed from, apply changes to policytypetotopicmapping API. Adds unit test case.
Patch set 13: Plugs in kafka dispatcher module to A1 Sim to realize e2e sequence.
Patch set 14: Develops error testing script in which timeout value is being parametric.
Patch set 15: Calculates poll cycle treshold and develops more validation on time out param.
Issue-ID: NONRTRIC-757
Change-Id: I9bd257691d8a50e81492c55a5ec17a4409d9da6c
Signed-off-by: halil.cakal <halil.cakal@est.tech>
APPL_PROB_JSON='application/problem+json'
EXT_SRV_URL=os.getenv('EXT_SRV_URL')
-
+KAFKA_DISPATCHER_URL=os.getenv('KAFKA_DISPATCHER_URL')
# API Function: Get all policy type ids
def get_all_policy_types():
pjson=create_problem_json(None, "Duplicate, the policy json already exists.", 400, None, policy_id)
return Response(json.dumps(pjson), 400, mimetype=APPL_PROB_JSON)
+ #Callout hooks for kafka dispatcher
+ if (KAFKA_DISPATCHER_URL is not None):
+ resp = callout_kafka_dispatcher(policy_type_id, policy_id, data, retcode)
+ if (resp != 200):
+ pjson=create_error_response(resp)
+ return Response(json.dumps(pjson), 500, mimetype=APPL_PROB_JSON)
+
#Callout hooks for external server
#When it fails, break and return 419 HTTP status code
if (EXT_SRV_URL is not None):
pjson=create_problem_json(None, "The requested policy does not exist.", 404, None, policy_id)
return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON)
+ #Callout hooks for kafka dispatcher
+ if (KAFKA_DISPATCHER_URL is not None):
+ resp = callout_kafka_dispatcher(policy_type_id, policy_id, None, 204)
+ if (resp != 200):
+ pjson=create_error_response(resp)
+ return Response(json.dumps(pjson), 500, mimetype=APPL_PROB_JSON)
+
#Callout hooks for external server
#When it fails, break and return 419 HTTP status code
if (EXT_SRV_URL is not None):
pjson=create_problem_json(None, "The requested policy does not exist.", 404, None, policy_id)
return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON)
+ #Callout hooks for kafka dispatcher
+ if (KAFKA_DISPATCHER_URL is not None):
+ resp = callout_kafka_dispatcher(policy_type_id, policy_id, None, 202)
+ if (resp != 200):
+ pjson=create_error_response(resp)
+ return Response(json.dumps(pjson), 500, mimetype=APPL_PROB_JSON)
+
return Response(json.dumps(policy_status[policy_id]), status=200, mimetype=APPL_JSON)
+
+# Helper: Callout kafka dispatcher server to notify it for policy operations
+def callout_kafka_dispatcher(policy_type_id, policy_id, payload, retcode):
+
+ target_url = KAFKA_DISPATCHER_URL + "/policytypes/" + policy_type_id + "/kafkadispatcher/" + policy_id
+ try:
+ # create operation, publish with payload
+ if (retcode == 201):
+ resp=requests.put(target_url, json=payload, timeout=30, verify=False)
+ return resp.status_code
+ # update operation, publish with payload
+ elif (retcode == 200):
+ # add headers an update-flag
+ headers = {'updateoper' : 'yes'}
+ resp=requests.put(target_url, json=payload, headers=headers, timeout=30, verify=False)
+ return resp.status_code
+ # delete operation, publish without payload
+ elif (retcode == 204):
+ resp=requests.delete(target_url, timeout=30, verify=False)
+ return resp.status_code
+ # get policy status operation, publish without payload
+ elif (retcode == 202):
+ # update endpoint
+ target_url = target_url + "/status"
+ resp=requests.get(target_url, timeout=30, verify=False)
+ return resp.status_code
+ except Exception:
+ return 419
+
+
# Helper: Callout external server to notify it for policy operations
# Returns 200, 201 and 204 for the success callout hooks, for the others returns 419
def callout_external_server(policy_id, payload, operation):
--- /dev/null
+# ============LICENSE_START===============================================
+# Copyright (C) 2022 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+
+FROM python:3.8-slim-buster
+
+RUN pip install connexion[swagger-ui]
+RUN pip install kafka-python
+
+#install nginx and curl
+RUN apt-get update && apt-get install -y nginx=1.14.* nginx-extras curl
+
+WORKDIR /usr/src/app
+
+COPY api api
+COPY nginx.conf nginx.conf
+COPY certificate /usr/src/app/cert
+COPY src src
+COPY resources resources
+
+ARG user=nonrtric
+ARG group=nonrtric
+
+RUN groupadd $user && \
+ useradd -r -g $group $user
+RUN chown -R $user:$group /usr/src/app
+RUN chown -R $user:$group /var/log/nginx
+RUN chown -R $user:$group /var/lib/nginx
+RUN chown -R $user:$group /etc/nginx/conf.d
+RUN touch /var/run/nginx.pid
+RUN chown -R $user:$group /var/run/nginx.pid
+
+USER ${user}
+
+RUN chmod +x src/start.sh
+CMD src/start.sh
--- /dev/null
+openapi: 3.0.0
+info:
+ title: 'Kafka message dispatcher for A1 interface'
+ version: 2.0.0
+ description: |
+ Kafka message dispatcher server.
+ © 2022, O-RAN Alliance.
+ All rights reserved.
+externalDocs:
+ description: 'RestFUL APIs that create and dispatch Kafka messages to Kafka brokers'
+ url: 'https://docs.o-ran-sc.org/projects/o-ran-sc-sim-a1-interface/en/latest/index.html'
+servers:
+ - url: '{apiRoot}'
+ variables:
+ apiRoot:
+ default: 'https://example.com'
+paths:
+ '/policytypetotopicmapping/{policyTypeId}':
+ parameters:
+ - name: policyTypeId
+ in: path
+ required: true
+ schema:
+ "$ref": "#/components/schemas/PolicyTypeId"
+ get:
+ operationId: dispatcher.get_policy_type_to_topic_mapping
+ description: 'Get the kafka request and response topic map corresponding to policy type'
+ tags:
+ - The mapping from policy type to kafka topic request and response object
+ responses:
+ 200:
+ description: 'The policy type to topic map schemas'
+ content:
+ application/json:
+ schema:
+ "$ref": "#/components/schemas/PolicyTypeToTopicMap"
+ 404:
+ "$ref": "#/components/responses/404-NotFound"
+ 429:
+ "$ref": "#/components/responses/429-TooManyRequests"
+ 503:
+ "$ref": "#/components/responses/503-ServiceUnavailable"
+
+ '/policytypes/{policyTypeId}/kafkadispatcher/{policyId}':
+ parameters:
+ - name: policyTypeId
+ in: path
+ required: true
+ schema:
+ "$ref": "#/components/schemas/PolicyTypeId"
+ - name: policyId
+ in: path
+ required: true
+ schema:
+ "$ref": "#/components/schemas/A1PolicyId"
+ put:
+ operationId: dispatcher.put_policy
+ description: 'Dispatch create and update operation as kafka message to kafka cluster'
+ tags:
+ - Individual policy Object
+ requestBody:
+ required: true
+ content:
+ application/json:
+ schema:
+ "$ref": "#/components/schemas/A1PolicyObject"
+ responses:
+ 200:
+ description: 'Create or update operation dispatched'
+ 400:
+ "$ref": "#/components/responses/400-BadRequest"
+ 408:
+ "$ref": "#/components/responses/408-RequestTimeout"
+ 419:
+ "$ref": "#/components/responses/419-KafkaMessagePublishFailed"
+ 429:
+ "$ref": "#/components/responses/429-TooManyRequests"
+ 503:
+ "$ref": "#/components/responses/503-ServiceUnavailable"
+ 507:
+ "$ref": "#/components/responses/507-InsufficientStorage"
+ delete:
+ operationId: dispatcher.delete_policy
+ description: 'Dispatch policy delete opertion as kafka message to kafka cluster'
+ responses:
+ 200:
+ description: 'Delete operation dispatched'
+ 408:
+ "$ref": "#/components/responses/408-RequestTimeout"
+ 419:
+ "$ref": "#/components/responses/419-KafkaMessagePublishFailed"
+ 429:
+ "$ref": "#/components/responses/429-TooManyRequests"
+ 503:
+ "$ref": "#/components/responses/503-ServiceUnavailable"
+
+ '/policytypes/{policyTypeId}/kafkadispatcher/{policyId}/status':
+ parameters:
+ - name: policyTypeId
+ in: path
+ required: true
+ schema:
+ "$ref": "#/components/schemas/PolicyTypeId"
+ - name: policyId
+ in: path
+ required: true
+ schema:
+ "$ref": "#/components/schemas/A1PolicyId"
+ get:
+ operationId: dispatcher.get_policy_status
+ description: 'Dispatch policy status query opertion as kafka message to kafka cluster'
+ tags:
+ - Individual A1 Policy Status Object
+ responses:
+ 200:
+ description: 'Query policy status operation dispatched'
+ 429:
+ "$ref": "#/components/responses/429-TooManyRequests"
+ 503:
+ "$ref": "#/components/responses/503-ServiceUnavailable"
+
+components:
+ schemas:
+ #
+ # Representation objects
+ #
+ A1PolicyObject:
+ description: 'A generic policy object'
+ type: object
+
+ A1Policy:
+ description: 'A generic policy string'
+ type: string
+
+ PolicyTypeToTopicMap:
+ description: 'Request and response topic map for each policy type'
+ type: object
+ properties:
+ policy_type:
+ type: object
+ properties:
+ request_topic:
+ type: string
+ example: kafkatopicreq
+ response_topic:
+ type: string
+ example: kafkatopicres
+
+ ProblemDetails:
+ description: 'A problem detail to carry details in a HTTP response according to RFC 7807'
+ type: object
+ properties:
+ type:
+ type: string
+ title:
+ type: string
+ status:
+ type: number
+ detail:
+ type: string
+ instance:
+ type: string
+
+ #
+ # Simple data types
+ #
+ JsonSchema:
+ description: 'A JSON schema following http://json-schema.org/draft-07/schema'
+ type: object
+
+ A1PolicyId:
+ description: 'A1 policy identifier.'
+ type: string
+
+ PolicyTypeId:
+ description: 'Policy type identifier assigned by the A1-P Provider'
+ type: string
+
+ responses:
+ 400-BadRequest:
+ description: 'A1 policy not properly formulated or not related to the method'
+ content:
+ application/problem+json:
+ schema:
+ "$ref": "#/components/schemas/ProblemDetails"
+
+ 404-NotFound:
+ description: 'No resource found at the URI'
+ content:
+ application/problem+json:
+ schema:
+ "$ref": "#/components/schemas/ProblemDetails"
+
+ 405-MethodNotAllowed:
+ description: 'Method not allowed for the URI'
+ content:
+ application/problem+json:
+ schema:
+ "$ref": "#/components/schemas/ProblemDetails"
+
+ 408-RequestTimeout:
+ description: 'Request could not be processed in given amount of time'
+ content:
+ application/problem+json:
+ schema:
+ "$ref": "#/components/schemas/ProblemDetails"
+
+ 409-Conflict:
+ description: 'Request could not be processed in the current state of the resource'
+ content:
+ application/problem+json:
+ schema:
+ "$ref": "#/components/schemas/ProblemDetails"
+
+ 419-KafkaMessagePublishFailed:
+ description: 'Publishing the kafka message to the broker gets fail'
+ content:
+ application/problem+json:
+ schema:
+ "$ref": "#/components/schemas/ProblemDetails"
+
+ 429-TooManyRequests:
+ description: 'Too many requests have been sent in a given amount of time'
+ content:
+ application/problem+json:
+ schema:
+ "$ref": "#/components/schemas/ProblemDetails"
+
+ 503-ServiceUnavailable:
+ description: 'The provider is currently unable to handle the request due to a temporary overload'
+ content:
+ application/problem+json:
+ schema:
+ "$ref": "#/components/schemas/ProblemDetails"
+
+ 507-InsufficientStorage:
+ description: 'The method could not be performed on the resource because the provider is unable to store the representation needed to successfully complete the request'
+ content:
+ application/problem+json:
+ schema:
+ "$ref": "#/components/schemas/ProblemDetails"
--- /dev/null
+-----BEGIN CERTIFICATE-----
+MIID+zCCAuOgAwIBAgIUWy7JHRvA2GfOU5op4Xcc/8wQF18wDQYJKoZIhvcNAQEL
+BQAwgYwxCzAJBgNVBAYTAklFMRIwEAYDVQQIDAlXRVNUTUVBVEgxEDAOBgNVBAcM
+B0FUSExPTkUxETAPBgNVBAoMCEVyaWNzc29uMQwwCgYDVQQLDANFU1QxETAPBgNV
+BAMMCGVzdC50ZWNoMSMwIQYJKoZIhvcNAQkBFhRoYWxpbC5jYWthbEBlc3QudGVj
+aDAeFw0yMjA2MTcwOTEwMDNaFw00OTExMDEwOTEwMDNaMIGMMQswCQYDVQQGEwJJ
+RTESMBAGA1UECAwJV0VTVE1FQVRIMRAwDgYDVQQHDAdBVEhMT05FMREwDwYDVQQK
+DAhFcmljc3NvbjEMMAoGA1UECwwDRVNUMREwDwYDVQQDDAhlc3QudGVjaDEjMCEG
+CSqGSIb3DQEJARYUaGFsaWwuY2FrYWxAZXN0LnRlY2gwggEiMA0GCSqGSIb3DQEB
+AQUAA4IBDwAwggEKAoIBAQCqwDVZ7txWX/FaiRiSVa2jnBcV7KN6eqwcKtP3cNP+
+3VTm4YtcY6yp/dPXTYqkAX1qmp5i8USFPnbCstAijI5Uy8kl63dYbirHMPwt9AOL
+TXrFRrJ/sev3ULJWKB1IOGt2rFhoUXA23Hv1hagyvjx2upbnVmhrz5qBOT1wuzwN
+U2PjFaCFHBs0XphFS/UDEQlvpbNz/jxwHVrEdO8Jr951OFlUBczDDGk0jJ3hRc0p
+iM5LNGH02yDvE6pCqqY5Fo5aaj9Vi0Kztv1D/NClWcr3Yh3IuMyZkfS+S8nPd7Nu
+VHKWo7cPv9QpeziWiqx0fZcSAh6tUF52hrwGrMONuEojAgMBAAGjUzBRMB0GA1Ud
+DgQWBBRiX8NchgUa825PcmhA0b+BOnkx5TAfBgNVHSMEGDAWgBRiX8NchgUa825P
+cmhA0b+BOnkx5TAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQAi
+p6UMfLeTROQJNS4ZAkpQKEtrhGaE8DpnZW+tCLWaxkTk3Mo3ybE3rBW2zugmXFIU
+K2PRKivxHmVxY/YQL3sxzQRnzRPl0wuEmm+r0LpGR6VXfTaPrT6YzKM0TU/xKqJI
+INk6JiBx8v9dp5LWGoJEs0e4wvoV8Bd05u+73cbhIUisO7KmCp/u1D1BHYtBNDCp
+sVH6y9mAlvMIAOzy4wOWqoAxcW2ES8JbesbLOxt/IaQO9DQFPUIjTZURG+62lNCS
++2+lb1ihNqzEECuyw1GQRt88HrSWuj99bCBRRiBij900anadoKIDHdWSEkzhBa0K
+zJ5KoQK/o4szZ3VPByfh
+-----END CERTIFICATE-----
--- /dev/null
+#!/bin/bash
+
+# ============LICENSE_START===============================================
+# Copyright (C) 2022 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+
+# This will generate a self-signed certificate with password 'test'
+
+SUBJECT="/C=IE/ST=WESTMEATH/L=ATHLONE/O=Ericsson/OU=EST/CN=est.tech/emailAddress=halil.cakal@est.tech"
+PW=test
+echo $PW > pass
+
+openssl req -x509 -passout file:pass -newkey rsa:2048 -keyout key.crt -subj "$SUBJECT" -out cert.crt -days 9999
--- /dev/null
+-----BEGIN ENCRYPTED PRIVATE KEY-----
+MIIFHDBOBgkqhkiG9w0BBQ0wQTApBgkqhkiG9w0BBQwwHAQI/PWiKXnGNAMCAggA
+MAwGCCqGSIb3DQIJBQAwFAYIKoZIhvcNAwcECPdSVeuyJFIwBIIEyGmpW5lUwpKF
+tlcwBj2WvKF/7GJFw9LEuIBnkm0Ya8LV5Fu5XS9OGg877OkLVr1P152YjDQ+h1nD
+ZZSOEqjAGGSlEshie2DSr9bdJdBlr8MTog7SlHQsrU0QLxPRBBwqseI5cyu1Vb41
+4V5+l0iWUoDvXH3mUeH460A43GU4ZUTimkWpP9M9LHyrofjMg7sLzwvOns207DXK
+0hffc5293t85CQPau1WenHsfXn65tkPFblYILpJyU4sa1kLa4f5Ynt7RH2mNztfI
+26kViIuOBDXcmLAxL3WGZaR9u71qpl0wC1umXWxWNt69DRVyOf23mHHSuuDSEyM3
++x0rrbj/QaLNnoEjAFvijEAkYdp/jPKKP3kp6LpSVvVVOGLP7srrIrOe4q6bPFfK
+d5u1Vnh3/PVEf8xPbJe8UJ5cfx3mWhT+saZOIpEpQvom5GwSYt869P1xyAaa59cx
+TcT9KC/Ytg7fSHwXwTclIvucD+cJvbEZNFAwxMkL94a60LNfZ/odq1Bu304Shm/V
+DSNeDi1HjfoC3aca7bjsXE8Xj82JQLaSGt7+AuY3gICA5cnJxxWv5VoyRZVPsiRj
+Z6ykP2ikjkaLQaqDJmpbnx2ZK/lfrkJI1yI0kYK0xApUQdx9ks8c/AeEcUby2z+H
+qPJZuuh1NlEv8jSFn1CO3a5Bpq53EtQlxonKzJYHdoKm1oIEbIUE60K+1oxyXt9S
++l8PgH0T2QlM2lvipy5XegPrTuMYqDywEt4cf1Yk/8RSYEeLzcfKzSuNuy556YRd
+Of9nJOKPkVr1cp5si9Vyt+t4cD826WWV1ZgEcIK0i8uhEQxHb+y88DDWAV1DJGog
+M0qPGm95lWj4ESiv7A/+AbXY3rJRMp/JB1jmGTNfa3jQ9P29cTwhlpvPnhqviLRH
+1YfIOJfghrF55e83bLmJcfwVktMwO8Wzovw7U/1Rzy1j/fyUNkhOEbziIu97ScD+
+7AQDfRqBZZXWREOylAmfWMqZxwVn+CytAFyTLP5CpjSKgBV+qJ5xc3GqNJxaKGK5
+ULXfJKtTheJV3YYnvCuDySFuCv/dkUVjaA3/TqTikKm4THxS+DjtFJey+KWTx57p
+X+8ky/E3zAuZzP7r6Dszhp+CAfvXp4CwkLitqfbwja2lcxQ+hbpHYjLN6zvHmmOZ
+o0ZeoNpsWj00jAc9NrJt08DfcNomlUz9CZgpvE64gXX7wPCw5zG/c5dbFuoImEeI
+h+fKwGh2KdJwazvZ/B20/TRUn1WaQzyRiCJMIgc3aO3AiuffeurliO93iTeVCjJD
+d+Bdt4Ub0zPRikTqY4PUDiyfM+vRHOkJ39dY39XaZEsLLXB6XeACWcWT6iBF56Pe
+UAi+C4IwPLBHMsIVVSvH8/Rg0IFBHVmkGIi0q7gIyVTcR7FQeCzDZVAjJn4aBgXo
+S57v2AbMdD3pwie4ou91NkiM/SnSimLA0BxEh1UEhZk2BEW2Yy6OuvcLn3EfeZVu
+M9UNmwIFaq9jWM/qcRQa7MbXTZUTUIPsOvMOsZwPIurqtXWZZTWZ8D0eu2Hu6Vui
+FGyY1xIVcIpGXesADsYwy+CSW/EjpV1s+/LGDsUcqZpMeWmj84Zh1Tjt+Fet+967
+dSViTwISZ+O8F2uq0MaiPg==
+-----END ENCRYPTED PRIVATE KEY-----
--- /dev/null
+# user www-data;
+worker_processes auto;
+pid /run/nginx.pid;
+include /etc/nginx/modules-enabled/*.conf;
+
+env ALLOW_HTTP;
+
+events {
+ worker_connections 768;
+ # multi_accept on;
+}
+
+http {
+
+ ##
+ # Basic Settings
+ ##
+
+ sendfile on;
+ tcp_nopush on;
+ tcp_nodelay on;
+ keepalive_timeout 65;
+ types_hash_max_size 2048;
+ # server_tokens off;
+
+ # server_names_hash_bucket_size 64;
+ # server_name_in_redirect off;
+
+ include /etc/nginx/mime.types;
+ default_type application/octet-stream;
+
+ perl_set $allow_http 'sub { return $ENV{"ALLOW_HTTP"}; }';
+
+ server { # simple reverse-proxy
+ listen 7075;
+ listen [::]:7075;
+ server_name localhost;
+ if ($allow_http != true) {
+ return 444;
+ }
+
+ # serve dynamic requests
+ location / {
+ proxy_set_header Host $host;
+ proxy_set_header X-Real-IP $remote_addr;
+ proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
+ proxy_pass http://localhost:7777;
+ }
+ }
+
+ server { # simple reverse-proxy
+ listen 7175 ssl;
+ listen [::]:7175 ssl;
+ server_name localhost;
+ ssl_certificate /usr/src/app/cert/cert.crt;
+ ssl_certificate_key /usr/src/app/cert/key.crt;
+ ssl_password_file /usr/src/app/cert/pass;
+
+ # serve dynamic requests
+ location / {
+ proxy_set_header Host $host;
+ proxy_set_header X-Real-IP $remote_addr;
+ proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
+ proxy_pass http://localhost:7777;
+ }
+ }
+ ##
+ # SSL Settings
+ ##
+
+ ssl_protocols TLSv1 TLSv1.1 TLSv1.2; # Dropping SSLv3, ref: POODLE
+ ssl_prefer_server_ciphers on;
+
+ ##
+ # Logging Settings
+ ##
+
+ access_log /var/log/nginx/access.log;
+ error_log /var/log/nginx/error.log;
+
+ ##
+ # Gzip Settings
+ ##
+
+ gzip on;
+
+ # gzip_vary on;
+ # gzip_proxied any;
+ # gzip_comp_level 6;
+ # gzip_buffers 16 8k;
+ # gzip_http_version 1.1;
+ # gzip_types text/plain text/css application/json application/javascript text/xml application/xml application/xml+rss text/javascript;
+}
--- /dev/null
+{
+ "ANR": {
+ "request_topic": "kafkatopicreq",
+ "response_topic": "kafkatopicres"
+ },
+ "STD_1": {
+ "request_topic": "kafkatopicreq2",
+ "response_topic": "kafkatopicres"
+ },
+ "STD_2": {
+ "request_topic": "kafkatopicreq3",
+ "response_topic": "kafkatopicres"
+ }
+}
--- /dev/null
+# ============LICENSE_START===============================================
+# Copyright (C) 2022 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+
+import os
+import json
+import time
+import math
+
+from flask import request, Response
+from datetime import datetime
+from kafka.consumer.fetcher import ConsumerRecord
+from kafka import TopicPartition
+from var_declaration import forced_settings
+from maincommon import create_kafka_producer, create_kafka_consumer, create_kafka_event, create_kafka_response_event, byte_to_str, get_random_string
+
+
+MSG_BROKER_URL=os.getenv('MSG_BROKER_URL')
+
+TIME_OUT=os.getenv('TIME_OUT')
+
+#Constsants
+APPL_JSON='application/json'
+TEXT_PLAIN='text/plain'
+APPL_PROB_JSON='application/problem+json'
+
+# API Function: Dispatch create or update events to Kafka cluster
+def put_policy(policyTypeId, policyId):
+
+ if ((r := check_modified_response()) is not None):
+ return r
+
+ policy_type_id = str(policyTypeId)
+ policy_id = str(policyId)
+
+ try:
+ # Error based unit test rel only, for more info please check basic_test_with_cust_header
+ req_id_from_header = request.headers.get('requestid')
+ # Differentiate if the PUT is update or create operation since the endpoint is the same
+ update_oper_from_header = request.headers.get('updateoper')
+ data = request.data
+ data = json.loads(data)
+ except Exception:
+ pjson=create_problem_json(None, "The a1policy is corrupt or missing.", 400, None, policy_id)
+ return Response(json.dumps(pjson), 400, mimetype=APPL_PROB_JSON)
+
+ # Decide if the operation is update or create
+ if (update_oper_from_header is not None):
+ kafka_event = create_kafka_event(policy_type_id, policy_id, data, 'UPDATE')
+ else:
+ kafka_event = create_kafka_event(policy_type_id, policy_id, data, 'CREATE')
+
+ # Synch callout hooks towards kafka broker
+ if (MSG_BROKER_URL is not None):
+ return publish_and_consume(kafka_event, req_id_from_header, policy_type_id)
+
+ return Response('', 200, mimetype=TEXT_PLAIN)
+
+
+# API Function: Dispatch delete events to south Kafka cluster
+def delete_policy(policyTypeId, policyId):
+
+ if ((r := check_modified_response()) is not None):
+ return r
+
+ policy_type_id = str(policyTypeId)
+ policy_id = str(policyId)
+
+ req_id_from_header = request.headers.get('requestid')
+ print('req_id_from_header', req_id_from_header)
+
+ # Synch callout hooks towards kafka broker
+ kafka_event = create_kafka_event(policy_type_id, policy_id, None, 'DELETE')
+ if (MSG_BROKER_URL is not None):
+ return publish_and_consume(kafka_event, req_id_from_header, policy_type_id)
+
+ return Response('', 200, mimetype=TEXT_PLAIN)
+
+
+# API Function: Get status for a policy
+def get_policy_status(policyTypeId, policyId):
+
+ if ((r := check_modified_response()) is not None):
+ return r
+
+ policy_type_id=str(policyTypeId)
+ policy_id=str(policyId)
+
+ req_id_from_header = request.headers.get('requestid')
+ print('req_id_from_header', req_id_from_header)
+
+ # Synch callout hooks towards kafka broker
+ kafka_event = create_kafka_event(policy_type_id, policy_id, None, 'GET')
+ if (MSG_BROKER_URL is not None):
+ return publish_and_consume(kafka_event, req_id_from_header, policy_type_id)
+
+ return Response('', 200, mimetype=TEXT_PLAIN)
+
+
+def get_policy_type_to_topic_mapping(policyTypeId):
+
+ if ((r := check_modified_response()) is not None):
+ return r
+
+ policy_type_id = str(policyTypeId)
+
+ m_file = open('../resources/policytype_to_topicmap.json')
+ map_in_dict = json.load(m_file)
+
+ if policy_type_id in map_in_dict.keys():
+ topic_address = map_in_dict[policy_type_id]
+ return Response(json.dumps(topic_address), 200, mimetype=APPL_JSON)
+ else:
+ pjson=create_problem_json(None, "The policy type to topic mapping does not exist.", 404, None, policy_type_id)
+ return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON)
+
+
+# Helper: Publishes and consumes (to/from) the target broker and the topic in two-way synch
+def publish_and_consume(kafka_event, req_id_from_header, pol_type_id):
+
+ # Instantiate KafkaProducer with keyword arguments
+ producer = create_kafka_producer()
+
+ # Assigns an id to each request that is supposed to get a result
+ # if a req_id already exists in req headers, it means that test generated req_id is in use for testing only
+ if (req_id_from_header is None):
+ req_id = get_random_string(16)
+ else:
+ req_id = req_id_from_header
+
+ try:
+
+ resp = get_policy_type_to_topic_mapping(pol_type_id)
+ # if the policy type to topic mapping could not be found, then returns 404
+ # else gets target topic to publish the message to
+ if (resp.status_code == 404):
+ return resp
+ else:
+ data = json.loads(resp.data)
+ target_topic_req = data['request_topic']
+ target_topic_res = data['response_topic']
+
+ # synch-publish
+ # KafkaProducer.send(topicname, value=broker_message, key=req_id, headers=None, partition=None, timestamp_ms=None)
+ fut_rec_metadata = producer.send(target_topic_req, kafka_event, req_id)
+ record_metadata = fut_rec_metadata.get()
+ print('Future:', record_metadata)
+ publish_time_in_ms = record_metadata.timestamp
+
+ # For test purposes only, publish the success response event with no error-info to response topic
+ # if basic_test_with_cust_header.sh is being used, then comment this line
+ # else comment out this line for the basic_test.sh
+ kafka_response_event = create_kafka_response_event(200, "")
+ producer.send(target_topic_res, kafka_response_event, req_id)
+
+ # synch-consume
+ consumer_record = consume_record_for(req_id, target_topic_res)
+ if (isinstance(consumer_record, ConsumerRecord)):
+
+ print("Consumer Record:", consumer_record)
+ cons_rec_value = consumer_record.value
+ cons_rec_val_in_dict = json.loads(cons_rec_value)
+ resp_code = cons_rec_val_in_dict['response-code']
+
+ # if response code success, then check for time-out
+ if (int(resp_code) == 200):
+ # time-out control block, default time-out duration is thirty seconds
+ consume_time_in_ms = consumer_record.timestamp
+ elapsed_time_in_ms = consume_time_in_ms - publish_time_in_ms
+ print('Elapsed time in ms:', elapsed_time_in_ms)
+ if (elapsed_time_in_ms < int(TIME_OUT) * 1000):
+ return Response('', 200, mimetype=APPL_JSON)
+ else:
+ # returns time-out response code
+ pjson=create_error_response(408)
+ return Response(json.dumps(pjson), 408, mimetype=APPL_PROB_JSON)
+ else:
+ # for all other responses returns special error of this module by wrapping actual resp code
+ pjson=create_error_response(419)
+ return Response(json.dumps(pjson), 419, mimetype=APPL_PROB_JSON)
+
+ elif (isinstance(consumer_record, Response)):
+ # Returns time-out response
+ return consumer_record
+ else:
+ # returns special error of this module
+ pjson=create_error_response(419)
+ return Response(json.dumps(pjson), 419, mimetype=APPL_PROB_JSON)
+
+ except Exception as err:
+ print('Error while publish and consume', err)
+ pjson=create_error_response(419)
+ return Response(json.dumps(pjson), 419, mimetype=APPL_PROB_JSON)
+ finally:
+ producer.close()
+
+
+# Helper: Searches for req_id by seeking every five seconds up to thirty seconds
+# Helper: If the req_id is found, then ConsumerRecord will be returned
+# Helper: If the req_id is not found, then Response Request Timeout will be returned
+def consume_record_for(req_id, target_topic_res):
+
+ try:
+ print ('req_id looking for in consumer:', req_id)
+ consumer = create_kafka_consumer()
+ topic_partition = TopicPartition(target_topic_res, 0)
+ consumer.assign([topic_partition])
+
+ # calculates poll cycle threshold
+ sleep_period_in_sec = 5
+ poll_cycle_threshold = int(TIME_OUT) / sleep_period_in_sec
+ poll_cycle_threshold = math.floor(poll_cycle_threshold)
+ print('poll_cycle_threshold', poll_cycle_threshold)
+
+ poll_retries = 0
+ starting_offset = 0
+ prev_last_offset = 0
+ while (poll_retries < poll_cycle_threshold):
+ # Manually specify the fetch offset for a TopicPartition
+ consumer.seek(topic_partition, starting_offset)
+ # Get the last offset for the given partitions
+ last_offset = consumer.end_offsets([topic_partition])[topic_partition]
+ print('last_offset',last_offset)
+
+ if (last_offset != prev_last_offset):
+ for consumer_record in consumer:
+ # Get req_id as msg_key and converts it from byte to str for each consumer record
+ msg_key = byte_to_str(consumer_record.key)
+ print ('msg_key in a consumer_record:', msg_key)
+ if (req_id == msg_key):
+ print ('req_id is found in consumer records', req_id)
+ return consumer_record
+ elif (consumer_record.offset == last_offset - 1):
+ break
+
+ print('Sleeping for ' + str(sleep_period_in_sec) + ' seconds...')
+ time.sleep(sleep_period_in_sec)
+
+ poll_retries += 1
+ prev_last_offset = last_offset
+ starting_offset += last_offset
+
+ # Returns time-out response
+ pjson=create_error_response(408)
+ return Response(json.dumps(pjson), 408, mimetype=APPL_PROB_JSON)
+
+ except Exception as err:
+ print('Error while consume record for req_id', err)
+ pjson=create_error_response(419)
+ return Response(json.dumps(pjson), 419, mimetype=APPL_PROB_JSON)
+ finally:
+ consumer.close()
+
+
+# Helper: Create a response object if forced http response code is set
+def get_forced_response():
+
+ if (forced_settings['code'] is not None):
+ resp_code=forced_settings['code']
+ pjson=create_error_response(int(resp_code))
+ return Response(json.dumps(pjson), pjson['status'], mimetype=APPL_PROB_JSON)
+ return None
+
+
+# Helper: Delay if delayed response code is set
+def do_delay():
+
+ if (forced_settings['delay'] is not None):
+ try:
+ val=int(forced_settings['delay'])
+ time.sleep(val)
+ except Exception:
+ return
+
+
+# Helper: Check if response shall be delayed or a forced response shall be sent
+def check_modified_response():
+
+ do_delay()
+ return get_forced_response()
+
+
+# Helper: Create a problem json object
+def create_problem_json(type_of, title, status, detail, instance):
+
+ error = {}
+ if type_of is not None:
+ error["type"] = type_of
+ if title is not None:
+ error["title"] = title
+ if status is not None:
+ error["status"] = status
+ if detail is not None:
+ error["detail"] = detail
+ if instance is not None:
+ error["instance"] = instance
+ return error
+
+
+# Helper: Create a problem json based on a generic http response code
+def create_error_response(code):
+
+ if code == 400:
+ return(create_problem_json(None, "Bad request", 400, "Object in payload not properly formulated or not related to the method", None))
+ elif code == 404:
+ return(create_problem_json(None, "Not found", 404, "No resource found at the URI", None))
+ elif code == 405:
+ return(create_problem_json(None, "Method not allowed", 405, "Method not allowed for the URI", None))
+ elif code == 408:
+ return(create_problem_json(None, "Request timeout", 408, "Request timeout", None))
+ elif code == 409:
+ return(create_problem_json(None, "Conflict", 409, "Request could not be processed in the current state of the resource", None))
+ elif (code == 419):
+ return(create_problem_json(None, "Kafka message publish failed", 419, "Publishing the event could not be processed on the Kafka cluster", None))
+ elif code == 429:
+ return(create_problem_json(None, "Too many requests", 429, "Too many requests have been sent in a given amount of time", None))
+ elif code == 507:
+ return(create_problem_json(None, "Insufficient storage", 507, "The method could not be performed on the resource because the provider is unable to store the representation needed to successfully complete the request", None))
+ elif code == 503:
+ return(create_problem_json(None, "Service unavailable", 503, "The provider is currently unable to handle the request due to a temporary overload", None))
+ else:
+ return(create_problem_json(None, "Unknown", code, "Not implemented response code", None))
--- /dev/null
+# ============LICENSE_START===============================================
+# Copyright (C) 2022 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+
+import json
+import sys
+import requests
+
+
+from flask import request, Response, Flask, json
+from var_declaration import forced_settings, app
+from maincommon import check_timeout, check_apipath
+
+#Constants
+TEXT_PLAIN='text/plain'
+
+check_apipath()
+check_timeout()
+
+# app is created in var_declarations
+
+import payload_logging # app var need to be initialized
+
+#Check alive function
+@app.route('/', methods=['GET'])
+def test():
+ return Response("OK", 200, mimetype=TEXT_PLAIN)
+
+#Set|Reset force response to be returned from dispatcher
+#/dispatcheradmin/forceresponse?code=<responsecode>
+@app.route('/dispatcheradmin/forceresponse', methods=['POST'])
+def forceresponse():
+
+ query_param=request.args.get('code')
+ forced_settings['code']=query_param
+
+ if (query_param is None):
+ return Response("Force response code has been resetted for dispatcher responses", 200, mimetype=TEXT_PLAIN)
+ else:
+ return Response("Force response code: " + str(forced_settings['code']) + " set for all dispatcher response until it is resetted", 200, mimetype=TEXT_PLAIN)
+
+#Set|Reset force delay response, in seconds, for all external server responses
+#/a1policy/forcedelay?delay=<seconds>
+@app.route('/dispatcheradmin/forcedelay', methods=['POST'])
+def forcedelay():
+
+ query_param=request.args.get('delay')
+ forced_settings['delay']=query_param
+
+ if (query_param is None):
+ return Response("Force delay has been resetted for all dispatcher responses ", 200, mimetype=TEXT_PLAIN)
+ else:
+ return Response("Force delay: " + str(forced_settings['delay']) + " sec set for all dispatcher responses until it is resetted ", 200, mimetype=TEXT_PLAIN)
+
+port_number = 7777
+if len(sys.argv) >= 2:
+ if isinstance(sys.argv[1], int):
+ port_number = sys.argv[1]
+
+#Import base RestFUL API functions from Open API
+app.add_api('KAFKA_DISPATCHER_api.yaml')
+
+if __name__ == '__main__':
+ app.run(port=port_number, host="127.0.0.1", threaded=False)
--- /dev/null
+# ============LICENSE_START===============================================
+# Copyright (C) 2022 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+
+import os
+import sys
+import json
+from pathlib import Path
+from flask import Response
+import socket
+import ssl
+import random
+import string
+
+from kafka import KafkaProducer, KafkaConsumer
+
+#Must exist
+apipath=os.environ['APIPATH']
+timeout=os.getenv('TIME_OUT')
+
+MSG_BROKER_URL=os.getenv('MSG_BROKER_URL')
+
+
+# Make sure the api path is set, otherwise exit
+def check_apipath():
+ if (apipath is None):
+ print("Env APIPATH not set. Exiting....")
+ sys.exit(1)
+
+# Make sure the timeout is set and greater than zero, otherwise exit
+def check_timeout():
+ if (timeout is None):
+ print("Env TIME_OUT not set. Exiting....")
+ sys.exit(1)
+ elif (int(timeout) < 0):
+ print("Env TIME_OUT must be greater than zero. Exiting....")
+ sys.exit(1)
+
+# Instantiate KafkaProducer with keyword arguments
+# https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html
+def create_kafka_producer():
+
+ producer = KafkaProducer(
+ bootstrap_servers = [MSG_BROKER_URL],
+ key_serializer = str.encode,
+ value_serializer = lambda m: json.dumps(m).encode('ascii'),
+ )
+ return producer
+
+
+# Instantiate KafkaConsumer with keyword arguments
+# https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
+def create_kafka_consumer():
+ consumer = KafkaConsumer(
+ #KAFKA_TOPIC_RES,
+ bootstrap_servers = MSG_BROKER_URL,
+ auto_offset_reset = 'earliest',
+ value_deserializer = lambda m: json.loads(m.decode('ascii')),
+ #enable_auto_commit=False
+ )
+ return consumer
+
+
+# Helper: Builds a Kafka event
+def create_kafka_event(policy_type_id, policy_id, payload, operation):
+
+ kafka_event_format = {'action': operation_to_action(operation), 'payload': payload, 'policy_type_id': policy_type_id, 'policy_id': policy_id}
+ # converts dict to str
+ kafka_event_json = json.dumps(kafka_event_format)
+ return kafka_event_json
+
+# Helper: Builds a Kafka event
+def create_kafka_response_event(response_code, error_info):
+
+ kafka_response_event_format = {'response-code': response_code, 'error-info': error_info}
+ # converts dict to str
+ kafka_response_event_json = json.dumps(kafka_response_event_format)
+ return kafka_response_event_json
+
+# Helper: Converts a HTTP operation to an explanation
+def operation_to_action(argument):
+
+ switcher = {
+ 'CREATE': "CreatePolicy",
+ 'UPDATE': "UpdatePolicy",
+ 'DELETE': "DeletePolicy",
+ 'GET': "GetPolicyStatus",
+ }
+ return switcher.get(argument, None)
+
+
+# Helper: Converts a byte array to a str
+def byte_to_str(byte_arr):
+
+ if (byte_arr is not None):
+ return byte_arr.decode('utf-8')
+ else:
+ return None
+
+
+# Helper: Creates random string
+def get_random_string(length):
+
+ characters = string.ascii_letters + string.digits + string.punctuation
+ password = ''.join(random.choice(characters) for i in range(length))
+ return password
--- /dev/null
+# ============LICENSE_START===============================================
+# Copyright (C) 2022 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+
+from var_declaration import app
+from flask import Flask, request, Response
+
+#Constants
+TEXT_PLAIN='text/plain'
+
+#Vars
+payload_log=True
+
+#Function to activate/deactivate http header and payload logging
+@app.route('/payload_logging/<state>', methods=['POST', 'PUT'])
+def set_payload_logging(state):
+ global payload_log
+ if (state == "on"):
+ payload_log=True
+ elif (state == "off"):
+ payload_log=False
+ else:
+ return Response("Unknown state: "+state+" - use 'on' or 'off'", 400, mimetype=TEXT_PLAIN)
+
+ return Response("Payload and header logging set to: "+state, 200, mimetype=TEXT_PLAIN)
+
+# Generic function to log http header and payload - called before the request
+@app.app.before_request
+def log_request_info():
+ if (payload_log is True):
+ print('')
+ print('-----Request-----')
+ print('Req Headers: ', request.headers)
+ print('Req Body: ', request.get_data())
+
+# Generic function to log http header and payload - called after the response
+@app.app.after_request
+def log_response_info(response):
+ if (payload_log is True):
+ print('-----Response-----')
+ print('Resp Headers: ', response.headers)
+ print('Resp Body: ', response.get_data())
+ return response
+
+# Helper function to check loggin state
+def is_payload_logging():
+ return payload_log
--- /dev/null
+#!/bin/bash
+
+# ============LICENSE_START===============================================
+# Copyright (C) 2022 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+
+#Set path to open api
+export APIPATH=$PWD/api
+echo "APIPATH set to: "$APIPATH
+
+cd src
+
+#start nginx
+nginx -c /usr/src/app/nginx.conf
+
+#start Kafka message dispatcher
+echo "Path to main.py: "$PWD
+python -u main.py
--- /dev/null
+# ============LICENSE_START===============================================
+# Copyright (C) 2022 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+
+from maincommon import apipath
+import connexion
+
+#Main app
+app = connexion.App(__name__, specification_dir=apipath)
+
+forced_settings = {}
+forced_settings['code']=None
+forced_settings['delay']=None
--- /dev/null
+#!/bin/bash
+
+# ============LICENSE_START===============================================
+# Copyright (C) 2022 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+
+# Script for basic test of the Kafka message dispatcher.
+# Run the build_and_start with the same arg, except arg 'nonsecure|secure', as this script
+
+print_usage() {
+ echo "Usage: ./basic_test.sh nonsecure|secure "
+ exit 1
+}
+
+if [ $# -ne 1 ]; then
+ print_usage
+fi
+if [ "$1" != "nonsecure" ] && [ "$1" != "secure" ]; then
+ print_usage
+fi
+
+if [ $1 == "nonsecure" ]; then
+ #Default http port for the simulator
+ PORT=7075
+ # Set http protocol
+ HTTPX="http"
+else
+ #Default https port for the simulator
+ PORT=7175
+ # Set https protocol
+ HTTPX="https"
+fi
+
+. ../common/test_common.sh
+. ../common/elapse_time_curl.sh
+
+echo "=== Kafka message dispatcher hello world ==="
+RESULT="OK"
+do_curl GET / 200
+
+echo "=== Reset force delay ==="
+RESULT="Force delay has been resetted for all dispatcher responses"
+do_curl POST /dispatcheradmin/forcedelay 200
+
+echo "=== API: Get policy type to topic mapping of type: ANR ==="
+res=$(cat jsonfiles/ANR_to_topic_map.json)
+RESULT="json:$res"
+do_curl GET /policytypetotopicmapping/ANR 200
+
+echo "=== Put policy: shall publish and consume for put policy operation ==="
+RESULT=""
+do_curl PUT /policytypes/ANR/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json
+
+echo "=== Get policy status: shall publish and consume for get policy status operation ==="
+RESULT=""
+do_curl GET /policytypes/ANR/kafkadispatcher/alpha/status 200 jsonfiles/alpha_policy.json
+
+echo "=== Put policy: shall publish and consume for put policy operation for alpha ==="
+RESULT=""
+do_curl PUT /policytypes/STD_1/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json
+
+echo "=== Delete policy: shall publish and consume for delete policy operation for alpha ==="
+RESULT=""
+do_curl DELETE /policytypes/STD_1/kafkadispatcher/alpha 200
+
+echo "=== Set force delay 5 sec ==="
+RESULT="Force delay: 5 sec set for all dispatcher responses until it is resetted"
+do_curl POST '/dispatcheradmin/forcedelay?delay=5' 200
+
+echo "=== Put policy: shall wait at least <delay-time> sec and then respond while publishing and consuming ==="
+RESULT=""
+do_elapsetime_curl PUT /policytypes/ANR/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json 5
+
+echo "=== Reset force delay ==="
+RESULT="Force delay has been resetted for all dispatcher responses"
+do_curl POST /dispatcheradmin/forcedelay 200
+
+echo "=== Put policy: shall publish and consume for put policy operation for beta ==="
+RESULT=""
+do_curl PUT /policytypes/STD_1/kafkadispatcher/beta 200 jsonfiles/beta_policy.json
+
+echo "=== Get policy status: shall publish and consume for get policy status operation ==="
+RESULT=""
+do_curl GET /policytypes/ANR/kafkadispatcher/alpha/status 200 jsonfiles/beta_policy.json
+
+echo "=== Put policy: shall publish and consume for put policy operation for alpha ==="
+RESULT=""
+do_curl PUT /policytypes/STD_2/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json
+
+echo "=== Set force response code: 500 ==="
+RESULT="Force response code: 500 set for all dispatcher response until it is resetted"
+do_curl POST '/dispatcheradmin/forceresponse?code=500' 200
+
+echo "=== Put policy: shall not publish and consume for put policy operation for alpha ==="
+res=$(cat jsonfiles/forced_response.json)
+RESULT="json:$res"
+do_curl PUT /policytypes/ANR/kafkadispatcher/alpha 500 jsonfiles/alpha_policy.json
+
+echo "=== Reset force response code ==="
+RESULT="Force response code has been resetted for dispatcher responses"
+do_curl POST /dispatcheradmin/forceresponse 200
+
+echo "=== Get policy status: shall publish and consume for get policy status operation ==="
+RESULT=""
+do_curl GET /policytypes/ANR/kafkadispatcher/alpha/status 200 jsonfiles/alpha_policy.json
+
+echo "=== Delete policy: shall publish and consume for delete policy operation for alpha ==="
+RESULT=""
+do_curl DELETE /policytypes/STD_1/kafkadispatcher/alpha 200
+
+echo "********************"
+echo "*** All tests ok ***"
+echo "********************"
--- /dev/null
+#!/bin/bash
+
+# ============LICENSE_START===============================================
+# Copyright (C) 2022 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+
+# Script for error testing of the Kafka message dispatcher
+# The timeout value should be equal to TIME_OUT param that exist in the start script
+# Run the script with the args: nonsecure|secure timeout=30
+
+print_usage() {
+ echo "Usage: ./basic_test.sh nonsecure|secure timeout=30"
+ exit 1
+}
+
+if [ $# -ne 2 ]; then
+ print_usage
+fi
+if [ "$1" != "nonsecure" ] && [ "$1" != "secure" ]; then
+ print_usage
+fi
+
+timeout=$(echo "$2" | cut -d'=' -f2)
+regexp_for_number='^[0-9]+$'
+
+if ! [[ $timeout =~ $regexp_for_number ]] ; then
+ echo "error:"$timeout" Not a number"
+ exit 1
+else
+ if [ $timeout -le 0 ]; then
+ echo "Timeout value must be greater than zero"
+ exit 1
+ fi
+fi
+
+if [ $1 == "nonsecure" ]; then
+ # Default http port for the simulator
+ PORT=7075
+ # Set http protocol
+ HTTPX="http"
+else
+ #Default https port for the simulator
+ PORT=7175
+ # Set https protocol
+ HTTPX="https"
+fi
+
+. ../common/test_common.sh
+
+echo "=== Kafka message dispatcher hello world ==="
+RESULT="OK"
+do_curl GET / 200
+
+echo "=== Reset force delay ==="
+RESULT="Force delay has been resetted for all dispatcher responses"
+do_curl POST /dispatcheradmin/forcedelay 200
+
+# asynch error test case
+echo "=== Put policy: shall publish and consume time-out ==="
+req_id=$(get_random_number)
+res=$(cat jsonfiles/timeout_response.json)
+RESULT="json:$res"
+# asynch callout
+do_curl PUT /policytypes/ANR/kafkadispatcher/alpha 408 jsonfiles/alpha_policy.json $req_id &
+proc_id=$!
+sleep $timeout
+# after time out duration, publish the event
+publish_response_event $req_id
+# wait until the main process to be completed
+wait $proc_id
+
+# asynch success test case after 10s
+echo "=== Put policy: shall publish and consume success at least 10 secs later ==="
+req_id=$(get_random_number)
+RESULT=""
+# asynch callout
+do_curl PUT /policytypes/STD_1/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json $req_id &
+proc_id=$!
+sleep 10
+# after 10s, publish the event
+publish_response_event $req_id
+# wait until the main process to be completed
+wait $proc_id
+
+# asynch error test case
+echo "=== Get policy status: shall publish and consume time-out ==="
+req_id=$(get_random_number)
+res=$(cat jsonfiles/timeout_response.json)
+RESULT="json:$res"
+# asynch callout
+do_curl GET /policytypes/STD_2/kafkadispatcher/alpha/status 408 jsonfiles/alpha_policy.json $req_id &
+proc_id=$!
+sleep $timeout
+# after time out duration, publish the event
+publish_response_event $req_id
+# wait until the main process to be completed
+wait $proc_id
+
+# asynch success test case after 10s
+echo "=== Get policy status: shall publish and consume success at least 15 secs later ==="
+req_id=$(get_random_number)
+RESULT=""
+# asynch callout
+do_curl GET /policytypes/ANR/kafkadispatcher/alpha/status 200 jsonfiles/alpha_policy.json $req_id &
+proc_id=$!
+sleep 15
+# after 15s, publish the event
+publish_response_event $req_id
+# wait until the main process to be completed
+wait $proc_id
+
+# asynch success test case without any delay
+echo "=== Delete policy: shall publish and consume success ==="
+req_id=$(get_random_number)
+RESULT=""
+# asynch callout
+do_curl DELETE /policytypes/STD_1/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json $req_id &
+proc_id=$!
+publish_response_event $req_id
+# wait until the main process to be completed
+wait $proc_id
+
+
+echo "********************"
+echo "*** All tests ok ***"
+echo "********************"
--- /dev/null
+#!/bin/bash
+
+# ============LICENSE_START===============================================
+# Copyright (C) 2022 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+
+# Script to build and start the container
+# Make sure to run the simulator with the same arg as this script
+
+print_usage() {
+ echo "Usage: ./build_and_start.sh "
+ exit 1
+}
+
+if [ $# -ge 1 ]; then
+ print_usage
+fi
+
+echo "Building Kafka message dispatcher image..."
+cd ../KAFKA_DISPATCHER/
+
+#Build the image
+docker build -t kafka_dispatcher .
+
+docker stop kafkamessagedispatcher > /dev/null 2>&1
+docker rm -f kafkamessagedispatcher > /dev/null 2>&1
+
+echo "Starting Kafka message dispatcher..."
+echo "PWD path: "$PWD
+
+#Run the container in interactive mode with host networking driver which allows docker to access localhost, unsecure port 7075, secure port 7175, TIME_OUT must be in seconds
+docker run --network host --rm -it -p 7075:7075 -p 7175:7175 -e ALLOW_HTTP=true -e MSG_BROKER_URL=localhost:9092 -e TIME_OUT=30 --volume "$PWD/certificate:/usr/src/app/cert" --name kafkamessagedispatcher kafka_dispatcher
--- /dev/null
+{
+ "request_topic": "kafkatopicreq",
+ "response_topic": "kafkatopicres"
+}
--- /dev/null
+{
+
+ "title": "A1 policy external server",
+ "description": "A1 policies notifying external server",
+ "type": "object",
+ "properties": {
+ "a1policyType": "alpha_test_policy",
+ "url" : "http://www.com"
+ }
+
+}
--- /dev/null
+{
+
+ "title": "A1 policy external server",
+ "description": "A1 policies notifying external server",
+ "type": "object",
+ "properties": {
+ "a1policyType": "beta_test_policy",
+ "url" : "http://www.com"
+ }
+
+}
--- /dev/null
+{
+ "title": "Unknown",
+ "status": 500,
+ "detail": "Not implemented response code"
+}
--- /dev/null
+{
+ "title": "Request timeout",
+ "status": 408,
+ "detail": "Request timeout"
+}
--- /dev/null
+#!/bin/bash
+
+# ============LICENSE_START===============================================
+# Copyright (C) 2022 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+
+# Script to build and start the container
+# Make sure to run the simulator with the same arg as this script
+
+print_usage() {
+ echo "Usage: ./build_and_start.sh duplicate-check|ignore-duplicate kafka-srv|kafka-srv-secure|ignore-kafka-srv"
+ exit 1
+}
+
+if [ $# -ne 2 ]; then
+ print_usage
+fi
+
+if [ $1 == "duplicate-check" ]; then
+ DUP_CHECK=1
+elif [ $1 == "ignore-duplicate" ]; then
+ DUP_CHECK=0
+else
+ print_usage
+fi
+
+if [ $2 == "kafka-srv" ]; then
+ URL="http://localhost:7075"
+elif [ $2 == "kafka-srv-secure" ]; then
+ URL="https://localhost:7175"
+elif [ $2 == "ignore-kafka-srv" ]; then
+ URL=""
+else
+ print_usage
+fi
+
+URL_FLAG=""
+if [ ! -z "$URL" ]; then
+ URL_FLAG="-e KAFKA_DISPATCHER_URL=$URL"
+fi
+
+# Stop and remove container images if they run
+
+echo "Stopping A1 simulator image..."
+docker stop a1StdSimulator > /dev/null 2>&1
+docker rm -f a1StdSimulator > /dev/null 2>&1
+
+echo "Stopping kafka dispatcher server image..."
+docker stop kafkamessagedispatcher > /dev/null 2>&1
+docker rm -f kafkamessagedispatcher > /dev/null 2>&1
+
+# Initialize path variables for certificate and build operations
+
+dirstd2=$PWD
+
+cd ../../
+dirnrtsim=$PWD
+
+cd test/KAFKA_DISPATCHER/
+dirkafkasrv=$PWD
+
+# Build containers
+
+cd $dirnrtsim
+echo "Building A1 simulator image..."
+docker build -t a1test .
+
+if [ ! -z "$URL" ]; then
+ cd $dirkafkasrv
+ echo "Building kafka server image..."
+ docker build -t kafka_dispatcher .
+fi
+
+# Run containers
+
+# Runs kafka server in detached mode
+# In order to tail logs use:: docker logs -f kafkamessagedispatcher
+if [ ! -z "$URL" ]; then
+ docker run -d --network host --rm -it -p 7075:7075 -p 7175:7175 -e ALLOW_HTTP=true -e MSG_BROKER_URL=localhost:9092 -e KAFKA_TOPIC_RES=kafkatopicres -e TIME_OUT=30000 --volume "$dirkafkasrv/certificate:/usr/src/app/cert" --name kafkamessagedispatcher kafka_dispatcher
+fi
+
+# Runs A1 simulator
+docker run --network host --rm -it -p 8085:8085 -p 8185:8185 -e A1_VERSION=STD_2.0.0 -e ALLOW_HTTP=true -e REMOTE_HOSTS_LOGGING=1 -e DUPLICATE_CHECK=$DUP_CHECK $URL_FLAG --volume "$dirnrtsim/certificate:/usr/src/app/cert" --name a1StdSimulator a1test
--- /dev/null
+# ============LICENSE_START===============================================
+# Copyright (C) 2022 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+
+# This script publishes a response-event to a kafka bus
+# In order to use this script, you must have an venv for Python and kafka-python libs has to be installed
+# To instal kafka-python please use: pip install kafka-python
+# Example of an response-event json
+#{
+ #"response-code": "400",
+ #"error-info": "Bad format"
+#}
+
+
+import os
+import json
+import sys
+
+from kafka import KafkaProducer
+
+# Response string with JSON format
+response_data_JSON = """
+{
+ "response-code": 200,
+ "error-info": ""
+}
+"""
+
+# Instantiate KafkaProducer with keyword arguments
+# https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html
+def create_kafka_producer():
+
+ producer = KafkaProducer(
+ bootstrap_servers = ['localhost:9092'],
+ key_serializer = str.encode,
+ value_serializer = lambda m: json.dumps(m).encode('ascii'),
+ )
+ return producer
+
+# Helper: Publishes (to) the target broker and the topic in synch
+def publish(kafka_evet, req_id):
+
+ # Instantiate KafkaProducer with keyword arguments
+ producer = create_kafka_producer()
+ # Assigns an id to each request that is supposed to get a result
+ # req_id = 'Hll1EsycKLNRric7'
+
+ try:
+
+ # synch-publish
+ # KafkaProducer.send(topicname, value=broker_message, key=req_id, headers=None, partition=None, timestamp_ms=None)
+ fut_rec_metadata = producer.send('kafkatopicres', kafka_evet, req_id)
+ return fut_rec_metadata.get()
+
+ except Exception as err:
+ print('Error while publish', err)
+ finally:
+ producer.close()
+
+if __name__ == '__main__':
+ try:
+
+ requestid = sys.argv[1]
+ # response_data_JSON is str
+ future = publish(response_data_JSON, requestid)
+
+ if (future is not None):
+ print (0)
+ else:
+ print (1)
+
+ except Exception:
+ print (1)
+ sys.exit()
#!/bin/bash
# ============LICENSE_START===============================================
-# Copyright (C) 2020 Nordix Foundation. All rights reserved.
+# Copyright (C) 2020-2022 Nordix Foundation. All rights reserved.
# ========================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
#Expects the env $RESULT to contain the expected RESULT.
#If json, the RESULT shall begin with 'json:'.
#Any json parameter with unknown value shall be given as "????" to skip checking the value.
+#The requestid parameter is being introduced in the fifth order.
do_curl() {
if [ $# -lt 3 ]; then
echo "Need 3 or more parameters, <http-operation> <url> <response-code> [file]: "$@
exit 1
fi
curlstr="curl -X "$1" -skw %{http_code} $HTTPX://localhost:"${PORT}${2}" -H accept:*/*"
- if [ $# -gt 3 ]; then
+ if [ $# -eq 4 ]; then
curlstr=$curlstr" -H Content-Type:application/json --data-binary @"$4
fi
+ if [ $# -ge 5 ]; then
+ curlstr=$curlstr" -H Content-Type:application/json --data-binary @"$4" -H requestid:"$5
+ fi
echo " CMD (${BASH_LINENO[0]}):"$curlstr
res=$($curlstr)
status=${res:${#res}-3}
fi
}
+# Triggers publish_event_to_kafka_bus.py script to send msg to Kafka broker
+# The aim of this function is to realize error related test cases only
+# The request_id for the Kafka msg, should be passed here as a function parameter
+publish_response_event() {
+ if [ $# -lt 1 ]; then
+ echo "Need 1 parameter, <request_id>"
+ echo "Exiting test script....."
+ exit 1
+ fi
+ res=$(python ../common/publish_response_event_to_kafka_bus.py "$1")
+ if [ $res -eq 0 ]; then
+ echo " Result as expected "
+ else
+ echo " Result not expected "
+ echo " Exiting..... "
+ exit 1
+ fi
+}
+
+# Creates 16 digits random number using letters and numbers only
+get_random_number() {
+ r_num=$(tr -dc A-Za-z0-9 < /dev/urandom | head -c 16)
+ echo $r_num
+}
+
+# It is being used to cross-test-cases in between A1 sim and external server
+# The parameter it holds all with regards to External Server relates e.g. HTTPX_EXT_SRV and PORT_EXT_SRV
do_curl_ext_srv() {
if [ $# -lt 3 ]; then
echo "Need 3 or more parameters, <http-operation> <url> <response-code> [file]: "$@