Kafka dispatcher module backbone 26/8626/17
Author: halil.cakal <halil.cakal@est.tech>
Tue, 21 Jun 2022 11:05:14 +0000 (12:05 +0100)
Committer: John Keeney <john.keeney@est.tech>
Fri, 29 Jul 2022 09:25:23 +0000 (09:25 +0000)
Patch set 2: Creates a Python-adapter dispatching HTTP/HTTPS requests to a Kafka broker using kafka-python library.
Patch set 3: Develops a consumer capable of deserializing JSON messages. Clean up redundant functions and parameters.
Patch set 4: Develops a feature that publishes and consumes (to/from) the Kafka cluster in two-way synch.
Patch set 5: Develops unit tests cases.
Patch set 6: Develops a feature capable of handling requestid sent in headers for error testing.
Patch set 7: Develops asynch error test cases.
Patch set 8: Develops configurable look-up table that maps policy-type to kafka-topic. Supports multi topic including unit test cases with error based.
Patch set 9: Develops publish-consume kafka message for get-policy-status operation. Enriches unit test cases.
Patch set 10: Validate open API yaml and fixes review.
Patch set 11: Develops an actual kafka response processing including unit test-cases with error based.
Patch set 12: Develops configurable response topic to be consumed from, apply changes to policytypetotopicmapping API. Adds unit test case.
Patch set 13: Plugs in kafka dispatcher module to A1 Sim to realize e2e sequence.
Patch set 14: Develops error testing script in which timeout value is being parametric.
Patch set 15: Calculates poll cycle threshold and develops more validation on time out param.

Issue-ID: NONRTRIC-757
Change-Id: I9bd257691d8a50e81492c55a5ec17a4409d9da6c
Signed-off-by: halil.cakal <halil.cakal@est.tech>
26 files changed:
near-rt-ric-simulator/src/STD_2.0.0/a1.py [changed mode: 0644->0755]
near-rt-ric-simulator/test/KAFKA_DISPATCHER/Dockerfile [new file with mode: 0644]
near-rt-ric-simulator/test/KAFKA_DISPATCHER/api/KAFKA_DISPATCHER_api.yaml [new file with mode: 0644]
near-rt-ric-simulator/test/KAFKA_DISPATCHER/certificate/cert.crt [new file with mode: 0644]
near-rt-ric-simulator/test/KAFKA_DISPATCHER/certificate/generate_cert_and_key.sh [new file with mode: 0755]
near-rt-ric-simulator/test/KAFKA_DISPATCHER/certificate/key.crt [new file with mode: 0644]
near-rt-ric-simulator/test/KAFKA_DISPATCHER/certificate/pass [new file with mode: 0644]
near-rt-ric-simulator/test/KAFKA_DISPATCHER/nginx.conf [new file with mode: 0644]
near-rt-ric-simulator/test/KAFKA_DISPATCHER/resources/policytype_to_topicmap.json [new file with mode: 0644]
near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/dispatcher.py [new file with mode: 0644]
near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/main.py [new file with mode: 0644]
near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/maincommon.py [new file with mode: 0644]
near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/payload_logging.py [new file with mode: 0644]
near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/start.sh [new file with mode: 0644]
near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/var_declaration.py [new file with mode: 0644]
near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/basic_test.sh [new file with mode: 0755]
near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/basic_test_with_cust_header.sh [new file with mode: 0755]
near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/build_and_start.sh [new file with mode: 0755]
near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/jsonfiles/ANR_to_topic_map.json [new file with mode: 0644]
near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/jsonfiles/alpha_policy.json [new file with mode: 0644]
near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/jsonfiles/beta_policy.json [new file with mode: 0644]
near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/jsonfiles/forced_response.json [new file with mode: 0644]
near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/jsonfiles/timeout_response.json [new file with mode: 0644]
near-rt-ric-simulator/test/STD_2.0.0/build_and_start_with_kafka.sh [new file with mode: 0755]
near-rt-ric-simulator/test/common/publish_response_event_to_kafka_bus.py [new file with mode: 0644]
near-rt-ric-simulator/test/common/test_common.sh

old mode 100644 (file)
new mode 100755 (executable)
index 28eccc6..e70a8ed
@@ -36,7 +36,7 @@ APPL_JSON='application/json'
 APPL_PROB_JSON='application/problem+json'
 
 EXT_SRV_URL=os.getenv('EXT_SRV_URL')
-
+KAFKA_DISPATCHER_URL=os.getenv('KAFKA_DISPATCHER_URL')
 
 # API Function: Get all policy type ids
 def get_all_policy_types():
@@ -132,6 +132,13 @@ def put_policy(policyTypeId, policyId):
       pjson=create_problem_json(None, "Duplicate, the policy json already exists.", 400, None, policy_id)
       return Response(json.dumps(pjson), 400, mimetype=APPL_PROB_JSON)
 
+  #Callout hooks for kafka dispatcher
+  if (KAFKA_DISPATCHER_URL is not None):
+    resp = callout_kafka_dispatcher(policy_type_id, policy_id, data, retcode)
+    if (resp != 200):
+      pjson=create_error_response(resp)
+      return Response(json.dumps(pjson), 500, mimetype=APPL_PROB_JSON)
+
   #Callout hooks for external server
   #When it fails, break and return 419 HTTP status code
   if (EXT_SRV_URL is not None):
@@ -203,6 +210,13 @@ def delete_policy(policyTypeId, policyId):
     pjson=create_problem_json(None, "The requested policy does not exist.", 404, None, policy_id)
     return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON)
 
+  #Callout hooks for kafka dispatcher
+  if (KAFKA_DISPATCHER_URL is not None):
+    resp = callout_kafka_dispatcher(policy_type_id, policy_id, None, 204)
+    if (resp != 200):
+      pjson=create_error_response(resp)
+      return Response(json.dumps(pjson), 500, mimetype=APPL_PROB_JSON)
+
   #Callout hooks for external server
   #When it fails, break and return 419 HTTP status code
   if (EXT_SRV_URL is not None):
@@ -241,8 +255,45 @@ def get_policy_status(policyTypeId, policyId):
     pjson=create_problem_json(None, "The requested policy does not exist.", 404, None, policy_id)
     return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON)
 
+  #Callout hooks for kafka dispatcher
+  if (KAFKA_DISPATCHER_URL is not None):
+    resp = callout_kafka_dispatcher(policy_type_id, policy_id, None, 202)
+    if (resp != 200):
+      pjson=create_error_response(resp)
+      return Response(json.dumps(pjson), 500, mimetype=APPL_PROB_JSON)
+
   return Response(json.dumps(policy_status[policy_id]), status=200, mimetype=APPL_JSON)
 
+
+# Helper: Callout kafka dispatcher server to notify it for policy operations
+def callout_kafka_dispatcher(policy_type_id, policy_id, payload, retcode):
+
+  target_url = KAFKA_DISPATCHER_URL + "/policytypes/" + policy_type_id + "/kafkadispatcher/" + policy_id
+  try:
+    # create operation, publish with payload
+    if (retcode == 201):
+      resp=requests.put(target_url, json=payload, timeout=30, verify=False)
+      return resp.status_code
+    # update operation, publish with payload
+    elif (retcode == 200):
+      # add headers an update-flag
+      headers = {'updateoper' : 'yes'}
+      resp=requests.put(target_url, json=payload, headers=headers, timeout=30, verify=False)
+      return resp.status_code
+    # delete operation, publish without payload
+    elif (retcode == 204):
+      resp=requests.delete(target_url, timeout=30, verify=False)
+      return resp.status_code
+    # get policy status operation, publish without payload
+    elif (retcode == 202):
+      # update endpoint
+      target_url = target_url + "/status"
+      resp=requests.get(target_url, timeout=30, verify=False)
+      return resp.status_code
+  except Exception:
+    return 419
+
+
 # Helper: Callout external server to notify it for policy operations
 # Returns 200, 201 and 204 for the success callout hooks, for the others returns 419
 def callout_external_server(policy_id, payload, operation):
diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER/Dockerfile b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/Dockerfile
new file mode 100644 (file)
index 0000000..bc5d815
--- /dev/null
@@ -0,0 +1,49 @@
+#  ============LICENSE_START===============================================
+#  Copyright (C) 2022 Nordix Foundation. All rights reserved.
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+#
+
+FROM python:3.8-slim-buster
+
+RUN pip install connexion[swagger-ui]
+RUN pip install kafka-python
+
+#install nginx and curl
+RUN apt-get update && apt-get install -y nginx=1.14.* nginx-extras curl
+
+WORKDIR /usr/src/app
+
+COPY api api
+COPY nginx.conf nginx.conf
+COPY certificate /usr/src/app/cert
+COPY src src
+COPY resources resources
+
+ARG user=nonrtric
+ARG group=nonrtric
+
+RUN groupadd $user && \
+    useradd -r -g $group $user
+RUN chown -R $user:$group /usr/src/app
+RUN chown -R $user:$group /var/log/nginx
+RUN chown -R $user:$group /var/lib/nginx
+RUN chown -R $user:$group /etc/nginx/conf.d
+RUN touch /var/run/nginx.pid
+RUN chown -R $user:$group /var/run/nginx.pid
+
+USER ${user}
+
+RUN chmod +x src/start.sh
+CMD src/start.sh
diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER/api/KAFKA_DISPATCHER_api.yaml b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/api/KAFKA_DISPATCHER_api.yaml
new file mode 100644 (file)
index 0000000..5480fbe
--- /dev/null
@@ -0,0 +1,241 @@
+openapi: 3.0.0
+info:
+  title: 'Kafka message dispatcher for A1 interface'
+  version: 2.0.0
+  description: |
+    Kafka message dispatcher server.
+    © 2022, O-RAN Alliance.
+    All rights reserved.
+externalDocs:
+  description: 'RestFUL APIs that create and dispatch Kafka messages to Kafka brokers'
+  url: 'https://docs.o-ran-sc.org/projects/o-ran-sc-sim-a1-interface/en/latest/index.html'
+servers:
+  - url: '{apiRoot}'
+    variables:
+      apiRoot:
+        default: 'https://example.com'
+paths:
+  '/policytypetotopicmapping/{policyTypeId}':
+    parameters:
+      - name: policyTypeId
+        in: path
+        required: true
+        schema:
+          "$ref": "#/components/schemas/PolicyTypeId"
+    get:
+      operationId: dispatcher.get_policy_type_to_topic_mapping
+      description: 'Get the kafka request and response topic map corresponding to policy type'
+      tags:
+      - The mapping from policy type to kafka topic request and response object
+      responses:
+        200:
+          description: 'The policy type to topic map schemas'
+          content:
+            application/json:
+              schema:
+                "$ref": "#/components/schemas/PolicyTypeToTopicMap"
+        404:
+          "$ref": "#/components/responses/404-NotFound"
+        429:
+          "$ref": "#/components/responses/429-TooManyRequests"
+        503:
+          "$ref": "#/components/responses/503-ServiceUnavailable"
+
+  '/policytypes/{policyTypeId}/kafkadispatcher/{policyId}':
+    parameters:
+      - name: policyTypeId
+        in: path
+        required: true
+        schema:
+          "$ref": "#/components/schemas/PolicyTypeId"
+      - name: policyId
+        in: path
+        required: true
+        schema:
+          "$ref": "#/components/schemas/A1PolicyId"
+    put:
+      operationId: dispatcher.put_policy
+      description: 'Dispatch create and update operation as kafka message to kafka cluster'
+      tags:
+      - Individual policy Object
+      requestBody:
+        required: true
+        content:
+          application/json:
+            schema:
+              "$ref": "#/components/schemas/A1PolicyObject"
+      responses:
+        200:
+          description: 'Create or update operation dispatched'
+        400:
+          "$ref": "#/components/responses/400-BadRequest"
+        408:
+          "$ref": "#/components/responses/408-RequestTimeout"
+        419:
+          "$ref": "#/components/responses/419-KafkaMessagePublishFailed"
+        429:
+          "$ref": "#/components/responses/429-TooManyRequests"
+        503:
+          "$ref": "#/components/responses/503-ServiceUnavailable"
+        507:
+          "$ref": "#/components/responses/507-InsufficientStorage"
+    delete:
+      operationId: dispatcher.delete_policy
+      description: 'Dispatch policy delete operation as kafka message to kafka cluster'
+      responses:
+        200:
+          description: 'Delete operation dispatched'
+        408:
+          "$ref": "#/components/responses/408-RequestTimeout"
+        419:
+          "$ref": "#/components/responses/419-KafkaMessagePublishFailed"
+        429:
+          "$ref": "#/components/responses/429-TooManyRequests"
+        503:
+          "$ref": "#/components/responses/503-ServiceUnavailable"
+
+  '/policytypes/{policyTypeId}/kafkadispatcher/{policyId}/status':
+    parameters:
+      - name: policyTypeId
+        in: path
+        required: true
+        schema:
+          "$ref": "#/components/schemas/PolicyTypeId"
+      - name: policyId
+        in: path
+        required: true
+        schema:
+          "$ref": "#/components/schemas/A1PolicyId"
+    get:
+      operationId: dispatcher.get_policy_status
+      description: 'Dispatch policy status query operation as kafka message to kafka cluster'
+      tags:
+      - Individual A1 Policy Status Object
+      responses:
+        200:
+          description: 'Query policy status operation dispatched'
+        429:
+          "$ref": "#/components/responses/429-TooManyRequests"
+        503:
+          "$ref": "#/components/responses/503-ServiceUnavailable"
+
+components:
+  schemas:
+    #
+    # Representation objects
+    #
+    A1PolicyObject:
+      description: 'A generic policy object'
+      type: object
+
+    A1Policy:
+      description: 'A generic policy string'
+      type: string
+
+    PolicyTypeToTopicMap:
+      description: 'Request and response topic map for each policy type'
+      type: object
+      properties:
+        policy_type:
+          type: object
+          properties:
+            request_topic:
+              type: string
+              example: kafkatopicreq
+            response_topic:
+              type: string
+              example: kafkatopicres
+
+    ProblemDetails:
+      description: 'A problem detail to carry details in a HTTP response according to RFC 7807'
+      type: object
+      properties:
+        type:
+          type: string
+        title:
+          type: string
+        status:
+          type: number
+        detail:
+          type: string
+        instance:
+          type: string
+
+    #
+    # Simple data types
+    #
+    JsonSchema:
+      description: 'A JSON schema following http://json-schema.org/draft-07/schema'
+      type: object
+
+    A1PolicyId:
+      description: 'A1 policy identifier.'
+      type: string
+
+    PolicyTypeId:
+      description: 'Policy type identifier assigned by the A1-P Provider'
+      type: string
+
+  responses:
+    400-BadRequest:
+      description: 'A1 policy not properly formulated or not related to the method'
+      content:
+        application/problem+json:
+          schema:
+            "$ref": "#/components/schemas/ProblemDetails"
+
+    404-NotFound:
+      description: 'No resource found at the URI'
+      content:
+        application/problem+json:
+          schema:
+            "$ref": "#/components/schemas/ProblemDetails"
+
+    405-MethodNotAllowed:
+      description: 'Method not allowed for the URI'
+      content:
+        application/problem+json:
+          schema:
+            "$ref": "#/components/schemas/ProblemDetails"
+
+    408-RequestTimeout:
+      description: 'Request could not be processed in given amount of time'
+      content:
+        application/problem+json:
+          schema:
+            "$ref": "#/components/schemas/ProblemDetails"
+
+    409-Conflict:
+      description: 'Request could not be processed in the current state of the resource'
+      content:
+        application/problem+json:
+          schema:
+            "$ref": "#/components/schemas/ProblemDetails"
+
+    419-KafkaMessagePublishFailed:
+      description: 'Publishing the kafka message to the broker fails'
+      content:
+        application/problem+json:
+          schema:
+            "$ref": "#/components/schemas/ProblemDetails"
+
+    429-TooManyRequests:
+      description: 'Too many requests have been sent in a given amount of time'
+      content:
+        application/problem+json:
+          schema:
+            "$ref": "#/components/schemas/ProblemDetails"
+
+    503-ServiceUnavailable:
+      description: 'The provider is currently unable to handle the request due to a temporary overload'
+      content:
+        application/problem+json:
+          schema:
+            "$ref": "#/components/schemas/ProblemDetails"
+
+    507-InsufficientStorage:
+      description: 'The method could not be performed on the resource because the provider is unable to store the representation needed to successfully complete the request'
+      content:
+        application/problem+json:
+          schema:
+            "$ref": "#/components/schemas/ProblemDetails"
diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER/certificate/cert.crt b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/certificate/cert.crt
new file mode 100644 (file)
index 0000000..6408f33
--- /dev/null
@@ -0,0 +1,24 @@
+-----BEGIN CERTIFICATE-----
+MIID+zCCAuOgAwIBAgIUWy7JHRvA2GfOU5op4Xcc/8wQF18wDQYJKoZIhvcNAQEL
+BQAwgYwxCzAJBgNVBAYTAklFMRIwEAYDVQQIDAlXRVNUTUVBVEgxEDAOBgNVBAcM
+B0FUSExPTkUxETAPBgNVBAoMCEVyaWNzc29uMQwwCgYDVQQLDANFU1QxETAPBgNV
+BAMMCGVzdC50ZWNoMSMwIQYJKoZIhvcNAQkBFhRoYWxpbC5jYWthbEBlc3QudGVj
+aDAeFw0yMjA2MTcwOTEwMDNaFw00OTExMDEwOTEwMDNaMIGMMQswCQYDVQQGEwJJ
+RTESMBAGA1UECAwJV0VTVE1FQVRIMRAwDgYDVQQHDAdBVEhMT05FMREwDwYDVQQK
+DAhFcmljc3NvbjEMMAoGA1UECwwDRVNUMREwDwYDVQQDDAhlc3QudGVjaDEjMCEG
+CSqGSIb3DQEJARYUaGFsaWwuY2FrYWxAZXN0LnRlY2gwggEiMA0GCSqGSIb3DQEB
+AQUAA4IBDwAwggEKAoIBAQCqwDVZ7txWX/FaiRiSVa2jnBcV7KN6eqwcKtP3cNP+
+3VTm4YtcY6yp/dPXTYqkAX1qmp5i8USFPnbCstAijI5Uy8kl63dYbirHMPwt9AOL
+TXrFRrJ/sev3ULJWKB1IOGt2rFhoUXA23Hv1hagyvjx2upbnVmhrz5qBOT1wuzwN
+U2PjFaCFHBs0XphFS/UDEQlvpbNz/jxwHVrEdO8Jr951OFlUBczDDGk0jJ3hRc0p
+iM5LNGH02yDvE6pCqqY5Fo5aaj9Vi0Kztv1D/NClWcr3Yh3IuMyZkfS+S8nPd7Nu
+VHKWo7cPv9QpeziWiqx0fZcSAh6tUF52hrwGrMONuEojAgMBAAGjUzBRMB0GA1Ud
+DgQWBBRiX8NchgUa825PcmhA0b+BOnkx5TAfBgNVHSMEGDAWgBRiX8NchgUa825P
+cmhA0b+BOnkx5TAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQAi
+p6UMfLeTROQJNS4ZAkpQKEtrhGaE8DpnZW+tCLWaxkTk3Mo3ybE3rBW2zugmXFIU
+K2PRKivxHmVxY/YQL3sxzQRnzRPl0wuEmm+r0LpGR6VXfTaPrT6YzKM0TU/xKqJI
+INk6JiBx8v9dp5LWGoJEs0e4wvoV8Bd05u+73cbhIUisO7KmCp/u1D1BHYtBNDCp
+sVH6y9mAlvMIAOzy4wOWqoAxcW2ES8JbesbLOxt/IaQO9DQFPUIjTZURG+62lNCS
++2+lb1ihNqzEECuyw1GQRt88HrSWuj99bCBRRiBij900anadoKIDHdWSEkzhBa0K
+zJ5KoQK/o4szZ3VPByfh
+-----END CERTIFICATE-----
diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER/certificate/generate_cert_and_key.sh b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/certificate/generate_cert_and_key.sh
new file mode 100755 (executable)
index 0000000..7e6d29c
--- /dev/null
@@ -0,0 +1,26 @@
+#!/bin/bash
+
+#  ============LICENSE_START===============================================
+#  Copyright (C) 2022 Nordix Foundation. All rights reserved.
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+#
+
+# This will generate a self-signed certificate with password 'test'
+
+SUBJECT="/C=IE/ST=WESTMEATH/L=ATHLONE/O=Ericsson/OU=EST/CN=est.tech/emailAddress=halil.cakal@est.tech"
+PW=test
+echo $PW > pass
+
+openssl req -x509 -passout file:pass -newkey rsa:2048 -keyout key.crt -subj "$SUBJECT" -out cert.crt -days 9999
diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER/certificate/key.crt b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/certificate/key.crt
new file mode 100644 (file)
index 0000000..9f81115
--- /dev/null
@@ -0,0 +1,30 @@
+-----BEGIN ENCRYPTED PRIVATE KEY-----
+MIIFHDBOBgkqhkiG9w0BBQ0wQTApBgkqhkiG9w0BBQwwHAQI/PWiKXnGNAMCAggA
+MAwGCCqGSIb3DQIJBQAwFAYIKoZIhvcNAwcECPdSVeuyJFIwBIIEyGmpW5lUwpKF
+tlcwBj2WvKF/7GJFw9LEuIBnkm0Ya8LV5Fu5XS9OGg877OkLVr1P152YjDQ+h1nD
+ZZSOEqjAGGSlEshie2DSr9bdJdBlr8MTog7SlHQsrU0QLxPRBBwqseI5cyu1Vb41
+4V5+l0iWUoDvXH3mUeH460A43GU4ZUTimkWpP9M9LHyrofjMg7sLzwvOns207DXK
+0hffc5293t85CQPau1WenHsfXn65tkPFblYILpJyU4sa1kLa4f5Ynt7RH2mNztfI
+26kViIuOBDXcmLAxL3WGZaR9u71qpl0wC1umXWxWNt69DRVyOf23mHHSuuDSEyM3
++x0rrbj/QaLNnoEjAFvijEAkYdp/jPKKP3kp6LpSVvVVOGLP7srrIrOe4q6bPFfK
+d5u1Vnh3/PVEf8xPbJe8UJ5cfx3mWhT+saZOIpEpQvom5GwSYt869P1xyAaa59cx
+TcT9KC/Ytg7fSHwXwTclIvucD+cJvbEZNFAwxMkL94a60LNfZ/odq1Bu304Shm/V
+DSNeDi1HjfoC3aca7bjsXE8Xj82JQLaSGt7+AuY3gICA5cnJxxWv5VoyRZVPsiRj
+Z6ykP2ikjkaLQaqDJmpbnx2ZK/lfrkJI1yI0kYK0xApUQdx9ks8c/AeEcUby2z+H
+qPJZuuh1NlEv8jSFn1CO3a5Bpq53EtQlxonKzJYHdoKm1oIEbIUE60K+1oxyXt9S
++l8PgH0T2QlM2lvipy5XegPrTuMYqDywEt4cf1Yk/8RSYEeLzcfKzSuNuy556YRd
+Of9nJOKPkVr1cp5si9Vyt+t4cD826WWV1ZgEcIK0i8uhEQxHb+y88DDWAV1DJGog
+M0qPGm95lWj4ESiv7A/+AbXY3rJRMp/JB1jmGTNfa3jQ9P29cTwhlpvPnhqviLRH
+1YfIOJfghrF55e83bLmJcfwVktMwO8Wzovw7U/1Rzy1j/fyUNkhOEbziIu97ScD+
+7AQDfRqBZZXWREOylAmfWMqZxwVn+CytAFyTLP5CpjSKgBV+qJ5xc3GqNJxaKGK5
+ULXfJKtTheJV3YYnvCuDySFuCv/dkUVjaA3/TqTikKm4THxS+DjtFJey+KWTx57p
+X+8ky/E3zAuZzP7r6Dszhp+CAfvXp4CwkLitqfbwja2lcxQ+hbpHYjLN6zvHmmOZ
+o0ZeoNpsWj00jAc9NrJt08DfcNomlUz9CZgpvE64gXX7wPCw5zG/c5dbFuoImEeI
+h+fKwGh2KdJwazvZ/B20/TRUn1WaQzyRiCJMIgc3aO3AiuffeurliO93iTeVCjJD
+d+Bdt4Ub0zPRikTqY4PUDiyfM+vRHOkJ39dY39XaZEsLLXB6XeACWcWT6iBF56Pe
+UAi+C4IwPLBHMsIVVSvH8/Rg0IFBHVmkGIi0q7gIyVTcR7FQeCzDZVAjJn4aBgXo
+S57v2AbMdD3pwie4ou91NkiM/SnSimLA0BxEh1UEhZk2BEW2Yy6OuvcLn3EfeZVu
+M9UNmwIFaq9jWM/qcRQa7MbXTZUTUIPsOvMOsZwPIurqtXWZZTWZ8D0eu2Hu6Vui
+FGyY1xIVcIpGXesADsYwy+CSW/EjpV1s+/LGDsUcqZpMeWmj84Zh1Tjt+Fet+967
+dSViTwISZ+O8F2uq0MaiPg==
+-----END ENCRYPTED PRIVATE KEY-----
diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER/certificate/pass b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/certificate/pass
new file mode 100644 (file)
index 0000000..9daeafb
--- /dev/null
@@ -0,0 +1 @@
+test
diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER/nginx.conf b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/nginx.conf
new file mode 100644 (file)
index 0000000..8de906d
--- /dev/null
@@ -0,0 +1,93 @@
+# user www-data;
+worker_processes auto;
+pid /run/nginx.pid;
+include /etc/nginx/modules-enabled/*.conf;
+
+env ALLOW_HTTP;
+
+events {
+    worker_connections 768;
+    # multi_accept on;
+}
+
+http {
+
+    ##
+    # Basic Settings
+    ##
+
+    sendfile on;
+    tcp_nopush on;
+    tcp_nodelay on;
+    keepalive_timeout 65;
+    types_hash_max_size 2048;
+    # server_tokens off;
+
+    # server_names_hash_bucket_size 64;
+    # server_name_in_redirect off;
+
+    include /etc/nginx/mime.types;
+    default_type application/octet-stream;
+
+    perl_set $allow_http 'sub { return $ENV{"ALLOW_HTTP"}; }';
+
+    server { # simple reverse-proxy
+       listen      7075;
+        listen      [::]:7075;
+        server_name  localhost;
+       if ($allow_http != true) {
+           return 444;
+       }
+
+       # serve dynamic requests
+        location / {
+            proxy_set_header   Host                 $host;
+            proxy_set_header   X-Real-IP            $remote_addr;
+            proxy_set_header   X-Forwarded-For      $proxy_add_x_forwarded_for;
+            proxy_pass      http://localhost:7777;
+        }
+    }
+
+    server { # simple reverse-proxy
+        listen      7175 ssl;
+        listen      [::]:7175 ssl;
+        server_name  localhost;
+        ssl_certificate     /usr/src/app/cert/cert.crt;
+        ssl_certificate_key /usr/src/app/cert/key.crt;
+        ssl_password_file   /usr/src/app/cert/pass;
+
+        # serve dynamic requests
+        location / {
+            proxy_set_header   Host                 $host;
+            proxy_set_header   X-Real-IP            $remote_addr;
+            proxy_set_header   X-Forwarded-For      $proxy_add_x_forwarded_for;
+            proxy_pass      http://localhost:7777;
+        }
+    }
+    ##
+    # SSL Settings
+    ##
+
+    ssl_protocols TLSv1 TLSv1.1 TLSv1.2; # Dropping SSLv3, ref: POODLE
+    ssl_prefer_server_ciphers on;
+
+    ##
+    # Logging Settings
+    ##
+
+    access_log /var/log/nginx/access.log;
+    error_log /var/log/nginx/error.log;
+
+    ##
+    # Gzip Settings
+    ##
+
+    gzip on;
+
+    # gzip_vary on;
+    # gzip_proxied any;
+    # gzip_comp_level 6;
+    # gzip_buffers 16 8k;
+    # gzip_http_version 1.1;
+    # gzip_types text/plain text/css application/json application/javascript text/xml application/xml application/xml+rss text/javascript;
+}
diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER/resources/policytype_to_topicmap.json b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/resources/policytype_to_topicmap.json
new file mode 100644 (file)
index 0000000..254cb4d
--- /dev/null
@@ -0,0 +1,14 @@
+{
+  "ANR": {
+    "request_topic": "kafkatopicreq",
+    "response_topic": "kafkatopicres"
+  },
+  "STD_1": {
+    "request_topic": "kafkatopicreq2",
+    "response_topic": "kafkatopicres"
+  },
+  "STD_2": {
+    "request_topic": "kafkatopicreq3",
+    "response_topic": "kafkatopicres"
+  }
+}
diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/dispatcher.py b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/dispatcher.py
new file mode 100644 (file)
index 0000000..19e2ab6
--- /dev/null
@@ -0,0 +1,335 @@
+#  ============LICENSE_START===============================================
+#  Copyright (C) 2022 Nordix Foundation. All rights reserved.
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+#
+
+import os
+import json
+import time
+import math
+
+from flask import request, Response
+from datetime import datetime
+from kafka.consumer.fetcher import ConsumerRecord
+from kafka import TopicPartition
+from var_declaration import forced_settings
+from maincommon import create_kafka_producer, create_kafka_consumer, create_kafka_event, create_kafka_response_event, byte_to_str, get_random_string
+
+
+MSG_BROKER_URL=os.getenv('MSG_BROKER_URL')
+
+TIME_OUT=os.getenv('TIME_OUT')
+
+#Constants
+APPL_JSON='application/json'
+TEXT_PLAIN='text/plain'
+APPL_PROB_JSON='application/problem+json'
+
+# API Function: Dispatch create or update events to Kafka cluster
+# Publishes a CREATE or UPDATE event for (policyTypeId, policyId); the JSON
+# request body is the a1policy payload. Returns 400 on a missing/corrupt body,
+# otherwise the outcome of the synchronous publish/consume round-trip.
+def put_policy(policyTypeId, policyId):
+
+  # Honour forced delay / forced response set via the admin endpoints
+  if ((r := check_modified_response()) is not None):
+    return r
+
+  policy_type_id = str(policyTypeId)
+  policy_id = str(policyId)
+
+  try:
+    # Error based unit test rel only, for more info please check basic_test_with_cust_header
+    req_id_from_header = request.headers.get('requestid')
+    # Differentiate if the PUT is update or create operation since the endpoint is the same
+    update_oper_from_header = request.headers.get('updateoper')
+    data = request.data
+    data = json.loads(data)
+  except Exception:
+    # Body absent or not valid JSON
+    pjson=create_problem_json(None, "The a1policy is corrupt or missing.", 400, None, policy_id)
+    return Response(json.dumps(pjson), 400, mimetype=APPL_PROB_JSON)
+
+  # Decide if the operation is update or create
+  if (update_oper_from_header is not None):
+    kafka_event = create_kafka_event(policy_type_id, policy_id, data, 'UPDATE')
+  else:
+    kafka_event = create_kafka_event(policy_type_id, policy_id, data, 'CREATE')
+
+  # Synch callout hooks towards kafka broker
+  if (MSG_BROKER_URL is not None):
+    return publish_and_consume(kafka_event, req_id_from_header, policy_type_id)
+
+  # No broker configured: accept the request without dispatching
+  return Response('', 200, mimetype=TEXT_PLAIN)
+
+
+# API Function: Dispatch delete events to south Kafka cluster
+# Publishes a DELETE event for (policyTypeId, policyId); no request body is read.
+def delete_policy(policyTypeId, policyId):
+
+  # Honour forced delay / forced response set via the admin endpoints
+  if ((r := check_modified_response()) is not None):
+    return r
+
+  policy_type_id = str(policyTypeId)
+  policy_id = str(policyId)
+
+  # Optional test-supplied correlation id (see basic_test_with_cust_header.sh)
+  req_id_from_header = request.headers.get('requestid')
+  print('req_id_from_header', req_id_from_header)
+
+  # Synch callout hooks towards kafka broker
+  kafka_event = create_kafka_event(policy_type_id, policy_id, None, 'DELETE')
+  if (MSG_BROKER_URL is not None):
+    return publish_and_consume(kafka_event, req_id_from_header, policy_type_id)
+
+  # No broker configured: accept the request without dispatching
+  return Response('', 200, mimetype=TEXT_PLAIN)
+
+
+# API Function: Get status for a policy
+# Publishes a GET (GetPolicyStatus) event for (policyTypeId, policyId) and
+# waits synchronously for the matching response event.
+def get_policy_status(policyTypeId, policyId):
+
+  # Honour forced delay / forced response set via the admin endpoints
+  if ((r := check_modified_response()) is not None):
+    return r
+
+  policy_type_id=str(policyTypeId)
+  policy_id=str(policyId)
+
+  # Optional test-supplied correlation id (see basic_test_with_cust_header.sh)
+  req_id_from_header = request.headers.get('requestid')
+  print('req_id_from_header', req_id_from_header)
+
+  # Synch callout hooks towards kafka broker
+  kafka_event = create_kafka_event(policy_type_id, policy_id, None, 'GET')
+  if (MSG_BROKER_URL is not None):
+    return publish_and_consume(kafka_event, req_id_from_header, policy_type_id)
+
+  # No broker configured: accept the request without dispatching
+  return Response('', 200, mimetype=TEXT_PLAIN)
+
+
+def get_policy_type_to_topic_mapping(policyTypeId):
+
+  if ((r := check_modified_response()) is not None):
+    return r
+
+  policy_type_id = str(policyTypeId)
+
+  m_file = open('../resources/policytype_to_topicmap.json')
+  map_in_dict = json.load(m_file)
+
+  if policy_type_id in map_in_dict.keys():
+    topic_address = map_in_dict[policy_type_id]
+    return Response(json.dumps(topic_address), 200, mimetype=APPL_JSON)
+  else:
+    pjson=create_problem_json(None, "The policy type to topic mapping does not exist.", 404, None, policy_type_id)
+    return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON)
+
+
+# Helper: Publishes and consumes (to/from) the target broker and the topic in two-way synch
+# Returns the HTTP response for the whole round-trip: 200 on a timely success
+# event, 404 if the policy type has no topic mapping, 408 on time-out and
+# 419 (module-specific code) on any other failure.
+def publish_and_consume(kafka_event, req_id_from_header, pol_type_id):
+
+  # Instantiate KafkaProducer with keyword arguments
+  producer = create_kafka_producer()
+
+  # Assigns an id to each request that is supposed to get a result
+  # if a req_id already exists in req headers, it means that test generated req_id is in use for testing only
+  if (req_id_from_header is None):
+    req_id = get_random_string(16)
+  else:
+    req_id = req_id_from_header
+
+  try:
+
+    resp = get_policy_type_to_topic_mapping(pol_type_id)
+    # if the policy type to topic mapping could not be found, then returns 404
+    # else gets target topic to publish the message to
+    if (resp.status_code == 404):
+      return resp
+    else:
+      data = json.loads(resp.data)
+      target_topic_req = data['request_topic']
+      target_topic_res = data['response_topic']
+
+    # synch-publish
+    # KafkaProducer.send(topicname, value=broker_message, key=req_id, headers=None, partition=None, timestamp_ms=None)
+    fut_rec_metadata = producer.send(target_topic_req, kafka_event, req_id)
+    # .get() blocks until the broker acknowledges the publish
+    record_metadata = fut_rec_metadata.get()
+    print('Future:', record_metadata)
+    # broker-side publish timestamp, compared against the consume timestamp below
+    publish_time_in_ms = record_metadata.timestamp
+
+    # For test purposes only, publish the success response event with no error-info to response topic
+    # if basic_test_with_cust_header.sh is being used, then comment this line
+    # else comment out this line for the basic_test.sh
+    kafka_response_event = create_kafka_response_event(200, "")
+    producer.send(target_topic_res, kafka_response_event, req_id)
+
+    # synch-consume
+    consumer_record = consume_record_for(req_id, target_topic_res)
+    if (isinstance(consumer_record, ConsumerRecord)):
+
+      print("Consumer Record:", consumer_record)
+      cons_rec_value = consumer_record.value
+      cons_rec_val_in_dict = json.loads(cons_rec_value)
+      resp_code = cons_rec_val_in_dict['response-code']
+
+      # if response code success, then check for time-out
+      if (int(resp_code) == 200):
+        # time-out control block, default time-out duration is thirty seconds
+        consume_time_in_ms = consumer_record.timestamp
+        elapsed_time_in_ms = consume_time_in_ms - publish_time_in_ms
+        print('Elapsed time in ms:', elapsed_time_in_ms)
+        if (elapsed_time_in_ms < int(TIME_OUT) * 1000):
+          return Response('', 200, mimetype=APPL_JSON)
+        else:
+          # returns time-out response code
+          pjson=create_error_response(408)
+          return Response(json.dumps(pjson), 408, mimetype=APPL_PROB_JSON)
+      else:
+        # for all other responses returns special error of this module by wrapping actual resp code
+        pjson=create_error_response(419)
+        return Response(json.dumps(pjson), 419, mimetype=APPL_PROB_JSON)
+
+    elif (isinstance(consumer_record, Response)):
+      # Returns time-out response
+      return consumer_record
+    else:
+      # returns special error of this module
+      pjson=create_error_response(419)
+      return Response(json.dumps(pjson), 419, mimetype=APPL_PROB_JSON)
+
+  except Exception as err:
+    print('Error while publish and consume', err)
+    pjson=create_error_response(419)
+    return Response(json.dumps(pjson), 419, mimetype=APPL_PROB_JSON)
+  finally:
+    # Always release the producer's network resources
+    producer.close()
+
+
+# Helper: Searches for req_id by seeking every five seconds up to thirty seconds
+# Helper: If the req_id is found, then ConsumerRecord will be returned
+# Helper: If the req_id is not found, then Response Request Timeout will be returned
+def consume_record_for(req_id, target_topic_res):
+
+  try:
+    print ('req_id looking for in consumer:', req_id)
+    consumer = create_kafka_consumer()
+    topic_partition = TopicPartition(target_topic_res, 0)
+    consumer.assign([topic_partition])
+
+    # calculates poll cycle threshold
+    sleep_period_in_sec = 5
+    poll_cycle_threshold = int(TIME_OUT) / sleep_period_in_sec
+    poll_cycle_threshold = math.floor(poll_cycle_threshold)
+    print('poll_cycle_threshold', poll_cycle_threshold)
+
+    poll_retries = 0
+    starting_offset = 0
+    prev_last_offset = 0
+    while (poll_retries < poll_cycle_threshold):
+      # Manually specify the fetch offset for a TopicPartition
+      consumer.seek(topic_partition, starting_offset)
+      # Get the last offset for the given partitions
+      last_offset = consumer.end_offsets([topic_partition])[topic_partition]
+      print('last_offset',last_offset)
+
+      if (last_offset != prev_last_offset):
+        for consumer_record in consumer:
+          # Get req_id as msg_key and converts it from byte to str for each consumer record
+          msg_key = byte_to_str(consumer_record.key)
+          print ('msg_key in a consumer_record:', msg_key)
+          if (req_id == msg_key):
+            print ('req_id is found in consumer records', req_id)
+            return consumer_record
+          elif (consumer_record.offset == last_offset - 1):
+            break
+
+      print('Sleeping for ' + str(sleep_period_in_sec) + ' seconds...')
+      time.sleep(sleep_period_in_sec)
+
+      poll_retries += 1
+      prev_last_offset = last_offset
+      starting_offset += last_offset
+
+    # Returns time-out response
+    pjson=create_error_response(408)
+    return Response(json.dumps(pjson), 408, mimetype=APPL_PROB_JSON)
+
+  except Exception as err:
+    print('Error while consume record for req_id', err)
+    pjson=create_error_response(419)
+    return Response(json.dumps(pjson), 419, mimetype=APPL_PROB_JSON)
+  finally:
+    consumer.close()
+
+
+# Helper: Create a response object if forced http response code is set
+# Returns None when no forced code is configured; the forced code stays in
+# effect until reset via /dispatcheradmin/forceresponse
+def get_forced_response():
+
+  if (forced_settings['code'] is not None):
+    resp_code=forced_settings['code']
+    # The problem+json 'status' field doubles as the HTTP status of the response
+    pjson=create_error_response(int(resp_code))
+    return Response(json.dumps(pjson), pjson['status'], mimetype=APPL_PROB_JSON)
+  return None
+
+
+# Helper: Delay if delayed response code is set
+# Best-effort: a non-numeric delay value is silently ignored by design
+def do_delay():
+
+  if (forced_settings['delay'] is not None):
+    try:
+      val=int(forced_settings['delay'])
+      time.sleep(val)
+    except Exception:
+      return
+
+
+# Helper: Check if response shall be delayed or a forced response shall be sent
+# Applies the configured delay first, then returns the forced Response (or None)
+def check_modified_response():
+
+  do_delay()
+  return get_forced_response()
+
+
+# Helper: Create a problem json object
+# Builds an RFC 7807 style dict; None-valued fields are omitted entirely
+def create_problem_json(type_of, title, status, detail, instance):
+
+  error = {}
+  if type_of is not None:
+    error["type"] = type_of
+  if title is not None:
+    error["title"] = title
+  if status is not None:
+    error["status"] = status
+  if detail is not None:
+    error["detail"] = detail
+  if instance is not None:
+    error["instance"] = instance
+  return error
+
+
+# Helper: Create a problem json based on a generic http response code
+# 419 is not a standard HTTP code — it is this module's marker for a failed
+# Kafka publish/consume round-trip (see publish_and_consume)
+def create_error_response(code):
+
+    if code == 400:
+      return(create_problem_json(None, "Bad request", 400, "Object in payload not properly formulated or not related to the method", None))
+    elif code == 404:
+      return(create_problem_json(None, "Not found", 404, "No resource found at the URI", None))
+    elif code == 405:
+      return(create_problem_json(None, "Method not allowed", 405, "Method not allowed for the URI", None))
+    elif code == 408:
+      return(create_problem_json(None, "Request timeout", 408, "Request timeout", None))
+    elif code == 409:
+      return(create_problem_json(None, "Conflict", 409, "Request could not be processed in the current state of the resource", None))
+    elif (code == 419):
+      return(create_problem_json(None, "Kafka message publish failed", 419, "Publishing the event could not be processed on the Kafka cluster", None))
+    elif code == 429:
+      return(create_problem_json(None, "Too many requests", 429, "Too many requests have been sent in a given amount of time", None))
+    elif code == 507:
+      return(create_problem_json(None, "Insufficient storage", 507, "The method could not be performed on the resource because the provider is unable to store the representation needed to successfully complete the request", None))
+    elif code == 503:
+      return(create_problem_json(None, "Service unavailable", 503, "The provider is currently unable to handle the request due to a temporary overload", None))
+    else:
+      return(create_problem_json(None, "Unknown", code, "Not implemented response code", None))
diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/main.py b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/main.py
new file mode 100644 (file)
index 0000000..7816e7f
--- /dev/null
@@ -0,0 +1,77 @@
+#  ============LICENSE_START===============================================
+#  Copyright (C) 2022 Nordix Foundation. All rights reserved.
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+#
+
+import json
+import sys
+import requests
+
+
+from flask import request, Response, Flask, json
+from var_declaration import forced_settings, app
+from maincommon import check_timeout, check_apipath
+
+#Constants
+TEXT_PLAIN='text/plain'
+
+check_apipath()
+check_timeout()
+
+# app is created in var_declarations
+
+import payload_logging   # app var need to be initialized
+
+#Check alive function
+# Liveness probe used by the test scripts ("hello world" check)
+@app.route('/', methods=['GET'])
+def test():
+  return Response("OK", 200, mimetype=TEXT_PLAIN)
+
+#Set|Reset force response to be returned from dispatcher
+#/dispatcheradmin/forceresponse?code=<responsecode>
+# Omitting 'code' resets the forcing. NOTE: the response strings below are
+# matched verbatim by the test scripts (basic_test.sh) — do not reword them.
+@app.route('/dispatcheradmin/forceresponse', methods=['POST'])
+def forceresponse():
+
+  query_param=request.args.get('code')
+  forced_settings['code']=query_param
+
+  if (query_param is None):
+    return Response("Force response code has been resetted for dispatcher responses", 200, mimetype=TEXT_PLAIN)
+  else:
+    return Response("Force response code: " + str(forced_settings['code']) + " set for all dispatcher response until it is resetted", 200, mimetype=TEXT_PLAIN)
+
+#Set|Reset force delay response, in seconds, for all external server responses
+#/a1policy/forcedelay?delay=<seconds>
+# Omitting 'delay' resets the forcing. NOTE: the response strings below are
+# matched verbatim by the test scripts (basic_test.sh) — do not reword them.
+@app.route('/dispatcheradmin/forcedelay', methods=['POST'])
+def forcedelay():
+
+  query_param=request.args.get('delay')
+  forced_settings['delay']=query_param
+
+  if (query_param is None):
+    return Response("Force delay has been resetted for all dispatcher responses ", 200, mimetype=TEXT_PLAIN)
+  else:
+    return Response("Force delay: " + str(forced_settings['delay']) + " sec set for all dispatcher responses until it is resetted ", 200, mimetype=TEXT_PLAIN)
+
+port_number = 7777
+if len(sys.argv) >= 2:
+  if isinstance(sys.argv[1], int):
+    port_number = sys.argv[1]
+
+#Import base RestFUL API functions from Open API
+# Connexion routes the yaml-declared operations to the handler functions
+app.add_api('KAFKA_DISPATCHER_api.yaml')
+
+if __name__ == '__main__':
+  # Loopback only: nginx (nginx.conf) terminates TLS on 7175 and proxies here
+  app.run(port=port_number, host="127.0.0.1", threaded=False)
diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/maincommon.py b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/maincommon.py
new file mode 100644 (file)
index 0000000..c534acb
--- /dev/null
@@ -0,0 +1,119 @@
+#  ============LICENSE_START===============================================
+#  Copyright (C) 2022 Nordix Foundation. All rights reserved.
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+#
+
+import os
+import sys
+import json
+from pathlib import Path
+from flask import Response
+import socket
+import ssl
+import random
+import string
+
+from kafka import KafkaProducer, KafkaConsumer
+
+#Must exist
+apipath=os.environ['APIPATH']
+timeout=os.getenv('TIME_OUT')
+
+MSG_BROKER_URL=os.getenv('MSG_BROKER_URL')
+
+
+# Make sure the  api path is set, otherwise exit
+# APIPATH points Connexion at the OpenAPI yaml directory (see start.sh)
+def check_apipath():
+    if (apipath is None):
+      print("Env APIPATH not set. Exiting....")
+      sys.exit(1)
+
+# Make sure the  timeout is set and greater than zero, otherwise exit
+def check_timeout():
+    if (timeout is None):
+      print("Env TIME_OUT not set. Exiting....")
+      sys.exit(1)
+    elif (int(timeout) < 0):
+      print("Env TIME_OUT must be greater than zero. Exiting....")
+      sys.exit(1)
+
+# Instantiate KafkaProducer with keyword arguments
+# https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html
+# Keys are str-encoded; values are JSON-serialized then ascii-encoded.
+# Caller owns the producer and must close() it (see publish_and_consume).
+def create_kafka_producer():
+
+  producer = KafkaProducer(
+    bootstrap_servers = [MSG_BROKER_URL],
+    key_serializer = str.encode,
+    value_serializer = lambda m: json.dumps(m).encode('ascii'),
+  )
+  return producer
+
+
+# Instantiate KafkaConsumer with keyword arguments
+# https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
+# No topic is subscribed here — the caller assigns a TopicPartition explicitly
+# (see consume_record_for). Values are JSON-decoded from ascii bytes.
+def create_kafka_consumer():
+  consumer = KafkaConsumer(
+    #KAFKA_TOPIC_RES,
+    bootstrap_servers = MSG_BROKER_URL,
+    auto_offset_reset = 'earliest',
+    value_deserializer = lambda m: json.loads(m.decode('ascii')),
+    #enable_auto_commit=False
+  )
+  return consumer
+
+
+# Helper: Builds a Kafka event
+# Wraps the policy payload with the action name (see operation_to_action),
+# policy type id and policy id; returned as a JSON string.
+def create_kafka_event(policy_type_id, policy_id, payload, operation):
+
+  kafka_event_format = {'action': operation_to_action(operation), 'payload': payload, 'policy_type_id': policy_type_id, 'policy_id': policy_id}
+  # converts dict to str
+  kafka_event_json = json.dumps(kafka_event_format)
+  return kafka_event_json
+
+# Helper: Builds a Kafka response event
+# {'response-code': ..., 'error-info': ...} as a JSON string; consumed back by
+# publish_and_consume to decide the HTTP outcome.
+def create_kafka_response_event(response_code, error_info):
+
+  kafka_response_event_format = {'response-code': response_code, 'error-info': error_info}
+  # converts dict to str
+  kafka_response_event_json = json.dumps(kafka_response_event_format)
+  return kafka_response_event_json
+
+# Helper: Converts a HTTP operation to an explanation
+# Maps the internal operation tag to the action name carried in the Kafka
+# event; returns None for unknown operations.
+def operation_to_action(argument):
+
+  switcher = {
+    'CREATE': "CreatePolicy",
+    'UPDATE': "UpdatePolicy",
+    'DELETE': "DeletePolicy",
+    'GET': "GetPolicyStatus",
+  }
+  return switcher.get(argument, None)
+
+
+# Helper: Converts a byte array to a str
+# None passes through unchanged (Kafka records may carry a null key)
+def byte_to_str(byte_arr):
+
+  if (byte_arr is not None):
+    return byte_arr.decode('utf-8')
+  else:
+    return None
+
+
+# Helper: Creates random string
+# Used as a per-request correlation id (Kafka message key).
+# NOTE(review): uses random, not secrets — fine for a correlation id, not for
+# anything security-sensitive; punctuation characters end up in message keys.
+def get_random_string(length):
+
+  characters = string.ascii_letters + string.digits + string.punctuation
+  password = ''.join(random.choice(characters) for i in range(length))
+  return password
diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/payload_logging.py b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/payload_logging.py
new file mode 100644 (file)
index 0000000..9457d04
--- /dev/null
@@ -0,0 +1,60 @@
+#  ============LICENSE_START===============================================
+#  Copyright (C) 2022 Nordix Foundation. All rights reserved.
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+#
+
+from var_declaration import app
+from flask import Flask, request, Response
+
+#Constants
+TEXT_PLAIN='text/plain'
+
+#Vars
+payload_log=True
+
+#Function to activate/deactivate http header and payload logging
+# Toggles the module-level flag read by the before/after request hooks below
+@app.route('/payload_logging/<state>', methods=['POST', 'PUT'])
+def set_payload_logging(state):
+  global payload_log
+  if (state == "on"):
+    payload_log=True
+  elif (state == "off"):
+    payload_log=False
+  else:
+    return Response("Unknown state: "+state+" - use 'on' or 'off'", 400, mimetype=TEXT_PLAIN)
+
+  return Response("Payload and header logging set to: "+state, 200, mimetype=TEXT_PLAIN)
+
+# Generic function to log http header and payload - called before the request
+# app.app is the underlying Flask app inside the Connexion wrapper
+@app.app.before_request
+def log_request_info():
+    if (payload_log is True):
+        print('')
+        print('-----Request-----')
+        print('Req Headers: ', request.headers)
+        print('Req Body: ', request.get_data())
+
+# Generic function to log http header and payload - called after the response
+# Must return the response object so Flask continues the response chain
+@app.app.after_request
+def log_response_info(response):
+    if (payload_log is True):
+        print('-----Response-----')
+        print('Resp Headers: ', response.headers)
+        print('Resp Body: ', response.get_data())
+    return response
+
+# Helper function to check logging state
+def is_payload_logging():
+  return payload_log
diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/start.sh b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/start.sh
new file mode 100644 (file)
index 0000000..e4e3510
--- /dev/null
@@ -0,0 +1,31 @@
+#!/bin/bash
+
+#  ============LICENSE_START===============================================
+#  Copyright (C) 2022 Nordix Foundation. All rights reserved.
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+#
+
+#Set path to open api
+export APIPATH=$PWD/api
+echo "APIPATH set to: "$APIPATH
+
+# Run from src/ so dispatcher.py's relative '../resources' path resolves
+cd src
+
+#start nginx
+# nginx terminates TLS on 7175 and proxies to the dispatcher on localhost:7777
+nginx -c /usr/src/app/nginx.conf
+
+#start Kafka message dispatcher
+# -u: unbuffered stdout so container logs appear immediately
+echo "Path to main.py: "$PWD
+python -u main.py
diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/var_declaration.py b/near-rt-ric-simulator/test/KAFKA_DISPATCHER/src/var_declaration.py
new file mode 100644 (file)
index 0000000..e6063b4
--- /dev/null
@@ -0,0 +1,26 @@
+#  ============LICENSE_START===============================================
+#  Copyright (C) 2022 Nordix Foundation. All rights reserved.
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+#
+
+from maincommon import apipath
+import connexion
+
+#Main app
+# Connexion wraps Flask and loads the OpenAPI spec from APIPATH (see maincommon)
+app = connexion.App(__name__, specification_dir=apipath)
+
+# Mutable settings shared across modules; set via the /dispatcheradmin endpoints
+forced_settings = {}
+forced_settings['code']=None
+forced_settings['delay']=None
diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/basic_test.sh b/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/basic_test.sh
new file mode 100755 (executable)
index 0000000..ef7014d
--- /dev/null
@@ -0,0 +1,126 @@
+#!/bin/bash
+
+#  ============LICENSE_START===============================================
+#  Copyright (C) 2022 Nordix Foundation. All rights reserved.
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+#
+
+# Script for basic test of the Kafka message dispatcher.
+# Run the build_and_start with the same arg, except arg 'nonsecure|secure', as this script
+
+# Exit with usage help when the arguments are not as expected
+print_usage() {
+    echo "Usage: ./basic_test.sh nonsecure|secure "
+    exit 1
+}
+
+if [ $# -ne 1 ]; then
+    print_usage
+fi
+if [ "$1" != "nonsecure" ] && [ "$1" != "secure" ]; then
+    print_usage
+fi
+
+# Select port/protocol; PORT and HTTPX are consumed by the sourced do_curl helpers
+if [ $1 == "nonsecure" ]; then
+    #Default http port for the simulator
+    PORT=7075
+    # Set http protocol
+    HTTPX="http"
+else
+    #Default https port for the simulator
+    PORT=7175
+    # Set https protocol
+    HTTPX="https"
+fi
+
+# do_curl / do_elapsetime_curl come from these helpers; RESULT holds the
+# expected response body (or "json:<body>") for the next call
+. ../common/test_common.sh
+. ../common/elapse_time_curl.sh
+
+echo "=== Kafka message dispatcher hello world ==="
+RESULT="OK"
+do_curl GET / 200
+
+echo "=== Reset force delay ==="
+RESULT="Force delay has been resetted for all dispatcher responses"
+do_curl POST /dispatcheradmin/forcedelay 200
+
+echo "=== API: Get policy type to topic mapping of type: ANR ==="
+res=$(cat jsonfiles/ANR_to_topic_map.json)
+RESULT="json:$res"
+do_curl GET /policytypetotopicmapping/ANR 200
+
+echo "=== Put policy: shall publish and consume for put policy operation ==="
+RESULT=""
+do_curl PUT  /policytypes/ANR/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json
+
+echo "=== Get policy status: shall publish and consume for get policy status operation ==="
+RESULT=""
+do_curl GET  /policytypes/ANR/kafkadispatcher/alpha/status 200 jsonfiles/alpha_policy.json
+
+echo "=== Put policy: shall publish and consume for put policy operation for alpha ==="
+RESULT=""
+do_curl PUT  /policytypes/STD_1/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json
+
+echo "=== Delete policy: shall publish and consume for delete policy operation for alpha ==="
+RESULT=""
+do_curl DELETE  /policytypes/STD_1/kafkadispatcher/alpha 200
+
+echo "=== Set force delay 5 sec ==="
+RESULT="Force delay: 5 sec set for all dispatcher responses until it is resetted"
+do_curl POST '/dispatcheradmin/forcedelay?delay=5' 200
+
+# do_elapsetime_curl additionally asserts the call took at least 5 seconds
+echo "=== Put policy: shall wait at least <delay-time> sec and then respond while publishing and consuming ==="
+RESULT=""
+do_elapsetime_curl PUT  /policytypes/ANR/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json 5
+
+echo "=== Reset force delay ==="
+RESULT="Force delay has been resetted for all dispatcher responses"
+do_curl POST /dispatcheradmin/forcedelay 200
+
+echo "=== Put policy: shall publish and consume for put policy operation for beta ==="
+RESULT=""
+do_curl PUT  /policytypes/STD_1/kafkadispatcher/beta 200 jsonfiles/beta_policy.json
+
+echo "=== Get policy status: shall publish and consume for get policy status operation ==="
+RESULT=""
+do_curl GET  /policytypes/ANR/kafkadispatcher/alpha/status 200 jsonfiles/beta_policy.json
+
+echo "=== Put policy: shall publish and consume for put policy operation for alpha ==="
+RESULT=""
+do_curl PUT  /policytypes/STD_2/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json
+
+echo "=== Set force response code: 500 ==="
+RESULT="Force response code: 500 set for all dispatcher response until it is resetted"
+do_curl POST  '/dispatcheradmin/forceresponse?code=500' 200
+
+echo "=== Put policy: shall not publish and consume for put policy operation for alpha ==="
+res=$(cat jsonfiles/forced_response.json)
+RESULT="json:$res"
+do_curl PUT  /policytypes/ANR/kafkadispatcher/alpha 500 jsonfiles/alpha_policy.json
+
+echo "=== Reset force response code ==="
+RESULT="Force response code has been resetted for dispatcher responses"
+do_curl POST  /dispatcheradmin/forceresponse 200
+
+echo "=== Get policy status: shall publish and consume for get policy status operation ==="
+RESULT=""
+do_curl GET  /policytypes/ANR/kafkadispatcher/alpha/status 200 jsonfiles/alpha_policy.json
+
+echo "=== Delete policy: shall publish and consume for delete policy operation for alpha ==="
+RESULT=""
+do_curl DELETE  /policytypes/STD_1/kafkadispatcher/alpha 200
+
+echo "********************"
+echo "*** All tests ok ***"
+echo "********************"
diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/basic_test_with_cust_header.sh b/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/basic_test_with_cust_header.sh
new file mode 100755 (executable)
index 0000000..4fa8ac0
--- /dev/null
@@ -0,0 +1,139 @@
+#!/bin/bash
+
+#  ============LICENSE_START===============================================
+#  Copyright (C) 2022 Nordix Foundation. All rights reserved.
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+#
+
+# Script for error testing of the Kafka message dispatcher
+# The timeout value should be equal to the TIME_OUT param that exists in the start script
+# Run the script with the args: nonsecure|secure timeout=30
+
+print_usage() {
+    echo "Usage: ./basic_test.sh nonsecure|secure timeout=30"
+    exit 1
+}
+
+if [ $# -ne 2 ]; then
+    print_usage
+fi
+if [ "$1" != "nonsecure" ] && [ "$1" != "secure" ]; then
+    print_usage
+fi
+
+timeout=$(echo "$2" | cut -d'=' -f2)
+regexp_for_number='^[0-9]+$'
+
+if ! [[ $timeout =~ $regexp_for_number ]] ; then
+   echo "error:"$timeout" Not a number"
+   exit 1
+else
+    if [ $timeout -le 0 ]; then
+        echo "Timeout value must be greater than zero"
+        exit 1
+    fi
+fi
+
+if [ $1 == "nonsecure" ]; then
+    # Default http port for the simulator
+    PORT=7075
+    # Set http protocol
+    HTTPX="http"
+else
+    #Default https port for the simulator
+    PORT=7175
+    # Set https protocol
+    HTTPX="https"
+fi
+
+. ../common/test_common.sh
+
+echo "=== Kafka message dispatcher hello world ==="
+RESULT="OK"
+do_curl GET / 200
+
+echo "=== Reset force delay ==="
+RESULT="Force delay has been resetted for all dispatcher responses"
+do_curl POST /dispatcheradmin/forcedelay 200
+
+# asynch error test case
+echo "=== Put policy: shall publish and consume time-out ==="
+req_id=$(get_random_number)
+res=$(cat jsonfiles/timeout_response.json)
+RESULT="json:$res"
+# asynch callout
+do_curl PUT  /policytypes/ANR/kafkadispatcher/alpha 408 jsonfiles/alpha_policy.json $req_id &
+proc_id=$!
+sleep $timeout
+# after time out duration, publish the event
+publish_response_event $req_id
+# wait until the main process to be completed
+wait $proc_id
+
+# asynch success test case after 10s
+echo "=== Put policy: shall publish and consume success at least 10 secs later ==="
+req_id=$(get_random_number)
+RESULT=""
+# asynch callout
+do_curl PUT  /policytypes/STD_1/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json $req_id &
+proc_id=$!
+sleep 10
+# after 10s, publish the event
+publish_response_event $req_id
+# wait until the main process to be completed
+wait $proc_id
+
+# asynch error test case
+echo "=== Get policy status: shall publish and consume time-out ==="
+req_id=$(get_random_number)
+res=$(cat jsonfiles/timeout_response.json)
+RESULT="json:$res"
+# asynch callout
+do_curl GET  /policytypes/STD_2/kafkadispatcher/alpha/status 408 jsonfiles/alpha_policy.json $req_id &
+proc_id=$!
+sleep $timeout
+# after time out duration, publish the event
+publish_response_event $req_id
+# wait until the main process to be completed
+wait $proc_id
+
+# asynch success test case after 10s
+echo "=== Get policy status: shall publish and consume success at least 15 secs later ==="
+req_id=$(get_random_number)
+RESULT=""
+# asynch callout
+do_curl GET  /policytypes/ANR/kafkadispatcher/alpha/status 200 jsonfiles/alpha_policy.json $req_id &
+proc_id=$!
+sleep 15
+# after 15s, publish the event
+publish_response_event $req_id
+# wait until the main process to be completed
+wait $proc_id
+
+# asynch success test case without any delay
+echo "=== Delete policy: shall publish and consume success ==="
+req_id=$(get_random_number)
+RESULT=""
+# asynch callout
+do_curl DELETE  /policytypes/STD_1/kafkadispatcher/alpha 200 jsonfiles/alpha_policy.json $req_id &
+proc_id=$!
+publish_response_event $req_id
+# wait until the main process to be completed
+wait $proc_id
+
+
+echo "********************"
+echo "*** All tests ok ***"
+echo "********************"
diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/build_and_start.sh b/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/build_and_start.sh
new file mode 100755 (executable)
index 0000000..dff351e
--- /dev/null
@@ -0,0 +1,45 @@
+#!/bin/bash
+
+#  ============LICENSE_START===============================================
+#  Copyright (C) 2022 Nordix Foundation. All rights reserved.
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+#
+
+# Script to build and start the container
+# Make sure to run the simulator with the same arg as this script
+
+print_usage() {
+    echo "Usage: ./build_and_start.sh "
+    exit 1
+}
+
+if [ $# -ge 1 ]; then
+    print_usage
+fi
+
+echo "Building Kafka message dispatcher image..."
+cd ../KAFKA_DISPATCHER/
+
+#Build the image
+docker build -t kafka_dispatcher .
+
+docker stop kafkamessagedispatcher > /dev/null 2>&1
+docker rm -f kafkamessagedispatcher > /dev/null 2>&1
+
+echo "Starting Kafka message dispatcher..."
+echo "PWD path: "$PWD
+
+#Run the container in interactive mode with host networking driver which allows docker to access localhost, unsecure port 7075, secure port 7175, TIME_OUT must be in seconds
+docker run --network host --rm -it -p 7075:7075 -p 7175:7175 -e ALLOW_HTTP=true -e MSG_BROKER_URL=localhost:9092 -e TIME_OUT=30 --volume "$PWD/certificate:/usr/src/app/cert" --name kafkamessagedispatcher kafka_dispatcher
diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/jsonfiles/ANR_to_topic_map.json b/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/jsonfiles/ANR_to_topic_map.json
new file mode 100644 (file)
index 0000000..ed52462
--- /dev/null
@@ -0,0 +1,4 @@
+{
+  "request_topic": "kafkatopicreq",
+  "response_topic": "kafkatopicres"
+}
diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/jsonfiles/alpha_policy.json b/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/jsonfiles/alpha_policy.json
new file mode 100644 (file)
index 0000000..66c2b63
--- /dev/null
@@ -0,0 +1,11 @@
+{
+
+          "title": "A1 policy external server",
+          "description": "A1 policies notifying external server",
+          "type": "object",
+          "properties": {
+            "a1policyType": "alpha_test_policy",
+            "url" : "http://www.com"
+          }
+
+}
diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/jsonfiles/beta_policy.json b/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/jsonfiles/beta_policy.json
new file mode 100644 (file)
index 0000000..a61c7fc
--- /dev/null
@@ -0,0 +1,11 @@
+{
+
+          "title": "A1 policy external server",
+          "description": "A1 policies notifying external server",
+          "type": "object",
+          "properties": {
+            "a1policyType": "beta_test_policy",
+            "url" : "http://www.com"
+          }
+
+}
diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/jsonfiles/forced_response.json b/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/jsonfiles/forced_response.json
new file mode 100644 (file)
index 0000000..4d26325
--- /dev/null
@@ -0,0 +1,5 @@
+{
+    "title": "Unknown",
+    "status": 500,
+    "detail": "Not implemented response code"
+}
diff --git a/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/jsonfiles/timeout_response.json b/near-rt-ric-simulator/test/KAFKA_DISPATCHER_TEST/jsonfiles/timeout_response.json
new file mode 100644 (file)
index 0000000..dd034c4
--- /dev/null
@@ -0,0 +1,5 @@
+{
+    "title": "Request timeout",
+    "status": 408,
+    "detail": "Request timeout"
+}
diff --git a/near-rt-ric-simulator/test/STD_2.0.0/build_and_start_with_kafka.sh b/near-rt-ric-simulator/test/STD_2.0.0/build_and_start_with_kafka.sh
new file mode 100755 (executable)
index 0000000..c017513
--- /dev/null
@@ -0,0 +1,96 @@
+#!/bin/bash
+
+#  ============LICENSE_START===============================================
+#  Copyright (C) 2022 Nordix Foundation. All rights reserved.
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+#
+
+# Script to build and start the container
+# Make sure to run the simulator with the same arg as this script
+
+print_usage() {
+    echo "Usage: ./build_and_start.sh duplicate-check|ignore-duplicate kafka-srv|kafka-srv-secure|ignore-kafka-srv"
+    exit 1
+}
+
+if [ $# -ne 2 ]; then
+    print_usage
+fi
+
+if [ $1 == "duplicate-check" ]; then
+    DUP_CHECK=1
+elif  [ $1 == "ignore-duplicate" ]; then
+    DUP_CHECK=0
+else
+    print_usage
+fi
+
+if [ $2 == "kafka-srv" ]; then
+    URL="http://localhost:7075"
+elif  [ $2 == "kafka-srv-secure" ]; then
+    URL="https://localhost:7175"
+elif  [ $2 == "ignore-kafka-srv" ]; then
+    URL=""
+else
+    print_usage
+fi
+
+URL_FLAG=""
+if [ ! -z "$URL" ]; then
+    URL_FLAG="-e KAFKA_DISPATCHER_URL=$URL"
+fi
+
+# Stop and remove container images if they run
+
+echo "Stopping A1 simulator image..."
+docker stop a1StdSimulator > /dev/null 2>&1
+docker rm -f a1StdSimulator > /dev/null 2>&1
+
+echo "Stopping kafka dispatcher server image..."
+docker stop kafkamessagedispatcher > /dev/null 2>&1
+docker rm -f kafkamessagedispatcher > /dev/null 2>&1
+
+# Initialize path variables for certificate and build operations
+
+dirstd2=$PWD
+
+cd ../../
+dirnrtsim=$PWD
+
+cd test/KAFKA_DISPATCHER/
+dirkafkasrv=$PWD
+
+# Build containers
+
+cd $dirnrtsim
+echo "Building A1 simulator image..."
+docker build -t a1test .
+
+if [ ! -z "$URL" ]; then
+    cd $dirkafkasrv
+    echo "Building kafka server image..."
+    docker build -t kafka_dispatcher .
+fi
+
+# Run containers
+
+# Runs kafka server in detached mode
+# In order to tail logs use:: docker logs -f kafkamessagedispatcher
+if [ ! -z "$URL" ]; then
+    docker run -d --network host --rm -it -p 7075:7075 -p 7175:7175 -e ALLOW_HTTP=true -e MSG_BROKER_URL=localhost:9092 -e KAFKA_TOPIC_RES=kafkatopicres -e TIME_OUT=30000 --volume "$dirkafkasrv/certificate:/usr/src/app/cert" --name kafkamessagedispatcher kafka_dispatcher
+fi
+
+# Runs A1 simulator
+docker run --network host --rm -it -p 8085:8085 -p 8185:8185 -e A1_VERSION=STD_2.0.0 -e ALLOW_HTTP=true -e REMOTE_HOSTS_LOGGING=1 -e DUPLICATE_CHECK=$DUP_CHECK $URL_FLAG --volume "$dirnrtsim/certificate:/usr/src/app/cert" --name a1StdSimulator a1test
diff --git a/near-rt-ric-simulator/test/common/publish_response_event_to_kafka_bus.py b/near-rt-ric-simulator/test/common/publish_response_event_to_kafka_bus.py
new file mode 100644 (file)
index 0000000..635dc6d
--- /dev/null
@@ -0,0 +1,87 @@
+#  ============LICENSE_START===============================================
+#  Copyright (C) 2022 Nordix Foundation. All rights reserved.
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+#
+
+# This script publishes a response-event to a kafka bus
+# In order to use this script, you must have a venv for Python and the kafka-python lib has to be installed
+# To install kafka-python please use: pip install kafka-python
+# Example of a response-event json
+#{
+  #"response-code": "400",
+  #"error-info": "Bad format"
+#}
+
+
+import os
+import json
+import sys
+
+from kafka import KafkaProducer
+
+# Response string with JSON format
+response_data_JSON =  """
+{
+  "response-code": 200,
+  "error-info": ""
+}
+"""
+
+# Instantiate KafkaProducer with keyword arguments
+# https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html
+def create_kafka_producer():
+
+  producer = KafkaProducer(
+    bootstrap_servers = ['localhost:9092'],
+    key_serializer = str.encode,
+    value_serializer = lambda m: json.dumps(m).encode('ascii'),
+  )
+  return producer
+
+# Helper: Publishes (to) the target broker and the topic in synch
+def publish(kafka_evet, req_id):
+
+  # Instantiate KafkaProducer with keyword arguments
+  producer = create_kafka_producer()
+  # Assigns an id to each request that is supposed to get a result
+  # req_id  = 'Hll1EsycKLNRric7'
+
+  try:
+
+    # synch-publish
+    # KafkaProducer.send(topicname, value=broker_message, key=req_id, headers=None, partition=None, timestamp_ms=None)
+    fut_rec_metadata = producer.send('kafkatopicres', kafka_evet, req_id)
+    return fut_rec_metadata.get()
+
+  except Exception as err:
+    print('Error while publish', err)
+  finally:
+    producer.close()
+
+if __name__ == '__main__':
+    try:
+
+        requestid = sys.argv[1]
+        # response_data_JSON is str
+        future = publish(response_data_JSON, requestid)
+
+        if (future is not None):
+            print (0)
+        else:
+            print (1)
+
+    except Exception:
+        print (1)
+    sys.exit()
index 6d2f80e..af4e79b 100755 (executable)
@@ -1,7 +1,7 @@
 #!/bin/bash
 
 #  ============LICENSE_START===============================================
-#  Copyright (C) 2020 Nordix Foundation. All rights reserved.
+#  Copyright (C) 2020-2022 Nordix Foundation. All rights reserved.
 #  ========================================================================
 #  Licensed under the Apache License, Version 2.0 (the "License");
 #  you may not use this file except in compliance with the License.
@@ -26,6 +26,7 @@
 #Expects the env $RESULT to contain the expected RESULT.
 #If json, the RESULT shall begin with 'json:'.
 #Any json parameter with unknown value shall be given as "????" to skip checking the value.
+#The requestid parameter is passed as the fifth argument.
 do_curl() {
     if [ $# -lt 3 ]; then
         echo "Need 3 or more parameters, <http-operation> <url> <response-code> [file]: "$@
@@ -33,9 +34,12 @@ do_curl() {
         exit 1
     fi
     curlstr="curl -X "$1" -skw %{http_code} $HTTPX://localhost:"${PORT}${2}" -H accept:*/*"
