fix for: consuming from response_topic, differantiate as per message type
[sim/a1-interface.git] / near-rt-ric-simulator / test / KAFKA_DISPATCHER / src / dispatcher.py
1 #  ============LICENSE_START===============================================
2 #  Copyright (C) 2022 Nordix Foundation. All rights reserved.
3 #  ========================================================================
4 #  Licensed under the Apache License, Version 2.0 (the "License");
5 #  you may not use this file except in compliance with the License.
6 #  You may obtain a copy of the License at
7 #
8 #       http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under the License is distributed on an "AS IS" BASIS,
12 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 #  See the License for the specific language governing permissions and
14 #  limitations under the License.
15 #  ============LICENSE_END=================================================
16 #
17
18 import os
19 import json
20 import time
21 import math
22
23 from flask import request, Response
24 from datetime import datetime
25 from kafka.consumer.fetcher import ConsumerRecord
26 from kafka import TopicPartition
27 from var_declaration import forced_settings
28 from maincommon import create_kafka_producer, create_kafka_consumer, create_kafka_event, create_kafka_response_event, byte_to_str, get_random_string
29
30
31 MSG_BROKER_URL=os.getenv('MSG_BROKER_URL')
32
33 TIME_OUT=os.getenv('TIME_OUT')
34 publishresponse=os.getenv('PUBLISH_RESP')
35
36 #Constsants
37 APPL_JSON='application/json'
38 TEXT_PLAIN='text/plain'
39 APPL_PROB_JSON='application/problem+json'
40
41 # API Function: Dispatch create or update events to Kafka cluster
42 def put_policy(policyTypeId, policyId):
43
44   if ((r := check_modified_response()) is not None):
45     return r
46
47   policy_type_id = str(policyTypeId)
48   policy_id = str(policyId)
49
50   try:
51     # Error based unit test rel only, for more info please check basic_test_with_cust_header
52     req_id_from_header = request.headers.get('requestid')
53     # Differentiate if the PUT is update or create operation since the endpoint is the same
54     update_oper_from_header = request.headers.get('updateoper')
55     data = request.data
56     data = json.loads(data)
57   except Exception:
58     pjson=create_problem_json(None, "The a1policy is corrupt or missing.", 400, None, policy_id)
59     return Response(json.dumps(pjson), 400, mimetype=APPL_PROB_JSON)
60
61   # Decide if the operation is update or create
62   if (update_oper_from_header is not None):
63     kafka_event = create_kafka_event(policy_type_id, policy_id, data, 'UPDATE')
64   else:
65     kafka_event = create_kafka_event(policy_type_id, policy_id, data, 'CREATE')
66
67   # Synch callout hooks towards kafka broker
68   if (MSG_BROKER_URL is not None):
69     return publish_and_consume(kafka_event, req_id_from_header, policy_type_id)
70
71   return Response('', 200, mimetype=TEXT_PLAIN)
72
73
74 # API Function: Dispatch delete events to south Kafka cluster
75 def delete_policy(policyTypeId, policyId):
76
77   if ((r := check_modified_response()) is not None):
78     return r
79
80   policy_type_id = str(policyTypeId)
81   policy_id = str(policyId)
82
83   req_id_from_header = request.headers.get('requestid')
84   print('req_id_from_header', req_id_from_header)
85
86   # Synch callout hooks towards kafka broker
87   kafka_event = create_kafka_event(policy_type_id, policy_id, None, 'DELETE')
88   if (MSG_BROKER_URL is not None):
89     return publish_and_consume(kafka_event, req_id_from_header, policy_type_id)
90
91   return Response('', 200, mimetype=TEXT_PLAIN)
92
93
94 # API Function: Get status for a policy
95 def get_policy_status(policyTypeId, policyId):
96
97   if ((r := check_modified_response()) is not None):
98     return r
99
100   policy_type_id=str(policyTypeId)
101   policy_id=str(policyId)
102
103   req_id_from_header = request.headers.get('requestid')
104   print('req_id_from_header', req_id_from_header)
105
106   # Synch callout hooks towards kafka broker
107   kafka_event = create_kafka_event(policy_type_id, policy_id, None, 'GET')
108   if (MSG_BROKER_URL is not None):
109     return publish_and_consume(kafka_event, req_id_from_header, policy_type_id)
110
111   return Response('', 200, mimetype=TEXT_PLAIN)
112
113
114 def get_policy_type_to_topic_mapping(policyTypeId):
115
116   if ((r := check_modified_response()) is not None):
117     return r
118
119   policy_type_id = str(policyTypeId)
120
121   m_file = open('../resources/policytype_to_topicmap.json')
122   map_in_dict = json.load(m_file)
123
124   if policy_type_id in map_in_dict.keys():
125     topic_address = map_in_dict[policy_type_id]
126     return Response(json.dumps(topic_address), 200, mimetype=APPL_JSON)
127   else:
128     pjson=create_problem_json(None, "The policy type to topic mapping does not exist.", 404, None, policy_type_id)
129     return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON)
130
131
132 # Helper: Publishes and consumes (to/from) the target broker and the topic in two-way synch
133 def publish_and_consume(kafka_event, req_id_from_header, pol_type_id):
134
135   # Instantiate KafkaProducer with keyword arguments
136   producer = create_kafka_producer()
137
138   # Assigns an id to each request that is supposed to get a result
139   # if a req_id already exists in req headers, it means that test generated req_id is in use for testing only
140   if (req_id_from_header is None):
141     req_id = get_random_string(16)
142   else:
143     req_id = req_id_from_header
144
145   try:
146
147     resp = get_policy_type_to_topic_mapping(pol_type_id)
148     # if the policy type to topic mapping could not be found, then returns 404
149     # else gets target topic to publish the message to
150     if (resp.status_code == 404):
151       return resp
152     else:
153       data = json.loads(resp.data)
154       target_topic_req = data['request_topic']
155       target_topic_res = data['response_topic']
156
157     # synch-publish
158     # KafkaProducer.send(topicname, value=broker_message, key=req_id, headers=None, partition=None, timestamp_ms=None)
159     fut_rec_metadata = producer.send(target_topic_req, kafka_event, req_id)
160     record_metadata = fut_rec_metadata.get()
161     print('Future:', record_metadata)
162     publish_time_in_ms = record_metadata.timestamp
163
164     # For test purposes only triggered from A1 sim
165     # Publish the success response event with no error-info to response topic
166     # It is obvious that non of the requests will have a request id in the header except the test scripts: basic_test and timeout_test
167     if (publishresponse is not None and req_id_from_header is None):
168       kafka_response_event = create_kafka_response_event(200, "")
169       producer.send(target_topic_res, kafka_response_event, req_id)
170
171     # synch-consume
172     consumer_record = consume_record_for(req_id, target_topic_res)
173     if (isinstance(consumer_record, ConsumerRecord)):
174
175       print("Consumer Record:", consumer_record)
176       cons_rec_value = consumer_record.value
177       print('Class for cons_rec_value:', cons_rec_value.__class__)
178       if isinstance(cons_rec_value, str):
179         print('cons_rec_value isinstance str')
180         cons_rec_val_in_dict = json.loads(cons_rec_value) # json.loads: converts str to dict
181         resp_code = cons_rec_val_in_dict['response-code']
182       elif isinstance(cons_rec_value, dict):
183         print('cons_rec_value isinstance dict')
184         resp_code = cons_rec_value['response-code']
185       else:
186         print('cons_rec_value isinstance other')
187         cons_rec_val_in_dict = json.loads(cons_rec_value)
188         resp_code = cons_rec_val_in_dict['response-code']
189
190       # if response code success, then check for time-out
191       if (int(resp_code) == 200):
192         # time-out control block, default time-out duration is thirty seconds
193         consume_time_in_ms = consumer_record.timestamp
194         elapsed_time_in_ms = consume_time_in_ms - publish_time_in_ms
195         print('Elapsed time in ms:', elapsed_time_in_ms)
196         if (elapsed_time_in_ms < int(TIME_OUT) * 1000):
197           return Response('', 200, mimetype=APPL_JSON)
198         else:
199           # returns time-out response code
200           pjson=create_error_response(408)
201           return Response(json.dumps(pjson), 408, mimetype=APPL_PROB_JSON)
202       else:
203         # for all other responses returns special error of this module by wrapping actual resp code
204         pjson=create_error_response(419)
205         return Response(json.dumps(pjson), 419, mimetype=APPL_PROB_JSON)
206
207     elif (isinstance(consumer_record, Response)):
208       # Returns time-out response
209       return consumer_record
210     else:
211       # returns special error of this module
212       pjson=create_error_response(419)
213       return Response(json.dumps(pjson), 419, mimetype=APPL_PROB_JSON)
214
215   except Exception as err:
216     print('Error while publish and consume', err)
217     pjson=create_error_response(419)
218     return Response(json.dumps(pjson), 419, mimetype=APPL_PROB_JSON)
219   finally:
220     producer.close()
221
222
223 # Helper: Searches for req_id by seeking every five seconds up to thirty seconds
224 # Helper: If the req_id is found, then ConsumerRecord will be returned
225 # Helper: If the req_id is not found, then Response Request Timeout will be returned
226 def consume_record_for(req_id, target_topic_res):
227
228   try:
229     print ('req_id looking for in consumer: ' + target_topic_res, req_id)
230     consumer = create_kafka_consumer()
231     topic_partition = TopicPartition(target_topic_res, 0)
232     consumer.assign([topic_partition])
233
234     sleep_period_in_sec = 5
235     poll_cycle_threshold = calc_pollcycle_threshold(sleep_period_in_sec)
236     poll_retries = 0
237
238     while (poll_retries < poll_cycle_threshold):
239       for consumer_record in consumer:
240         # Get req_id as msg_key and converts it from byte to str for each consumer record
241         msg_key = byte_to_str(consumer_record.key)
242         print ('msg_key in a consumer_record:', msg_key)
243         if (req_id == msg_key):
244           print ('req_id is found in consumer records', req_id)
245           return consumer_record
246
247       print('Sleeping for ' + str(sleep_period_in_sec) + ' seconds...')
248       time.sleep(sleep_period_in_sec)
249       poll_retries += 1
250
251     # Returns time-out response
252     pjson=create_error_response(408)
253     return Response(json.dumps(pjson), 408, mimetype=APPL_PROB_JSON)
254
255   except Exception as err:
256     print('Error while consume record for req_id', err)
257     pjson=create_error_response(419)
258     return Response(json.dumps(pjson), 419, mimetype=APPL_PROB_JSON)
259   finally:
260     consumer.close()
261
262 # Helper: calculates poll cycle threshold
263 def calc_pollcycle_threshold(sleep_period_in_sec):
264
265     poll_cycle_threshold = int(TIME_OUT) / sleep_period_in_sec
266     poll_cycle_threshold = math.floor(poll_cycle_threshold)
267     return poll_cycle_threshold
268
269 # Helper: Create a response object if forced http response code is set
270 def get_forced_response():
271
272   if (forced_settings['code'] is not None):
273     resp_code=forced_settings['code']
274     pjson=create_error_response(int(resp_code))
275     return Response(json.dumps(pjson), pjson['status'], mimetype=APPL_PROB_JSON)
276   return None
277
278
279 # Helper: Delay if delayed response code is set
280 def do_delay():
281
282   if (forced_settings['delay'] is not None):
283     try:
284       val=int(forced_settings['delay'])
285       time.sleep(val)
286     except Exception:
287       return
288
289
290 # Helper: Check if response shall be delayed or a forced response shall be sent
291 def check_modified_response():
292
293   do_delay()
294   return get_forced_response()
295
296
297 # Helper: Create a problem json object
298 def create_problem_json(type_of, title, status, detail, instance):
299
300   error = {}
301   if type_of is not None:
302     error["type"] = type_of
303   if title is not None:
304     error["title"] = title
305   if status is not None:
306     error["status"] = status
307   if detail is not None:
308     error["detail"] = detail
309   if instance is not None:
310     error["instance"] = instance
311   return error
312
313
314 # Helper: Create a problem json based on a generic http response code
315 def create_error_response(code):
316
317     if code == 400:
318       return(create_problem_json(None, "Bad request", 400, "Object in payload not properly formulated or not related to the method", None))
319     elif code == 404:
320       return(create_problem_json(None, "Not found", 404, "No resource found at the URI", None))
321     elif code == 405:
322       return(create_problem_json(None, "Method not allowed", 405, "Method not allowed for the URI", None))
323     elif code == 408:
324       return(create_problem_json(None, "Request timeout", 408, "Request timeout", None))
325     elif code == 409:
326       return(create_problem_json(None, "Conflict", 409, "Request could not be processed in the current state of the resource", None))
327     elif (code == 419):
328       return(create_problem_json(None, "Kafka message publish failed", 419, "Publishing the event could not be processed on the Kafka cluster", None))
329     elif code == 429:
330       return(create_problem_json(None, "Too many requests", 429, "Too many requests have been sent in a given amount of time", None))
331     elif code == 507:
332       return(create_problem_json(None, "Insufficient storage", 507, "The method could not be performed on the resource because the provider is unable to store the representation needed to successfully complete the request", None))
333     elif code == 503:
334       return(create_problem_json(None, "Service unavailable", 503, "The provider is currently unable to handle the request due to a temporary overload", None))
335     else:
336       return(create_problem_json(None, "Unknown", code, "Not implemented response code", None))