Update version number in container-tag for F Maintenance Release
[sim/a1-interface.git] / near-rt-ric-simulator / test / KAFKA_DISPATCHER / src / maincommon.py
1 #  ============LICENSE_START===============================================
2 #  Copyright (C) 2022 Nordix Foundation. All rights reserved.
3 #  ========================================================================
4 #  Licensed under the Apache License, Version 2.0 (the "License");
5 #  you may not use this file except in compliance with the License.
6 #  You may obtain a copy of the License at
7 #
8 #       http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under the License is distributed on an "AS IS" BASIS,
12 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 #  See the License for the specific language governing permissions and
14 #  limitations under the License.
15 #  ============LICENSE_END=================================================
16 #
17
18 import os
19 import sys
20 import json
21 from pathlib import Path
22 from flask import Response
23 import socket
24 import ssl
25 import random
26 import string
27
28 from kafka import KafkaProducer, KafkaConsumer
29
30 #Must exist
31 apipath=os.environ['APIPATH']
32 timeout=os.getenv('TIME_OUT')
33
34 MSG_BROKER_URL=os.getenv('MSG_BROKER_URL')
35
36
37 # Make sure the  api path is set, otherwise exit
38 def check_apipath():
39     if (apipath is None):
40       print("Env APIPATH not set. Exiting....")
41       sys.exit(1)
42
43 # Make sure the  timeout is set and greater than zero, otherwise exit
44 def check_timeout():
45     if (timeout is None):
46       print("Env TIME_OUT not set. Exiting....")
47       sys.exit(1)
48     elif (int(timeout) < 0):
49       print("Env TIME_OUT must be greater than zero. Exiting....")
50       sys.exit(1)
51
52 # Instantiate KafkaProducer with keyword arguments
53 # https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html
54 def create_kafka_producer():
55
56   producer = KafkaProducer(
57     bootstrap_servers = [MSG_BROKER_URL],
58     key_serializer = str.encode,
59     value_serializer = lambda m: json.dumps(m).encode('ascii'),
60   )
61   return producer
62
63
64 # Instantiate KafkaConsumer with keyword arguments
65 # https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
66 def create_kafka_consumer():
67   consumer = KafkaConsumer(
68     # kafka cluster endpoint
69     bootstrap_servers = MSG_BROKER_URL,
70     # move to the earliest or latest available message
71     auto_offset_reset = 'earliest',
72     # number of milliseconds to block during message iteration
73     # if no new message available during this period of time, iteration through a for-loop will stop automatically
74     consumer_timeout_ms = 100,
75     value_deserializer = lambda m: json.loads(m.decode('ascii')),
76     #enable_auto_commit=False
77   )
78   return consumer
79
80
81 # Helper: Builds a Kafka event
82 def create_kafka_event(policy_type_id, policy_id, payload, operation):
83
84   kafka_event_format = {'action': operation_to_action(operation), 'payload': payload, 'policy_type_id': policy_type_id, 'policy_id': policy_id}
85   # converts dict to str
86   kafka_event_json = json.dumps(kafka_event_format)
87   return kafka_event_json
88
89 # Helper: Builds a Kafka event
90 def create_kafka_response_event(response_code, error_info):
91
92   kafka_response_event_format = {'response-code': response_code, 'error-info': error_info}
93   # converts dict to str
94   kafka_response_event_json = json.dumps(kafka_response_event_format)
95   return kafka_response_event_json
96
97 # Helper: Converts a HTTP operation to an explanation
98 def operation_to_action(argument):
99
100   switcher = {
101     'CREATE': "CreatePolicy",
102     'UPDATE': "UpdatePolicy",
103     'DELETE': "DeletePolicy",
104     'GET': "GetPolicyStatus",
105   }
106   return switcher.get(argument, None)
107
108
109 # Helper: Converts a byte array to a str
110 def byte_to_str(byte_arr):
111
112   if (byte_arr is not None):
113     return byte_arr.decode('utf-8')
114   else:
115     return None
116
117
118 # Helper: Creates random string
119 def get_random_string(length):
120
121   characters = string.ascii_letters + string.digits + string.punctuation
122   password = ''.join(random.choice(characters) for i in range(length))
123   return password