2 # ============LICENSE_START===============================================
3 # Copyright (C) 2020 Nordix Foundation. All rights reserved.
4 # ========================================================================
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ============LICENSE_END=================================================
19 from flask import Flask, request, Response
20 from time import sleep
22 from datetime import datetime
27 from threading import RLock
28 from hashlib import md5
30 # Disable all logging of GET on reading counters and db
class AjaxFilter(logging.Filter):
    """Suppress log records produced by polling of the counter and db endpoints.

    A record is dropped when its formatted message contains ``/counter/`` or
    ``/db``; every other record is let through.
    """

    # Path fragments that identify requests which must not be logged
    _SILENCED_PATHS = ("/counter/", "/db")

    def filter(self, record: logging.LogRecord) -> bool:
        """Return False when the record mentions a silenced path, True otherwise."""
        # Format the message once: getMessage() re-applies the %-arguments on each call
        message = record.getMessage()
        return not any(path in message for path in self._SILENCED_PATHS)


# Attach the filter to the 'werkzeug' logger so matching records are discarded
log = logging.getLogger('werkzeug')
log.addFilter(AjaxFilter())
42 # list of callback messages
# Metrics: number of callback batches received so far
cntr_batch_callbacks = 0

# Request and response constants
# -- routes on which callbacks are received
CALLBACK_URL = "/callbacks/<string:id>"
CALLBACK_MR_URL = "/callbacks-mr/<string:id>"  # Json list with string encoded items
CALLBACK_TEXT_URL = "/callbacks-text/<string:id>"  # Callback for string of text
NULL_URL = "/callbacks-null"  # Url for ignored callback. Callbacks are not checked, counted or stored
# -- routes on which stored callbacks are read back
APP_READ_URL = "/get-event/<string:id>"
APP_READ_ALL_URL = "/get-all-events/<string:id>"

# -- mime types used in responses and content-type checks
MIME_TEXT = "text/plain"
MIME_JSON = "application/json"

