c534acbe201c0ab3dda17e2e0e090c642507976a
[sim/a1-interface.git] / near-rt-ric-simulator / test / KAFKA_DISPATCHER / src / maincommon.py
1 #  ============LICENSE_START===============================================
2 #  Copyright (C) 2022 Nordix Foundation. All rights reserved.
3 #  ========================================================================
4 #  Licensed under the Apache License, Version 2.0 (the "License");
5 #  you may not use this file except in compliance with the License.
6 #  You may obtain a copy of the License at
7 #
8 #       http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under the License is distributed on an "AS IS" BASIS,
12 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 #  See the License for the specific language governing permissions and
14 #  limitations under the License.
15 #  ============LICENSE_END=================================================
16 #
17
18 import os
19 import sys
20 import json
21 from pathlib import Path
22 from flask import Response
23 import socket
24 import ssl
25 import random
26 import string
27
28 from kafka import KafkaProducer, KafkaConsumer
29
30 #Must exist
31 apipath=os.environ['APIPATH']
32 timeout=os.getenv('TIME_OUT')
33
34 MSG_BROKER_URL=os.getenv('MSG_BROKER_URL')
35
36
37 # Make sure the  api path is set, otherwise exit
38 def check_apipath():
39     if (apipath is None):
40       print("Env APIPATH not set. Exiting....")
41       sys.exit(1)
42
43 # Make sure the  timeout is set and greater than zero, otherwise exit
44 def check_timeout():
45     if (timeout is None):
46       print("Env TIME_OUT not set. Exiting....")
47       sys.exit(1)
48     elif (int(timeout) < 0):
49       print("Env TIME_OUT must be greater than zero. Exiting....")
50       sys.exit(1)
51
52 # Instantiate KafkaProducer with keyword arguments
53 # https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html
54 def create_kafka_producer():
55
56   producer = KafkaProducer(
57     bootstrap_servers = [MSG_BROKER_URL],
58     key_serializer = str.encode,
59     value_serializer = lambda m: json.dumps(m).encode('ascii'),
60   )
61   return producer
62
63
64 # Instantiate KafkaConsumer with keyword arguments
65 # https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
66 def create_kafka_consumer():
67   consumer = KafkaConsumer(
68     #KAFKA_TOPIC_RES,
69     bootstrap_servers = MSG_BROKER_URL,
70     auto_offset_reset = 'earliest',
71     value_deserializer = lambda m: json.loads(m.decode('ascii')),
72     #enable_auto_commit=False
73   )
74   return consumer
75
76
77 # Helper: Builds a Kafka event
78 def create_kafka_event(policy_type_id, policy_id, payload, operation):
79
80   kafka_event_format = {'action': operation_to_action(operation), 'payload': payload, 'policy_type_id': policy_type_id, 'policy_id': policy_id}
81   # converts dict to str
82   kafka_event_json = json.dumps(kafka_event_format)
83   return kafka_event_json
84
85 # Helper: Builds a Kafka event
86 def create_kafka_response_event(response_code, error_info):
87
88   kafka_response_event_format = {'response-code': response_code, 'error-info': error_info}
89   # converts dict to str
90   kafka_response_event_json = json.dumps(kafka_response_event_format)
91   return kafka_response_event_json
92
93 # Helper: Converts a HTTP operation to an explanation
94 def operation_to_action(argument):
95
96   switcher = {
97     'CREATE': "CreatePolicy",
98     'UPDATE': "UpdatePolicy",
99     'DELETE': "DeletePolicy",
100     'GET': "GetPolicyStatus",
101   }
102   return switcher.get(argument, None)
103
104
105 # Helper: Converts a byte array to a str
106 def byte_to_str(byte_arr):
107
108   if (byte_arr is not None):
109     return byte_arr.decode('utf-8')
110   else:
111     return None
112
113
114 # Helper: Creates random string
115 def get_random_string(length):
116
117   characters = string.ascii_letters + string.digits + string.punctuation
118   password = ''.join(random.choice(characters) for i in range(length))
119   return password