# ============LICENSE_START===============================================
# Copyright (C) 2022 Nordix Foundation. All rights reserved.
# ========================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=================================================
import json
import math
import os
import time
from datetime import datetime

from flask import request, Response
from kafka import TopicPartition
from kafka.consumer.fetcher import ConsumerRecord

from maincommon import create_kafka_producer, create_kafka_consumer, create_kafka_event, create_kafka_response_event, byte_to_str, get_random_string
from var_declaration import forced_settings
# Kafka bootstrap address; when unset, the API functions skip dispatching
# and answer 200 immediately.
MSG_BROKER_URL = os.getenv('MSG_BROKER_URL')

# Synchronous publish/consume time-out in seconds (string from the env).
TIME_OUT = os.getenv('TIME_OUT')

# Content-type constants for HTTP responses.
APPL_JSON = 'application/json'
TEXT_PLAIN = 'text/plain'
APPL_PROB_JSON = 'application/problem+json'
# API Function: Dispatch create or update events to Kafka cluster
def put_policy(policyTypeId, policyId):
    """Publish a CREATE or UPDATE event for the given policy type/id.

    The request body is the a1policy JSON payload.  A 'requestid' header
    (test use only) overrides the generated correlation id; an 'updateoper'
    header marks the call as an update since create and update share the
    same endpoint.  Returns a flask Response.
    """
    # Honour forced/delayed responses configured for testing.
    if ((r := check_modified_response()) is not None):
        return r

    policy_type_id = str(policyTypeId)
    policy_id = str(policyId)

    try:
        # Error based unit test rel only, for more info please check basic_test_with_cust_header
        req_id_from_header = request.headers.get('requestid')
        # Differentiate if the PUT is update or create operation since the endpoint is the same
        update_oper_from_header = request.headers.get('updateoper')
        # NOTE(review): reconstructed from a gapped listing — raw body read
        # assumed here; confirm against VCS.
        data = request.data
        data = json.loads(data)
    except Exception:
        pjson = create_problem_json(None, "The a1policy is corrupt or missing.", 400, None, policy_id)
        return Response(json.dumps(pjson), 400, mimetype=APPL_PROB_JSON)

    # Decide if the operation is update or create
    if (update_oper_from_header is not None):
        kafka_event = create_kafka_event(policy_type_id, policy_id, data, 'UPDATE')
    else:
        kafka_event = create_kafka_event(policy_type_id, policy_id, data, 'CREATE')

    # Synch callout hooks towards kafka broker
    if (MSG_BROKER_URL is not None):
        return publish_and_consume(kafka_event, req_id_from_header, policy_type_id)

    return Response('', 200, mimetype=TEXT_PLAIN)
# API Function: Dispatch delete events to south Kafka cluster
def delete_policy(policyTypeId, policyId):
    """Publish a DELETE event for the given policy type/id.

    Returns a flask Response: the broker round-trip result when
    MSG_BROKER_URL is configured, otherwise an immediate 200.
    """
    # Honour forced/delayed responses configured for testing.
    if ((r := check_modified_response()) is not None):
        return r

    policy_type_id = str(policyTypeId)
    policy_id = str(policyId)

    # Test-only correlation id override, see basic_test_with_cust_header.
    req_id_from_header = request.headers.get('requestid')
    print('req_id_from_header', req_id_from_header)

    # Synch callout hooks towards kafka broker
    kafka_event = create_kafka_event(policy_type_id, policy_id, None, 'DELETE')
    if (MSG_BROKER_URL is not None):
        return publish_and_consume(kafka_event, req_id_from_header, policy_type_id)

    return Response('', 200, mimetype=TEXT_PLAIN)
# API Function: Get status for a policy
def get_policy_status(policyTypeId, policyId):
    """Publish a GET event for the given policy type/id and relay the result.

    Returns a flask Response: the broker round-trip result when
    MSG_BROKER_URL is configured, otherwise an immediate 200.
    """
    # Honour forced/delayed responses configured for testing.
    if ((r := check_modified_response()) is not None):
        return r

    policy_type_id = str(policyTypeId)
    policy_id = str(policyId)

    # Test-only correlation id override, see basic_test_with_cust_header.
    req_id_from_header = request.headers.get('requestid')
    print('req_id_from_header', req_id_from_header)

    # Synch callout hooks towards kafka broker
    kafka_event = create_kafka_event(policy_type_id, policy_id, None, 'GET')
    if (MSG_BROKER_URL is not None):
        return publish_and_consume(kafka_event, req_id_from_header, policy_type_id)

    return Response('', 200, mimetype=TEXT_PLAIN)