# -- message prefixes and the key under which the receive time is stored in dict messages
CAUGHT_EXCEPTION = "Caught exception: "
SERVER_ERROR = "Server error :"
TIME_STAMP = "cr-timestamp"
72 forced_settings['delay']=None
75 # Remote host lookup and print host name
76 def remote_host_logging(request):
# Prints the host that sent `request` and records a host identifier in hosts_set.
# NOTE(review): this listing skips embedded lines 77, 80, 82, 85 and 88-89, so the
# else/try/except scaffolding and the assignment of `prefix` are not visible here.
# Take the X-Forwarded-For value when that header is present, otherwise the peer address
# (the `else:` between the two assignments is on a line not shown).
78 if request.environ.get('HTTP_X_FORWARDED_FOR') is None:
79 host_ip=str(request.environ['REMOTE_ADDR'])
81 host_ip=str(request.environ['HTTP_X_FORWARDED_FOR'])
# Cut off a leading '::ffff:' (IPv4-mapped IPv6 notation).
# `prefix` presumably holds that same literal - TODO confirm on the hidden line.
83 if (host_ip.startswith('::ffff:')):
84 host_ip=host_ip[len(prefix):]
# Reverse-resolve the address; of the result the visible lines use only `name`.
86 name, alias, addresslist = socket.gethostbyaddr(host_ip)
87 print("Calling host: "+str(name))
# Presumably the failure path of the lookup: the raw IP is reported and stored instead.
# NOTE(review): whether hosts_set.add(host_ip) runs only on failure cannot be told from here.
90 print("Calling host not possible to retrieve IP: "+str(host_ip))
91 hosts_set.add(host_ip)
100 ### Callback interface, for control
102 # Fetch the oldest callback message for an id
103 # URI and parameter, (GET): /get-event/<id>
104 # response: message + 200 or just 204 or just 500(error)
105 @app.route(APP_READ_URL,
107 def receiveresponse(id):
# Hands out the oldest stored callback message for `id` and removes it from the store.
# NOTE(review): several embedded lines are missing from this listing (rest of the decorator,
# the `try:`, and the return statements for the empty and error cases) - confirm against the file.
109 global cntr_msg_fetched
# Only proceed when at least one message is queued for this id.
113 if ((id in msg_callbacks.keys()) and (len(msg_callbacks[id]) > 0)):
# Slot 1 of the per-id counter list counts fetched messages.
115 cntr_callbacks[id][1]+=1
# Messages are appended on receipt, so index 0 is the oldest one.
116 msg=msg_callbacks[id][0]
117 print("Fetching msg for id: "+id+", msg="+str(msg))
# Dict messages may carry an "md5" entry; presumably the hash is extracted on a hidden line
# between the two prints below - TODO confirm.
119 if (isinstance(msg,dict)):
121 if ("md5" in msg.keys()):
122 print("EXTRACTED MD5")
124 print("MD5: "+str(msg))
# Remove the delivered message so the next call returns the following one.
126 del msg_callbacks[id][0]
127 return json.dumps(msg),200
# Reached when nothing is queued for the id; the response for this case is on a line not shown.
128 print("No messages for id: "+id)
129 except Exception as e:
# Any failure is printed with its traceback; the error response itself is on a line not shown.
130 print(CAUGHT_EXCEPTION+str(e))
131 traceback.print_exc()
136 # Fetch all callback messages for an id as an array
137 # URI and parameter, (GET): /get-all-events/<id>
138 # response: message + 200 or just 500(error)
139 @app.route(APP_READ_ALL_URL,
141 def receiveresponse_all(id):
# Hands out every stored callback message for `id` as one JSON array and clears the store for that id.
# NOTE(review): several embedded lines are missing from this listing (rest of the decorator,
# the `try:`, the loop header that binds `sub_msg`, the error return and the assignment
# preceding the final return) - confirm against the file.
143 global cntr_msg_fetched
147 if ((id in msg_callbacks.keys()) and (len(msg_callbacks[id]) > 0)):
# All queued messages count as fetched: both the global counter and slot 1 of the per-id counters.
148 cntr_msg_fetched+=len(msg_callbacks[id])
149 cntr_callbacks[id][1]+=len(msg_callbacks[id])
150 msg=msg_callbacks[id]
151 print("Fetching all msgs for id: "+id+", msg="+str(msg))
# Strip the receive timestamp from dict messages before returning them.
# NOTE(review): `del` raises KeyError for a dict without the TIME_STAMP key; the write handlers
# in this file stamp every dict they store, so this presumably cannot happen - confirm.
153 if (isinstance(sub_msg, dict)):
154 del sub_msg[TIME_STAMP]
# Drop the whole queue for this id; `msg` still references the list being returned.
155 del msg_callbacks[id]
156 return json.dumps(msg),200
157 print("No messages for id: "+id)
158 except Exception as e:
159 print(CAUGHT_EXCEPTION+str(e))
160 traceback.print_exc()
# Fallthrough return for the no-message case; `msg` is presumably set to an empty list on a hidden line - TODO confirm.
164 return json.dumps(msg),200
166 # Receive a callback message
167 # URI and payload, (PUT or POST): /callbacks/<id> <json messages>
168 # response: OK 200 or 500 for other errors
169 @app.route(CALLBACK_URL,
170 methods=['PUT','POST'])
171 def events_write(id):
# Receives one callback for `id`, stores its JSON payload and updates the counters.
# NOTE(review): several embedded lines are missing from this listing (the `try:` lines, the `else:`
# branches, the assignment of `data`, the empty-payload fallbacks and the return statements).
173 global cntr_msg_callbacks
# NOTE(review): request.content_type may be None when the header is absent, which would make
# this string concatenation raise TypeError - confirm whether this line is inside the try.
176 print("Received callback for id: "+id +", content-type="+request.content_type)
177 remote_host_logging(request)
178 print("raw data: str(request.data): "+str(request.data))
# NOTE(review): exact comparison - a value such as "application/json; charset=utf-8" does not
# match here, whereas the mr/text handlers use a substring test. Confirm this is intended.
181 if (request.content_type == MIME_JSON):
183 msg = json.loads(data)
184 print("Payload(json): "+str(msg))
# Non-JSON content type: per the printed text an empty json payload is used (assignment not shown).
187 print("Payload(content-type="+request.content_type+"). Setting empty json as payload")
188 except Exception as e:
# Unparsable body: per the printed text an empty json payload is used (assignment not shown).
190 print("(Exception) Payload does not contain any json, setting empty json as payload")
191 traceback.print_exc()
194 cntr_msg_callbacks += 1
# Stamp dict payloads with the receive time under the TIME_STAMP key.
195 if (isinstance(msg, dict)):
196 msg[TIME_STAMP]=str(datetime.now())
# Append to the queue for this id; the second append presumably follows creation of a new
# list for an unseen id on a hidden line - TODO confirm.
197 if (id in msg_callbacks.keys()):
198 msg_callbacks[id].append(msg)
201 msg_callbacks[id].append(msg)
# Per-id counters are a 3-item list: [0] received, [1] fetched, [2] batches.
# NOTE(review): an already-known id gets [2] incremented, but a new id starts with [2] == 0,
# so the first callback for an id is not counted as a batch - confirm this is intended.
203 if (id in cntr_callbacks.keys()):
204 cntr_callbacks[id][0] += 1
205 cntr_callbacks[id][2] += 1
207 cntr_callbacks[id]=[]
208 cntr_callbacks[id].append(1)
209 cntr_callbacks[id].append(0)
210 cntr_callbacks[id].append(0)
212 except Exception as e:
# Any failure while storing is printed with its traceback; the error response is on a line not shown.
213 print(CAUGHT_EXCEPTION+str(e))
214 traceback.print_exc()
220 # Receive a json callback message with payload formatted according to output from the message router
221 # Array of stringified json objects
222 # URI and payload, (PUT or POST): /callbacks-mr/<id> <json messages>
223 # json is a list of string encoded json items
224 # response: OK 200 or 500 for other errors
225 @app.route(CALLBACK_MR_URL,
226 methods=['PUT','POST'])
227 def events_write_mr(id):
# Receives a message-router style callback for `id` (a JSON list of items), stores the items
# and updates the message and batch counters.
# NOTE(review): several embedded lines are missing from this listing (the `try:` lines, `else:`
# branches, the loop over the message list, the assignments of `data`, `list_data` and `md5msg`,
# and the return statements) - confirm details against the file.
229 global cntr_msg_callbacks
230 global cntr_batch_callbacks
232 storeas=request.args.get('storeas') #If set, store payload as a md5 hascode and dont log the payload
233 #Large payloads will otherwise overload the server
235 print("Received callback (mr) for id: "+id +", content-type="+request.content_type)
# NOTE(review): this print is unconditional and is repeated right below under the storeas check,
# so the raw payload is logged even when `storeas` is set - contrary to the comment above.
236 print("raw data: str(request.data): "+str(request.data))
237 if (storeas is None):
238 print("raw data: str(request.data): "+str(request.data))
242 #if (request.content_type == MIME_JSON):
# Substring test, so content types with parameters (e.g. a charset suffix) are accepted too.
243 if (MIME_JSON in request.content_type):
245 msg_list = json.loads(data)
246 if (storeas is None):
247 print("Payload(json): "+str(msg_list))
251 print("Payload(content-type="+request.content_type+"). Setting empty json as payload")
252 except Exception as e:
254 print("(Exception) Payload does not contain any json, setting empty json as payload")
255 traceback.print_exc()
258 remote_host_logging(request)
# One request counts as one batch, regardless of how many items it carries.
260 cntr_batch_callbacks += 1
262 if (storeas is None):
265 #Convert to compact json without ws between parameter and value...
266 #It seem that ws is added somewhere along to way to this server
# Serialise without whitespace after ',' and ':'; the hash below is computed from this string.
268 msg=json.dumps(msg, separators=(',', ':'))
271 md5msg["md5"]=md5(msg.encode('utf-8')).hexdigest()
# NOTE(review): `msg` was just a str; msg["md5"] only works if `msg` is rebound to `md5msg`
# on the hidden line in between - TODO confirm.
273 print("msg (json converted to md5 hash): "+str(msg["md5"]))
274 cntr_msg_callbacks += 1
# Stamp dict items with the receive time under the TIME_STAMP key.
275 if (isinstance(msg, dict)):
276 msg[TIME_STAMP]=str(datetime.now())
277 if (id in msg_callbacks.keys()):
278 msg_callbacks[id].append(msg)
281 msg_callbacks[id].append(msg)
# Per-id counters are a 3-item list: [0] received, [1] fetched, [2] batches.
283 if (id in cntr_callbacks.keys()):
284 cntr_callbacks[id][0] += 1
286 cntr_callbacks[id]=[]
287 cntr_callbacks[id].append(1)
288 cntr_callbacks[id].append(0)
289 cntr_callbacks[id].append(0)
# Per-id batch counter; note this tests msg_callbacks while the text handler tests cntr_callbacks.
290 if (id in msg_callbacks.keys() and list_data):
291 cntr_callbacks[id][2] += 1
293 except Exception as e:
294 print(CAUGHT_EXCEPTION+str(e))
295 traceback.print_exc()
300 # Receive a callback message of a single text message (content type ignored)
301 # or a json array of strings (content type json)
302 # URI and payload, (PUT or POST): /callbacks-text/<id> <text message>
303 # response: OK 200 or 500 for other errors
304 @app.route(CALLBACK_TEXT_URL,
305 methods=['PUT','POST'])
306 def events_write_text(id):
# Receives a text callback for `id`: either a JSON array of strings or a single text body.
# Each resulting item is stored and the message and batch counters are updated.
# NOTE(review): several embedded lines are missing from this listing (the `try:` lines, `else:`
# branches, the loop over `msg_list`, the assignments of `list_data` and `md5msg`, and the
# return statements) - confirm details against the file.
308 global cntr_msg_callbacks
309 global cntr_batch_callbacks
311 storeas=request.args.get('storeas') #If set, store payload as a md5 hascode and dont log the payload
312 #Large payloads will otherwise overload the server
314 print("Received callback for id: "+id +", content-type="+request.content_type)
315 remote_host_logging(request)
# Raw payload is only logged when no `storeas` mode was requested.
316 if (storeas is None):
317 print("raw data: str(request.data): "+str(request.data))
323 if (MIME_JSON in request.content_type): #Json array of strings
324 msg_list=json.loads(request.data)
# Otherwise the body is decoded as UTF-8 and handled as a single-item list.
327 data=request.data.decode("utf-8") #Assuming string
329 msg_list.append(data)
# One request counts as one batch, regardless of how many items it carries.
331 cntr_batch_callbacks += 1
# NOTE(review): the md5 branch prints the item twice before hashing it, which logs the payload
# although `storeas` is meant to avoid that (see the comment on the storeas line) - confirm.
333 if (storeas == "md5"):
335 print("msg: "+str(msg))
336 print("msg (endcode str): "+str(msg.encode('utf-8')))
337 md5msg["md5"]=md5(msg.encode('utf-8')).hexdigest()
# msg["md5"] presumably relies on `msg` being rebound to `md5msg` on the hidden line in between - TODO confirm.
339 print("msg (data converted to md5 hash): "+str(msg["md5"]))
# Stamp dict items (the md5 wrapper) with the receive time under the TIME_STAMP key.
341 if (isinstance(msg, dict)):
342 msg[TIME_STAMP]=str(datetime.now())
344 cntr_msg_callbacks += 1
345 if (id in msg_callbacks.keys()):
346 msg_callbacks[id].append(msg)
349 msg_callbacks[id].append(msg)
# Per-id counters are a 3-item list: [0] received, [1] fetched, [2] batches.
351 if (id in cntr_callbacks.keys()):
352 cntr_callbacks[id][0] += 1
354 cntr_callbacks[id]=[]
355 cntr_callbacks[id].append(1)
356 cntr_callbacks[id].append(0)
357 cntr_callbacks[id].append(0)
358 if (id in cntr_callbacks.keys() and list_data):
359 cntr_callbacks[id][2] += 1
# Two handlers follow; presumably an inner try around the storing code and an outer one around
# the whole request handling - TODO confirm nesting against the file.
360 except Exception as e:
361 print(CAUGHT_EXCEPTION+str(e))
362 traceback.print_exc()
366 except Exception as e:
367 print(CAUGHT_EXCEPTION+str(e))
368 traceback.print_exc()
373 # Receive a callback message but ignore contents and return 200
374 # URI and payload, (PUT or POST): /callbacks-null <any payload>
377 methods=['PUT','POST'])
381 # Dump the whole db of current callbacks
382 # URI and parameter, (GET): /db
383 # response: message + 200
384 @app.route(DUMP_ALL_URL,
387 return json.dumps(msg_callbacks),200
389 ### Functions for metrics read out ###
391 @app.route('/counter/received_callbacks',
393 def requests_submitted():
# Returns the number of received callbacks as plain text with status 200.
# NOTE(review): embedded lines 392, 395, 397 and 400 are missing from this listing (rest of the
# decorator call and, presumably, the `if`/`else` guards around the returns below) - confirm.
394 req_id = request.args.get('id')
# Global total over all ids; presumably returned when no `id` query parameter was given.
396 return Response(str(cntr_msg_callbacks), status=200, mimetype=MIME_TEXT)
# Known id: slot 0 of its counter list is the number of callbacks received for that id.
398 if (req_id in cntr_callbacks.keys()):
399 return Response(str(cntr_callbacks[req_id][0]), status=200, mimetype=MIME_TEXT)
# Id without counters: report zero.
401 return Response(str("0"), status=200, mimetype=MIME_TEXT)
403 @app.route('/counter/received_callback_batches',
405 def batches_submitted():
# Returns the number of received callback batches as plain text with status 200.
# NOTE(review): embedded lines 404, 407, 409 and 412 are missing from this listing (rest of the
# decorator call and, presumably, the `if`/`else` guards around the returns below) - confirm.
406 req_id = request.args.get('id')
# Global batch total; presumably returned when no `id` query parameter was given.
408 return Response(str(cntr_batch_callbacks), status=200, mimetype=MIME_TEXT)
# Known id: slot 2 of its counter list is the per-id batch count.
410 if (req_id in cntr_callbacks.keys()):
411 return Response(str(cntr_callbacks[req_id][2]), status=200, mimetype=MIME_TEXT)
# Id without counters: report zero.
413 return Response(str("0"), status=200, mimetype=MIME_TEXT)
415 @app.route('/counter/fetched_callbacks',
417 def requests_fetched():
# Returns the number of fetched callback messages as plain text with status 200.
# NOTE(review): embedded lines 416, 419, 421 and 424 are missing from this listing (rest of the
# decorator call and, presumably, the `if`/`else` guards around the returns below) - confirm.
418 req_id = request.args.get('id')
# Global fetched total; presumably returned when no `id` query parameter was given.
420 return Response(str(cntr_msg_fetched), status=200, mimetype=MIME_TEXT)
# Known id: slot 1 of its counter list is the number of messages fetched for that id.
422 if (req_id in cntr_callbacks.keys()):
423 return Response(str(cntr_callbacks[req_id][1]), status=200, mimetype=MIME_TEXT)
# Id without counters: report zero.
425 return Response(str("0"), status=200, mimetype=MIME_TEXT)
427 @app.route('/counter/current_messages',
429 def current_messages():
# Returns the number of messages received but not yet fetched, as plain text with status 200.
# NOTE(review): embedded lines 428, 431, 433 and 436 are missing from this listing (rest of the
# decorator call and, presumably, the `if`/`else` guards around the returns below) - confirm.
430 req_id = request.args.get('id')
# Global figure: received minus fetched; presumably returned when no `id` query parameter was given.
432 return Response(str(cntr_msg_callbacks-cntr_msg_fetched), status=200, mimetype=MIME_TEXT)
# Known id: slot 0 (received) minus slot 1 (fetched) of its counter list.
434 if (req_id in cntr_callbacks.keys()):
435 return Response(str(cntr_callbacks[req_id][0]-cntr_callbacks[req_id][1]), status=200, mimetype=MIME_TEXT)
# Id without counters: report zero.
437 return Response(str("0"), status=200, mimetype=MIME_TEXT)
439 @app.route('/counter/remote_hosts',
444 hosts=",".join(hosts_set)
445 return Response(str(hosts), status=200, mimetype=MIME_TEXT)
448 #Set force delay response, in seconds, for all callbacks
449 #/forcedelay?delay=<seconds>
450 @app.route('/forcedelay', methods=['POST'])
454 forced_settings['delay']=int(request.args.get('delay'))
456 forced_settings['delay']=None
457 return Response("Force delay: " + str(forced_settings['delay']) + " sec set for all callback responses", 200, mimetype=MIME_TEXT)
459 # Helper: Delay if delayed response code is set
461 if (forced_settings['delay'] is not None):
463 val=int(forced_settings['delay'])
465 return Response("Force delay too short: " + str(forced_settings['delay']) + " sec", 500, mimetype=MIME_TEXT)
466 print("Delaying "+str(val)+ " sec.")
469 return Response("Force delay : " + str(forced_settings['delay']) + " sec failed", 500, mimetype=MIME_TEXT)
472 # Reset all messages and counters
474 methods=['GET', 'POST', 'PUT'])
477 global cntr_msg_fetched
478 global cntr_msg_callbacks
479 global cntr_batch_callbacks
480 global cntr_callbacks
481 global forced_settings
487 cntr_batch_callbacks=0
489 forced_settings['delay']=None
491 return Response('OK', status=200, mimetype=MIME_TEXT)
493 ### Main function ###
if __name__ == "__main__":
    # Start the Flask server on the configured address when the file is run as a script
    app.run(host=HOST_IP, port=HOST_PORT)