1 # ============LICENSE_START===============================================
2 # Copyright (C) 2022 Nordix Foundation. All rights reserved.
3 # ========================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=================================================
21 from pathlib import Path
22 from flask import Response
28 from kafka import KafkaProducer, KafkaConsumer
31 apipath=os.environ['APIPATH']
32 timeout=os.getenv('TIME_OUT')
34 MSG_BROKER_URL=os.getenv('MSG_BROKER_URL')
37 # Make sure the api path is set, otherwise exit
40 print("Env APIPATH not set. Exiting....")
43 # Make sure the timeout is set and greater than zero, otherwise exit
46 print("Env TIME_OUT not set. Exiting....")
48 elif (int(timeout) < 0):
49 print("Env TIME_OUT must be greater than zero. Exiting....")
52 # Instantiate KafkaProducer with keyword arguments
53 # https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html
54 def create_kafka_producer():
56 producer = KafkaProducer(
57 bootstrap_servers = [MSG_BROKER_URL],
58 key_serializer = str.encode,
59 value_serializer = lambda m: json.dumps(m).encode('ascii'),
64 # Instantiate KafkaConsumer with keyword arguments
65 # https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
66 def create_kafka_consumer():
67 consumer = KafkaConsumer(
68 # kafka cluster endpoint
69 bootstrap_servers = MSG_BROKER_URL,
70 # move to the earliest or latest available message
71 auto_offset_reset = 'earliest',
72 # number of milliseconds to block during message iteration
73 # if no new message available during this period of time, iteration through a for-loop will stop automatically
74 consumer_timeout_ms = 100,
75 value_deserializer = lambda m: json.loads(m.decode('ascii')),
76 #enable_auto_commit=False
81 # Helper: Builds a Kafka event
82 def create_kafka_event(policy_type_id, policy_id, payload, operation):
84 kafka_event_format = {'action': operation_to_action(operation), 'payload': payload, 'policy_type_id': policy_type_id, 'policy_id': policy_id}
85 # converts dict to str
86 kafka_event_json = json.dumps(kafka_event_format)
87 return kafka_event_json
89 # Helper: Builds a Kafka event
90 def create_kafka_response_event(response_code, error_info):
92 kafka_response_event_format = {'response-code': response_code, 'error-info': error_info}
93 # converts dict to str
94 kafka_response_event_json = json.dumps(kafka_response_event_format)
95 return kafka_response_event_json
97 # Helper: Converts a HTTP operation to an explanation
98 def operation_to_action(argument):
101 'CREATE': "CreatePolicy",
102 'UPDATE': "UpdatePolicy",
103 'DELETE': "DeletePolicy",
104 'GET': "GetPolicyStatus",
106 return switcher.get(argument, None)
109 # Helper: Converts a byte array to a str
110 def byte_to_str(byte_arr):
112 if (byte_arr is not None):
113 return byte_arr.decode('utf-8')
118 # Helper: Creates random string
119 def get_random_string(length):
121 characters = string.ascii_letters + string.digits + string.punctuation
122 password = ''.join(random.choice(characters) for i in range(length))