def get_policy_type_to_topic_mapping(policyTypeId):
    """Look up the request/response topic pair mapped to a policy type.

    Reads ../resources/policytype_to_topicmap.json and returns a 200
    Response whose JSON body holds the topic mapping, or a 404
    problem+json Response when the type is unmapped.
    """
    # Honour forced/delayed responses configured for testing.
    if ((r := check_modified_response()) is not None):
        return r

    policy_type_id = str(policyTypeId)

    # Fix: use a context manager so the mapping file is always closed
    # (the original leaked the file handle).
    with open('../resources/policytype_to_topicmap.json') as m_file:
        map_in_dict = json.load(m_file)

    if policy_type_id in map_in_dict:
        topic_address = map_in_dict[policy_type_id]
        return Response(json.dumps(topic_address), 200, mimetype=APPL_JSON)
    else:
        pjson = create_problem_json(None, "The policy type to topic mapping does not exist.", 404, None, policy_type_id)
        return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON)
# Helper: Publishes and consumes (to/from) the target broker and the topic in two-way synch
def publish_and_consume(kafka_event, req_id_from_header, pol_type_id):
    """Publish kafka_event to the policy type's request topic and wait
    synchronously for the correlated response event on the response topic.

    Returns a flask Response: 200 on timely success, 404 when no topic
    mapping exists, 408 on time-out, and 419 (module-specific wrapper
    code) for any other failure.
    """
    # Instantiate KafkaProducer with keyword arguments
    producer = create_kafka_producer()

    # Assigns an id to each request that is supposed to get a result
    # if a req_id already exists in req headers, it means that test generated req_id is in use for testing only
    if (req_id_from_header is None):
        req_id = get_random_string(16)
    else:
        req_id = req_id_from_header

    try:
        resp = get_policy_type_to_topic_mapping(pol_type_id)
        # if the policy type to topic mapping could not be found, then returns 404
        # else gets target topic to publish the message to
        if (resp.status_code == 404):
            return resp
        data = json.loads(resp.data)
        target_topic_req = data['request_topic']
        target_topic_res = data['response_topic']

        # KafkaProducer.send(topicname, value=broker_message, key=req_id, headers=None, partition=None, timestamp_ms=None)
        fut_rec_metadata = producer.send(target_topic_req, kafka_event, req_id)
        record_metadata = fut_rec_metadata.get()
        print('Future:', record_metadata)
        publish_time_in_ms = record_metadata.timestamp

        # For test purposes only, publish the success response event with no error-info to response topic
        # if basic_test_with_cust_header.sh is being used, then comment this line
        # else comment out this line for the basic_test.sh
        kafka_response_event = create_kafka_response_event(200, "")
        producer.send(target_topic_res, kafka_response_event, req_id)

        # Blocks until the correlated response record arrives or times out.
        consumer_record = consume_record_for(req_id, target_topic_res)
        if (isinstance(consumer_record, ConsumerRecord)):
            print("Consumer Record:", consumer_record)
            cons_rec_value = consumer_record.value
            cons_rec_val_in_dict = json.loads(cons_rec_value)
            resp_code = cons_rec_val_in_dict['response-code']

            # if response code success, then check for time-out
            if (int(resp_code) == 200):
                # time-out control block, default time-out duration is thirty seconds
                consume_time_in_ms = consumer_record.timestamp
                elapsed_time_in_ms = consume_time_in_ms - publish_time_in_ms
                print('Elapsed time in ms:', elapsed_time_in_ms)
                if (elapsed_time_in_ms < int(TIME_OUT) * 1000):
                    return Response('', 200, mimetype=APPL_JSON)
                else:
                    # returns time-out response code
                    pjson = create_error_response(408)
                    return Response(json.dumps(pjson), 408, mimetype=APPL_PROB_JSON)
            else:
                # for all other responses returns special error of this module by wrapping actual resp code
                pjson = create_error_response(419)
                return Response(json.dumps(pjson), 419, mimetype=APPL_PROB_JSON)
        elif (isinstance(consumer_record, Response)):
            # Returns time-out response
            return consumer_record
        else:
            # returns special error of this module
            pjson = create_error_response(419)
            return Response(json.dumps(pjson), 419, mimetype=APPL_PROB_JSON)

    except Exception as err:
        print('Error while publish and consume', err)
        pjson = create_error_response(419)
        return Response(json.dumps(pjson), 419, mimetype=APPL_PROB_JSON)
