19e2ab65763d7541d94c22b9fcee9c90619d2e2a
[sim/a1-interface.git] / near-rt-ric-simulator / test / KAFKA_DISPATCHER / src / dispatcher.py
1 #  ============LICENSE_START===============================================
2 #  Copyright (C) 2022 Nordix Foundation. All rights reserved.
3 #  ========================================================================
4 #  Licensed under the Apache License, Version 2.0 (the "License");
5 #  you may not use this file except in compliance with the License.
6 #  You may obtain a copy of the License at
7 #
8 #       http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under the License is distributed on an "AS IS" BASIS,
12 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 #  See the License for the specific language governing permissions and
14 #  limitations under the License.
15 #  ============LICENSE_END=================================================
16 #
17
18 import os
19 import json
20 import time
21 import math
22
23 from flask import request, Response
24 from datetime import datetime
25 from kafka.consumer.fetcher import ConsumerRecord
26 from kafka import TopicPartition
27 from var_declaration import forced_settings
28 from maincommon import create_kafka_producer, create_kafka_consumer, create_kafka_event, create_kafka_response_event, byte_to_str, get_random_string
29
30
31 MSG_BROKER_URL=os.getenv('MSG_BROKER_URL')
32
33 TIME_OUT=os.getenv('TIME_OUT')
34
35 #Constsants
36 APPL_JSON='application/json'
37 TEXT_PLAIN='text/plain'
38 APPL_PROB_JSON='application/problem+json'
39
40 # API Function: Dispatch create or update events to Kafka cluster
41 def put_policy(policyTypeId, policyId):
42
43   if ((r := check_modified_response()) is not None):
44     return r
45
46   policy_type_id = str(policyTypeId)
47   policy_id = str(policyId)
48
49   try:
50     # Error based unit test rel only, for more info please check basic_test_with_cust_header
51     req_id_from_header = request.headers.get('requestid')
52     # Differentiate if the PUT is update or create operation since the endpoint is the same
53     update_oper_from_header = request.headers.get('updateoper')
54     data = request.data
55     data = json.loads(data)
56   except Exception:
57     pjson=create_problem_json(None, "The a1policy is corrupt or missing.", 400, None, policy_id)
58     return Response(json.dumps(pjson), 400, mimetype=APPL_PROB_JSON)
59
60   # Decide if the operation is update or create
61   if (update_oper_from_header is not None):
62     kafka_event = create_kafka_event(policy_type_id, policy_id, data, 'UPDATE')
63   else:
64     kafka_event = create_kafka_event(policy_type_id, policy_id, data, 'CREATE')
65
66   # Synch callout hooks towards kafka broker
67   if (MSG_BROKER_URL is not None):
68     return publish_and_consume(kafka_event, req_id_from_header, policy_type_id)
69
70   return Response('', 200, mimetype=TEXT_PLAIN)
71
72
73 # API Function: Dispatch delete events to south Kafka cluster
74 def delete_policy(policyTypeId, policyId):
75
76   if ((r := check_modified_response()) is not None):
77     return r
78
79   policy_type_id = str(policyTypeId)
80   policy_id = str(policyId)
81
82   req_id_from_header = request.headers.get('requestid')
83   print('req_id_from_header', req_id_from_header)
84
85   # Synch callout hooks towards kafka broker
86   kafka_event = create_kafka_event(policy_type_id, policy_id, None, 'DELETE')
87   if (MSG_BROKER_URL is not None):
88     return publish_and_consume(kafka_event, req_id_from_header, policy_type_id)
89
90   return Response('', 200, mimetype=TEXT_PLAIN)
91
92
93 # API Function: Get status for a policy
94 def get_policy_status(policyTypeId, policyId):
95
96   if ((r := check_modified_response()) is not None):
97     return r
98
99   policy_type_id=str(policyTypeId)
100   policy_id=str(policyId)
101
102   req_id_from_header = request.headers.get('requestid')
103   print('req_id_from_header', req_id_from_header)
104
105   # Synch callout hooks towards kafka broker
106   kafka_event = create_kafka_event(policy_type_id, policy_id, None, 'GET')
107   if (MSG_BROKER_URL is not None):
108     return publish_and_consume(kafka_event, req_id_from_header, policy_type_id)
109
110   return Response('', 200, mimetype=TEXT_PLAIN)
111
112
113 def get_policy_type_to_topic_mapping(policyTypeId):
114
115   if ((r := check_modified_response()) is not None):
116     return r
117
118   policy_type_id = str(policyTypeId)
119
120   m_file = open('../resources/policytype_to_topicmap.json')
121   map_in_dict = json.load(m_file)
122
123   if policy_type_id in map_in_dict.keys():
124     topic_address = map_in_dict[policy_type_id]
125     return Response(json.dumps(topic_address), 200, mimetype=APPL_JSON)
126   else:
127     pjson=create_problem_json(None, "The policy type to topic mapping does not exist.", 404, None, policy_type_id)
128     return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON)
129
130
131 # Helper: Publishes and consumes (to/from) the target broker and the topic in two-way synch
132 def publish_and_consume(kafka_event, req_id_from_header, pol_type_id):
133
134   # Instantiate KafkaProducer with keyword arguments
135   producer = create_kafka_producer()
136
137   # Assigns an id to each request that is supposed to get a result
138   # if a req_id already exists in req headers, it means that test generated req_id is in use for testing only
139   if (req_id_from_header is None):
140     req_id = get_random_string(16)
141   else:
142     req_id = req_id_from_header
143
144   try:
145
146     resp = get_policy_type_to_topic_mapping(pol_type_id)
147     # if the policy type to topic mapping could not be found, then returns 404
148     # else gets target topic to publish the message to
149     if (resp.status_code == 404):
150       return resp
151     else:
152       data = json.loads(resp.data)
153       target_topic_req = data['request_topic']
154       target_topic_res = data['response_topic']
155
156     # synch-publish
157     # KafkaProducer.send(topicname, value=broker_message, key=req_id, headers=None, partition=None, timestamp_ms=None)
158     fut_rec_metadata = producer.send(target_topic_req, kafka_event, req_id)
159     record_metadata = fut_rec_metadata.get()
160     print('Future:', record_metadata)
161     publish_time_in_ms = record_metadata.timestamp
162
163     # For test purposes only, publish the success response event with no error-info to response topic
164     # if basic_test_with_cust_header.sh is being used, then comment this line
165     # else comment out this line for the basic_test.sh
166     kafka_response_event = create_kafka_response_event(200, "")
167     producer.send(target_topic_res, kafka_response_event, req_id)
168
169     # synch-consume
170     consumer_record = consume_record_for(req_id, target_topic_res)
171     if (isinstance(consumer_record, ConsumerRecord)):
172
173       print("Consumer Record:", consumer_record)
174       cons_rec_value = consumer_record.value
175       cons_rec_val_in_dict = json.loads(cons_rec_value)
176       resp_code = cons_rec_val_in_dict['response-code']
177
178       # if response code success, then check for time-out
179       if (int(resp_code) == 200):
180         # time-out control block, default time-out duration is thirty seconds
181         consume_time_in_ms = consumer_record.timestamp
182         elapsed_time_in_ms = consume_time_in_ms - publish_time_in_ms
183         print('Elapsed time in ms:', elapsed_time_in_ms)
184         if (elapsed_time_in_ms < int(TIME_OUT) * 1000):
185           return Response('', 200, mimetype=APPL_JSON)
186         else:
187           # returns time-out response code
188           pjson=create_error_response(408)
189           return Response(json.dumps(pjson), 408, mimetype=APPL_PROB_JSON)
190       else:
191         # for all other responses returns special error of this module by wrapping actual resp code
192         pjson=create_error_response(419)
193         return Response(json.dumps(pjson), 419, mimetype=APPL_PROB_JSON)
194
195     elif (isinstance(consumer_record, Response)):
196       # Returns time-out response
197       return consumer_record
198     else:
199       # returns special error of this module
200       pjson=create_error_response(419)
201       return Response(json.dumps(pjson), 419, mimetype=APPL_PROB_JSON)
202
203   except Exception as err:
204     print('Error while publish and consume', err)
205     pjson=create_error_response(419)
206     return Response(json.dumps(pjson), 419, mimetype=APPL_PROB_JSON)
207   finally:
208     producer.close()
209
210
211 # Helper: Searches for req_id by seeking every five seconds up to thirty seconds
212 # Helper: If the req_id is found, then ConsumerRecord will be returned
213 # Helper: If the req_id is not found, then Response Request Timeout will be returned
214 def consume_record_for(req_id, target_topic_res):
215
216   try:
217     print ('req_id looking for in consumer:', req_id)
218     consumer = create_kafka_consumer()
219     topic_partition = TopicPartition(target_topic_res, 0)
220     consumer.assign([topic_partition])
221
222     # calculates poll cycle threshold
223     sleep_period_in_sec = 5
224     poll_cycle_threshold = int(TIME_OUT) / sleep_period_in_sec
225     poll_cycle_threshold = math.floor(poll_cycle_threshold)
226     print('poll_cycle_threshold', poll_cycle_threshold)
227
228     poll_retries = 0
229     starting_offset = 0
230     prev_last_offset = 0
231     while (poll_retries < poll_cycle_threshold):
232       # Manually specify the fetch offset for a TopicPartition
233       consumer.seek(topic_partition, starting_offset)
234       # Get the last offset for the given partitions
235       last_offset = consumer.end_offsets([topic_partition])[topic_partition]
236       print('last_offset',last_offset)
237
238       if (last_offset != prev_last_offset):
239         for consumer_record in consumer:
240           # Get req_id as msg_key and converts it from byte to str for each consumer record
241           msg_key = byte_to_str(consumer_record.key)
242           print ('msg_key in a consumer_record:', msg_key)
243           if (req_id == msg_key):
244             print ('req_id is found in consumer records', req_id)
245             return consumer_record
246           elif (consumer_record.offset == last_offset - 1):
247             break
248
249       print('Sleeping for ' + str(sleep_period_in_sec) + ' seconds...')
250       time.sleep(sleep_period_in_sec)
251
252       poll_retries += 1
253       prev_last_offset = last_offset
254       starting_offset += last_offset
255
256     # Returns time-out response
257     pjson=create_error_response(408)
258     return Response(json.dumps(pjson), 408, mimetype=APPL_PROB_JSON)
259
260   except Exception as err:
261     print('Error while consume record for req_id', err)
262     pjson=create_error_response(419)
263     return Response(json.dumps(pjson), 419, mimetype=APPL_PROB_JSON)
264   finally:
265     consumer.close()
266
267
268 # Helper: Create a response object if forced http response code is set
269 def get_forced_response():
270
271   if (forced_settings['code'] is not None):
272     resp_code=forced_settings['code']
273     pjson=create_error_response(int(resp_code))
274     return Response(json.dumps(pjson), pjson['status'], mimetype=APPL_PROB_JSON)
275   return None
276
277
278 # Helper: Delay if delayed response code is set
279 def do_delay():
280
281   if (forced_settings['delay'] is not None):
282     try:
283       val=int(forced_settings['delay'])
284       time.sleep(val)
285     except Exception:
286       return
287
288
289 # Helper: Check if response shall be delayed or a forced response shall be sent
290 def check_modified_response():
291
292   do_delay()
293   return get_forced_response()
294
295
296 # Helper: Create a problem json object
297 def create_problem_json(type_of, title, status, detail, instance):
298
299   error = {}
300   if type_of is not None:
301     error["type"] = type_of
302   if title is not None:
303     error["title"] = title
304   if status is not None:
305     error["status"] = status
306   if detail is not None:
307     error["detail"] = detail
308   if instance is not None:
309     error["instance"] = instance
310   return error
311
312
313 # Helper: Create a problem json based on a generic http response code
314 def create_error_response(code):
315
316     if code == 400:
317       return(create_problem_json(None, "Bad request", 400, "Object in payload not properly formulated or not related to the method", None))
318     elif code == 404:
319       return(create_problem_json(None, "Not found", 404, "No resource found at the URI", None))
320     elif code == 405:
321       return(create_problem_json(None, "Method not allowed", 405, "Method not allowed for the URI", None))
322     elif code == 408:
323       return(create_problem_json(None, "Request timeout", 408, "Request timeout", None))
324     elif code == 409:
325       return(create_problem_json(None, "Conflict", 409, "Request could not be processed in the current state of the resource", None))
326     elif (code == 419):
327       return(create_problem_json(None, "Kafka message publish failed", 419, "Publishing the event could not be processed on the Kafka cluster", None))
328     elif code == 429:
329       return(create_problem_json(None, "Too many requests", 429, "Too many requests have been sent in a given amount of time", None))
330     elif code == 507:
331       return(create_problem_json(None, "Insufficient storage", 507, "The method could not be performed on the resource because the provider is unable to store the representation needed to successfully complete the request", None))
332     elif code == 503:
333       return(create_problem_json(None, "Service unavailable", 503, "The provider is currently unable to handle the request due to a temporary overload", None))
334     else:
335       return(create_problem_json(None, "Unknown", code, "Not implemented response code", None))