Update version number in container-tag for F Maintenance Release
[sim/a1-interface.git] / near-rt-ric-simulator / test / KAFKA_DISPATCHER / src / dispatcher.py
1 #  ============LICENSE_START===============================================
2 #  Copyright (C) 2022 Nordix Foundation. All rights reserved.
3 #  ========================================================================
4 #  Licensed under the Apache License, Version 2.0 (the "License");
5 #  you may not use this file except in compliance with the License.
6 #  You may obtain a copy of the License at
7 #
8 #       http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under the License is distributed on an "AS IS" BASIS,
12 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 #  See the License for the specific language governing permissions and
14 #  limitations under the License.
15 #  ============LICENSE_END=================================================
16 #
17
18 import os
19 import json
20 import time
21 import math
22
23 from flask import request, Response
24 from datetime import datetime
25 from kafka.consumer.fetcher import ConsumerRecord
26 from kafka import TopicPartition
27 from var_declaration import forced_settings
28 from maincommon import create_kafka_producer, create_kafka_consumer, create_kafka_event, create_kafka_response_event, byte_to_str, get_random_string
29
30
31 MSG_BROKER_URL=os.getenv('MSG_BROKER_URL')
32
33 TIME_OUT=os.getenv('TIME_OUT')
34 publishresponse=os.getenv('PUBLISH_RESP')
35
36 #Constsants
37 APPL_JSON='application/json'
38 TEXT_PLAIN='text/plain'
39 APPL_PROB_JSON='application/problem+json'
40
41 # API Function: Dispatch create or update events to Kafka cluster
42 def put_policy(policyTypeId, policyId):
43
44   if ((r := check_modified_response()) is not None):
45     return r
46
47   policy_type_id = str(policyTypeId)
48   policy_id = str(policyId)
49
50   try:
51     # Error based unit test rel only, for more info please check basic_test_with_cust_header
52     req_id_from_header = request.headers.get('requestid')
53     # Differentiate if the PUT is update or create operation since the endpoint is the same
54     update_oper_from_header = request.headers.get('updateoper')
55     data = request.data
56     data = json.loads(data)
57   except Exception:
58     pjson=create_problem_json(None, "The a1policy is corrupt or missing.", 400, None, policy_id)
59     return Response(json.dumps(pjson), 400, mimetype=APPL_PROB_JSON)
60
61   # Decide if the operation is update or create
62   if (update_oper_from_header is not None):
63     kafka_event = create_kafka_event(policy_type_id, policy_id, data, 'UPDATE')
64   else:
65     kafka_event = create_kafka_event(policy_type_id, policy_id, data, 'CREATE')
66
67   # Synch callout hooks towards kafka broker
68   if (MSG_BROKER_URL is not None):
69     return publish_and_consume(kafka_event, req_id_from_header, policy_type_id)
70
71   return Response('', 200, mimetype=TEXT_PLAIN)
72
73
74 # API Function: Dispatch delete events to south Kafka cluster
75 def delete_policy(policyTypeId, policyId):
76
77   if ((r := check_modified_response()) is not None):
78     return r
79
80   policy_type_id = str(policyTypeId)
81   policy_id = str(policyId)
82
83   req_id_from_header = request.headers.get('requestid')
84   print('req_id_from_header', req_id_from_header)
85
86   # Synch callout hooks towards kafka broker
87   kafka_event = create_kafka_event(policy_type_id, policy_id, None, 'DELETE')
88   if (MSG_BROKER_URL is not None):
89     return publish_and_consume(kafka_event, req_id_from_header, policy_type_id)
90
91   return Response('', 200, mimetype=TEXT_PLAIN)
92
93
94 # API Function: Get status for a policy
95 def get_policy_status(policyTypeId, policyId):
96
97   if ((r := check_modified_response()) is not None):
98     return r
99
100   policy_type_id=str(policyTypeId)
101   policy_id=str(policyId)
102
103   req_id_from_header = request.headers.get('requestid')
104   print('req_id_from_header', req_id_from_header)
105
106   # Synch callout hooks towards kafka broker
107   kafka_event = create_kafka_event(policy_type_id, policy_id, None, 'GET')
108   if (MSG_BROKER_URL is not None):
109     return publish_and_consume(kafka_event, req_id_from_header, policy_type_id)
110
111   return Response('', 200, mimetype=TEXT_PLAIN)
112
113
114 def get_policy_type_to_topic_mapping(policyTypeId):
115
116   if ((r := check_modified_response()) is not None):
117     return r
118
119   policy_type_id = str(policyTypeId)
120
121   m_file = open('../resources/policytype_to_topicmap.json')
122   map_in_dict = json.load(m_file)
123
124   if policy_type_id in map_in_dict.keys():
125     topic_address = map_in_dict[policy_type_id]
126     return Response(json.dumps(topic_address), 200, mimetype=APPL_JSON)
127   else:
128     pjson=create_problem_json(None, "The policy type to topic mapping does not exist.", 404, None, policy_type_id)
129     return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON)
130
131
132 # Helper: Publishes and consumes (to/from) the target broker and the topic in two-way synch
133 def publish_and_consume(kafka_event, req_id_from_header, pol_type_id):
134
135   # Instantiate KafkaProducer with keyword arguments
136   producer = create_kafka_producer()
137
138   # Assigns an id to each request that is supposed to get a result
139   # if a req_id already exists in req headers, it means that test generated req_id is in use for testing only
140   if (req_id_from_header is None):
141     req_id = get_random_string(16)
142   else:
143     req_id = req_id_from_header
144
145   try:
146
147     resp = get_policy_type_to_topic_mapping(pol_type_id)
148     # if the policy type to topic mapping could not be found, then returns 404
149     # else gets target topic to publish the message to
150     if (resp.status_code == 404):
151       return resp
152     else:
153       data = json.loads(resp.data)
154       target_topic_req = data['request_topic']
155       target_topic_res = data['response_topic']
156
157     # synch-publish
158     # KafkaProducer.send(topicname, value=broker_message, key=req_id, headers=None, partition=None, timestamp_ms=None)
159     fut_rec_metadata = producer.send(target_topic_req, kafka_event, req_id)
160     record_metadata = fut_rec_metadata.get()
161     print('Future:', record_metadata)
162     publish_time_in_ms = record_metadata.timestamp
163
164     # For test purposes only triggered from A1 sim
165     # Publish the success response event with no error-info to response topic
166     # It is obvious that non of the requests will have a request id in the header except the test scripts: basic_test and timeout_test
167     if (publishresponse is not None and req_id_from_header is None):
168       kafka_response_event = create_kafka_response_event(200, "")
169       producer.send(target_topic_res, kafka_response_event, req_id)
170
171     # synch-consume
172     consumer_record = consume_record_for(req_id, target_topic_res)
173     if (isinstance(consumer_record, ConsumerRecord)):
174
175       print("Consumer Record:", consumer_record)
176       cons_rec_value = consumer_record.value
177       cons_rec_val_in_dict = json.loads(cons_rec_value)
178       resp_code = cons_rec_val_in_dict['response-code']
179
180       # if response code success, then check for time-out
181       if (int(resp_code) == 200):
182         # time-out control block, default time-out duration is thirty seconds
183         consume_time_in_ms = consumer_record.timestamp
184         elapsed_time_in_ms = consume_time_in_ms - publish_time_in_ms
185         print('Elapsed time in ms:', elapsed_time_in_ms)
186         if (elapsed_time_in_ms < int(TIME_OUT) * 1000):
187           return Response('', 200, mimetype=APPL_JSON)
188         else:
189           # returns time-out response code
190           pjson=create_error_response(408)
191           return Response(json.dumps(pjson), 408, mimetype=APPL_PROB_JSON)
192       else:
193         # for all other responses returns special error of this module by wrapping actual resp code
194         pjson=create_error_response(419)
195         return Response(json.dumps(pjson), 419, mimetype=APPL_PROB_JSON)
196
197     elif (isinstance(consumer_record, Response)):
198       # Returns time-out response
199       return consumer_record
200     else:
201       # returns special error of this module
202       pjson=create_error_response(419)
203       return Response(json.dumps(pjson), 419, mimetype=APPL_PROB_JSON)
204
205   except Exception as err:
206     print('Error while publish and consume', err)
207     pjson=create_error_response(419)
208     return Response(json.dumps(pjson), 419, mimetype=APPL_PROB_JSON)
209   finally:
210     producer.close()
211
212
213 # Helper: Searches for req_id by seeking every five seconds up to thirty seconds
214 # Helper: If the req_id is found, then ConsumerRecord will be returned
215 # Helper: If the req_id is not found, then Response Request Timeout will be returned
216 def consume_record_for(req_id, target_topic_res):
217
218   try:
219     print ('req_id looking for in consumer: ' + target_topic_res, req_id)
220     consumer = create_kafka_consumer()
221     topic_partition = TopicPartition(target_topic_res, 0)
222     consumer.assign([topic_partition])
223
224     sleep_period_in_sec = 5
225     poll_cycle_threshold = calc_pollcycle_threshold(sleep_period_in_sec)
226     poll_retries = 0
227
228     while (poll_retries < poll_cycle_threshold):
229       for consumer_record in consumer:
230         # Get req_id as msg_key and converts it from byte to str for each consumer record
231         msg_key = byte_to_str(consumer_record.key)
232         print ('msg_key in a consumer_record:', msg_key)
233         if (req_id == msg_key):
234           print ('req_id is found in consumer records', req_id)
235           return consumer_record
236
237       print('Sleeping for ' + str(sleep_period_in_sec) + ' seconds...')
238       time.sleep(sleep_period_in_sec)
239       poll_retries += 1
240
241     # Returns time-out response
242     pjson=create_error_response(408)
243     return Response(json.dumps(pjson), 408, mimetype=APPL_PROB_JSON)
244
245   except Exception as err:
246     print('Error while consume record for req_id', err)
247     pjson=create_error_response(419)
248     return Response(json.dumps(pjson), 419, mimetype=APPL_PROB_JSON)
249   finally:
250     consumer.close()
251
252 # Helper: calculates poll cycle threshold
253 def calc_pollcycle_threshold(sleep_period_in_sec):
254
255     poll_cycle_threshold = int(TIME_OUT) / sleep_period_in_sec
256     poll_cycle_threshold = math.floor(poll_cycle_threshold)
257     return poll_cycle_threshold
258
259 # Helper: Create a response object if forced http response code is set
260 def get_forced_response():
261
262   if (forced_settings['code'] is not None):
263     resp_code=forced_settings['code']
264     pjson=create_error_response(int(resp_code))
265     return Response(json.dumps(pjson), pjson['status'], mimetype=APPL_PROB_JSON)
266   return None
267
268
269 # Helper: Delay if delayed response code is set
270 def do_delay():
271
272   if (forced_settings['delay'] is not None):
273     try:
274       val=int(forced_settings['delay'])
275       time.sleep(val)
276     except Exception:
277       return
278
279
280 # Helper: Check if response shall be delayed or a forced response shall be sent
281 def check_modified_response():
282
283   do_delay()
284   return get_forced_response()
285
286
287 # Helper: Create a problem json object
288 def create_problem_json(type_of, title, status, detail, instance):
289
290   error = {}
291   if type_of is not None:
292     error["type"] = type_of
293   if title is not None:
294     error["title"] = title
295   if status is not None:
296     error["status"] = status
297   if detail is not None:
298     error["detail"] = detail
299   if instance is not None:
300     error["instance"] = instance
301   return error
302
303
304 # Helper: Create a problem json based on a generic http response code
305 def create_error_response(code):
306
307     if code == 400:
308       return(create_problem_json(None, "Bad request", 400, "Object in payload not properly formulated or not related to the method", None))
309     elif code == 404:
310       return(create_problem_json(None, "Not found", 404, "No resource found at the URI", None))
311     elif code == 405:
312       return(create_problem_json(None, "Method not allowed", 405, "Method not allowed for the URI", None))
313     elif code == 408:
314       return(create_problem_json(None, "Request timeout", 408, "Request timeout", None))
315     elif code == 409:
316       return(create_problem_json(None, "Conflict", 409, "Request could not be processed in the current state of the resource", None))
317     elif (code == 419):
318       return(create_problem_json(None, "Kafka message publish failed", 419, "Publishing the event could not be processed on the Kafka cluster", None))
319     elif code == 429:
320       return(create_problem_json(None, "Too many requests", 429, "Too many requests have been sent in a given amount of time", None))
321     elif code == 507:
322       return(create_problem_json(None, "Insufficient storage", 507, "The method could not be performed on the resource because the provider is unable to store the representation needed to successfully complete the request", None))
323     elif code == 503:
324       return(create_problem_json(None, "Service unavailable", 503, "The provider is currently unable to handle the request due to a temporary overload", None))
325     else:
326       return(create_problem_json(None, "Unknown", code, "Not implemented response code", None))