-    if [ $# -gt 3 ]; then
+    if [ $# -eq 4 ]; then
         curlstr=$curlstr" -H Content-Type:application/json --data-binary @"$4
     fi
+    if [ $# -ge 5 ]; then
+        curlstr=$curlstr" -H Content-Type:application/json --data-binary @"$4" -H requestid:"$5
+    fi
     echo "  CMD (${BASH_LINENO[0]}):"$curlstr
     res=$($curlstr)
     status=${res:${#res}-3}
@@ -75,6 +79,33 @@ do_curl() {
     fi
 }
 
+# Triggers publish_event_to_kafka_bus.py script to send msg to Kafka broker
+# The aim of this function is to realize error related test cases only
+# The request_id for the Kafka msg, should be passed here as a function parameter
+publish_response_event() {
+    if [ $# -lt 1 ]; then
+        echo "Need 1 parameter, <request_id>"
+        echo "Exiting test script....."
+        exit 1
+    fi
+    res=$(python ../common/publish_response_event_to_kafka_bus.py "$1")
+    if [ $res -eq 0 ]; then
+        echo "  Result as expected  "
+    else
+        echo "  Result not expected  "
+        echo "  Exiting.....  "
+        exit 1
+    fi
+}
+
+# Creates 16 digits random number using letters and numbers only
+get_random_number() {
+    r_num=$(tr -dc A-Za-z0-9 < /dev/urandom | head -c 16)
+    echo $r_num
+}
+
+# Used for cross test cases between the A1 sim and the external server
+# It relies on External-Server-specific parameters, e.g. HTTPX_EXT_SRV and PORT_EXT_SRV
 do_curl_ext_srv() {
     if [ $# -lt 3 ]; then
         echo "Need 3 or more parameters, <http-operation> <url> <response-code> [file]: "$@