# Helper: Searches for req_id by seeking every five seconds up to thirty seconds
# Helper: If the req_id is found, then ConsumerRecord will be returned
# Helper: If the req_id is not found, then Response Request Timeout will be returned
def consume_record_for(req_id, target_topic_res):
    """Poll the response topic until a record keyed by req_id shows up.

    Returns the matching ConsumerRecord, a 408 problem+json Response on
    time-out (TIME_OUT seconds, polled in 5 s cycles), or a 419 Response
    on any consumer error.
    """
    try:
        print('req_id looking for in consumer:', req_id)
        consumer = create_kafka_consumer()
        topic_partition = TopicPartition(target_topic_res, 0)
        consumer.assign([topic_partition])

        # calculates poll cycle threshold
        sleep_period_in_sec = 5
        poll_cycle_threshold = int(TIME_OUT) / sleep_period_in_sec
        poll_cycle_threshold = math.floor(poll_cycle_threshold)
        print('poll_cycle_threshold', poll_cycle_threshold)

        # NOTE(review): counters reconstructed from a gapped listing —
        # confirm initial values against VCS.
        poll_retries = 0
        starting_offset = 0
        prev_last_offset = 0
        while (poll_retries < poll_cycle_threshold):
            # Manually specify the fetch offset for a TopicPartition
            consumer.seek(topic_partition, starting_offset)
            # Get the last offset for the given partitions
            last_offset = consumer.end_offsets([topic_partition])[topic_partition]
            print('last_offset', last_offset)

            # Only scan when new records were appended since the last cycle.
            if (last_offset != prev_last_offset):
                for consumer_record in consumer:
                    # Get req_id as msg_key and converts it from byte to str for each consumer record
                    msg_key = byte_to_str(consumer_record.key)
                    print('msg_key in a consumer_record:', msg_key)
                    if (req_id == msg_key):
                        print('req_id is found in consumer records', req_id)
                        return consumer_record
                    elif (consumer_record.offset == last_offset - 1):
                        # Reached the end of the partition without a match.
                        break

            print('Sleeping for ' + str(sleep_period_in_sec) + ' seconds...')
            time.sleep(sleep_period_in_sec)

            poll_retries += 1
            prev_last_offset = last_offset
            starting_offset += last_offset

        # Returns time-out response
        pjson = create_error_response(408)
        return Response(json.dumps(pjson), 408, mimetype=APPL_PROB_JSON)

    except Exception as err:
        print('Error while consume record for req_id', err)
        pjson = create_error_response(419)
        return Response(json.dumps(pjson), 419, mimetype=APPL_PROB_JSON)
# Helper: Create a response object if forced http response code is set
def get_forced_response():
    """Return a problem+json Response for the forced status code set in
    forced_settings['code'], or None when no forced code is configured."""
    if (forced_settings['code'] is not None):
        resp_code = forced_settings['code']
        pjson = create_error_response(int(resp_code))
        return Response(json.dumps(pjson), pjson['status'], mimetype=APPL_PROB_JSON)
    # No forced response configured.
    return None
# Helper: Delay if delayed response code is set
def do_delay():
    """Sleep for forced_settings['delay'] seconds when a delay is configured;
    a non-numeric delay value is silently ignored."""
    # NOTE(review): the def line and tail of this helper were missing from
    # the gapped listing; body reconstructed — confirm against VCS.
    if (forced_settings['delay'] is not None):
        try:
            val = int(forced_settings['delay'])
            time.sleep(val)
        except Exception:
            return
    return
# Helper: Check if response shall be delayed or a forced response shall be sent
def check_modified_response():
    """Apply any configured artificial delay, then return the forced
    Response if one is configured, else None."""
    do_delay()
    return get_forced_response()
# Helper: Create a problem json object
def create_problem_json(type_of, title, status, detail, instance):
    """Build an RFC 7807 style problem dict; None arguments are omitted.

    Args mirror the problem+json members: type, title, status, detail,
    instance.  Returns the dict (possibly empty when all args are None).
    """
    error = {}
    if type_of is not None:
        error["type"] = type_of
    if title is not None:
        error["title"] = title
    if status is not None:
        error["status"] = status
    if detail is not None:
        error["detail"] = detail
    if instance is not None:
        error["instance"] = instance
    return error
# Helper: Create a problem json based on a generic http response code
def create_error_response(code):
    """Map an HTTP-like status code to a canned problem json dict.

    Known codes: 400, 404, 405, 408, 409, 419 (module-specific Kafka
    publish failure), 429, 507, 503; any other code yields an "Unknown"
    problem carrying the code itself.
    """
    if (code == 400):
        return(create_problem_json(None, "Bad request", 400, "Object in payload not properly formulated or not related to the method", None))
    elif (code == 404):
        return(create_problem_json(None, "Not found", 404, "No resource found at the URI", None))
    elif (code == 405):
        return(create_problem_json(None, "Method not allowed", 405, "Method not allowed for the URI", None))
    elif (code == 408):
        return(create_problem_json(None, "Request timeout", 408, "Request timeout", None))
    elif (code == 409):
        return(create_problem_json(None, "Conflict", 409, "Request could not be processed in the current state of the resource", None))
    elif (code == 419):
        return(create_problem_json(None, "Kafka message publish failed", 419, "Publishing the event could not be processed on the Kafka cluster", None))
    elif (code == 429):
        return(create_problem_json(None, "Too many requests", 429, "Too many requests have been sent in a given amount of time", None))
    elif (code == 507):
        return(create_problem_json(None, "Insufficient storage", 507, "The method could not be performed on the resource because the provider is unable to store the representation needed to successfully complete the request", None))
    elif (code == 503):
        return(create_problem_json(None, "Service unavailable", 503, "The provider is currently unable to handle the request due to a temporary overload", None))
    else:
        return(create_problem_json(None, "Unknown", code, "Not implemented response code", None))