2 # ============LICENSE_START===============================================
3 # Copyright (C) 2020 Nordix Foundation. All rights reserved.
4 # ========================================================================
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ============LICENSE_END=================================================
19 from flask import Flask, request, Response
20 from time import sleep
22 from datetime import datetime
27 from threading import RLock
28 from hashlib import md5
30 # Disable all logging of GET on reading counters and db
class AjaxFilter(logging.Filter):
    """Logging filter that hides access-log records for polling endpoints.

    A record is dropped when its formatted message contains ``/counter/``
    or ``/db``; every other record passes through unchanged.
    """

    # Path fragments that identify a request whose log line is suppressed.
    _SUPPRESSED_FRAGMENTS = ("/counter/", "/db")

    def filter(self, record):
        """Return True to keep *record*, False to suppress it."""
        # getMessage() applies the %-style arguments on every call, so the
        # message is formatted once and reused for both checks.
        message = record.getMessage()
        return not any(fragment in message for fragment in self._SUPPRESSED_FRAGMENTS)


# Install the filter on the 'werkzeug' logger.
log = logging.getLogger('werkzeug')
log.addFilter(AjaxFilter())
42 # list of callback messages
# Request and response constants

# Routes on which callback messages are received; <string:id> is the path
# parameter handed to the handler as `id`.
CALLBACK_URL = "/callbacks/<string:id>"
CALLBACK_MR_URL = "/callbacks-mr/<string:id>"  # JSON list with string-encoded items
CALLBACK_TEXT_URL = "/callbacks-text/<string:id>"  # callback carrying a string of text

# Routes on which stored callback messages are read back.
APP_READ_URL = "/get-event/<string:id>"
APP_READ_ALL_URL = "/get-all-events/<string:id>"

# MIME types used when checking requests and building responses.
MIME_TEXT = "text/plain"
MIME_JSON = "application/json"

# Prefixes for log and error texts.
CAUGHT_EXCEPTION = "Caught exception: "
SERVER_ERROR = "Server error :"

# Key under which the receive time is stored in dict messages.
TIME_STAMP = "cr-timestamp"
# No forced response delay by default; the delay code only acts when this
# value is not None.
70 forced_settings['delay']=None
73 # Remote host lookup and print host name
# Print the calling host for a request, based on the request's WSGI environ.
#   request: object exposing an `environ` mapping (the Flask request here).
74 def remote_host_logging(request):
# Without an X-Forwarded-For header the socket peer address is used;
# otherwise the value of that header is taken.
76 if request.environ.get('HTTP_X_FORWARDED_FOR') is None:
77 host_ip=str(request.environ['REMOTE_ADDR'])
79 host_ip=str(request.environ['HTTP_X_FORWARDED_FOR'])
# Strip the IPv4-mapped IPv6 prefix so that only the IPv4 part remains.
# NOTE(review): `prefix` is not assigned in the lines shown here -
# presumably '::ffff:'; confirm.
81 if (host_ip.startswith('::ffff:')):
82 host_ip=host_ip[len(prefix):]
# Reverse lookup; only the primary host name of the result is used.
84 name, alias, addresslist = socket.gethostbyaddr(host_ip)
85 print("Calling host: "+str(name))
# Path that reports a failed lookup: the raw IP is printed and recorded in hosts_set.
88 print("Calling host not possible to retrieve IP: "+str(host_ip))
89 hosts_set.add(host_ip)
98 ### Callback interface, for control
100 # Fetch the oldest callback message for an id
101 # URI and parameter, (GET): /get-event/<id>
102 # response: message + 200 or just 204 or just 500(error)
# Handler for /get-event/<id>: returns the oldest stored callback message
# for `id` as JSON with status 200 and removes it from the store.
#   id: path parameter selecting the message list in msg_callbacks.
103 @app.route(APP_READ_URL,
105 def receiveresponse(id):
107 global cntr_msg_fetched
# Only serve when the id is known and has at least one stored message.
111 if ((id in msg_callbacks.keys()) and (len(msg_callbacks[id]) > 0)):
# Index 1 of the per-id counter pair counts fetched messages.
113 cntr_callbacks[id][1]+=1
# Messages are appended on receipt, so index 0 holds the oldest one.
114 msg=msg_callbacks[id][0]
115 print("Fetching msg for id: "+id+", msg="+str(msg))
# Dict messages that carry an "md5" key get extra log output.
117 if (isinstance(msg,dict)):
119 if ("md5" in msg.keys()):
120 print("EXTRACTED MD5")
122 print("MD5: "+str(msg))
# Remove the delivered message so the next call returns the following one.
124 del msg_callbacks[id][0]
125 return json.dumps(msg),200
126 print("No messages for id: "+id)
# Failures are printed together with their stack trace.
127 except Exception as e:
128 print(CAUGHT_EXCEPTION+str(e))
129 traceback.print_exc()
134 # Fetch all callback message for an id in an array
135 # URI and parameter, (GET): /get-all-events/<id>
136 # response: message + 200 or just 500(error)
# Handler for /get-all-events/<id>: returns every stored callback message
# for `id` as one JSON array with status 200 and drops the id's entry.
#   id: path parameter selecting the message list in msg_callbacks.
137 @app.route(APP_READ_ALL_URL,
139 def receiveresponse_all(id):
141 global cntr_msg_fetched
145 if ((id in msg_callbacks.keys()) and (len(msg_callbacks[id]) > 0)):
# Both the global and the per-id fetched counters advance by the number
# of messages handed out in this call.
146 cntr_msg_fetched+=len(msg_callbacks[id])
147 cntr_callbacks[id][1]+=len(msg_callbacks[id])
148 msg=msg_callbacks[id]
149 print("Fetching all msgs for id: "+id+", msg="+str(msg))
# The receive timestamp stored with dict messages is stripped before they
# are returned.
# NOTE(review): sub_msg is presumably the loop variable over msg - the
# loop header is not among the lines shown here; confirm.
151 if (isinstance(sub_msg, dict)):
152 del sub_msg[TIME_STAMP]
# Removes the whole entry for this id, not just its contents.
153 del msg_callbacks[id]
154 return json.dumps(msg),200
155 print("No messages for id: "+id)
# Failures are printed together with their stack trace.
156 except Exception as e:
157 print(CAUGHT_EXCEPTION+str(e))
158 traceback.print_exc()
# Fallback response, also with status 200 - presumably `msg` is an empty
# list at this point; confirm.
162 return json.dumps(msg),200
164 # Receive a callback message
165 # URI and payload, (PUT or POST): /callbacks/<id> <json messages>
166 # response: OK 200 or 500 for other errors
# Handler for PUT/POST on /callbacks/<id>: stores one callback message in
# the message list for `id` and updates the received counters.
#   id: path parameter naming the message list.
167 @app.route(CALLBACK_URL,
168 methods=['PUT','POST'])
169 def events_write(id):
171 global cntr_msg_callbacks
# NOTE(review): if the request has no Content-Type header, content_type may
# be None and this string concatenation would raise TypeError - confirm.
174 print("Received callback for id: "+id +", content-type="+request.content_type)
175 remote_host_logging(request)
176 print("raw data: str(request.data): "+str(request.data))
# Exact comparison: a content type with parameters (for example a charset
# suffix) does not count as JSON in this handler.
179 if (request.content_type == MIME_JSON):
181 msg = json.loads(data)
182 print("Payload(json): "+str(msg))
185 print("Payload(content-type="+request.content_type+"). Setting empty json as payload")
# A payload that cannot be parsed is reported; the message text states that
# an empty json is used instead.
186 except Exception as e:
188 print("(Exception) Payload does not contain any json, setting empty json as payload")
189 traceback.print_exc()
192 cntr_msg_callbacks += 1
# Dict messages are stamped with the local receive time (naive datetime,
# stored as a string).
193 if (isinstance(msg, dict)):
194 msg[TIME_STAMP]=str(datetime.now())
# Store the message under its id.
195 if (id in msg_callbacks.keys()):
196 msg_callbacks[id].append(msg)
199 msg_callbacks[id].append(msg)
# Per-id counters are a two-element list [received, fetched]; a new id
# starts at [1, 0].
201 if (id in cntr_callbacks.keys()):
202 cntr_callbacks[id][0] += 1
204 cntr_callbacks[id]=[]
205 cntr_callbacks[id].append(1)
206 cntr_callbacks[id].append(0)
# Failures are printed together with their stack trace.
208 except Exception as e:
209 print(CAUGHT_EXCEPTION+str(e))
210 traceback.print_exc()
216 # Receive a json callback message with payload formatted according to output from the message router
217 # Array of stringified json objects
218 # URI and payload, (PUT or POST): /callbacks-mr/<id> <json messages>
219 # json is a list of string encoded json items
220 # response: OK 200 or 500 for other errors
# Handler for PUT/POST on /callbacks-mr/<id>: receives a JSON list of
# string-encoded JSON items and stores the items in the message list for `id`.
#   id: path parameter naming the message list.
#   query parameter `storeas`: when set, an md5 hash of each item is stored
#   instead of the item itself.
221 @app.route(CALLBACK_MR_URL,
222 methods=['PUT','POST'])
223 def events_write_mr(id):
225 global cntr_msg_callbacks
227 storeas=request.args.get('storeas') # If set, store the payload as an md5 hash code and do not log the payload
228 # Large payloads will otherwise overload the server
230 print("Received callback (mr) for id: "+id +", content-type="+request.content_type)
# NOTE(review): this print is unconditional, so the raw payload is logged
# even when storeas is set (and twice when it is not) - this looks
# unintended given the purpose of storeas; confirm.
231 print("raw data: str(request.data): "+str(request.data))
232 if (storeas is None):
233 print("raw data: str(request.data): "+str(request.data))
236 #if (request.content_type == MIME_JSON):
# Substring match, so a content type with extra parameters is accepted as JSON.
237 if (MIME_JSON in request.content_type):
239 msg_list = json.loads(data)
240 if (storeas is None):
241 print("Payload(json): "+str(msg_list))
244 print("Payload(content-type="+request.content_type+"). Setting empty json as payload")
# A payload that cannot be parsed is reported; the message text states that
# an empty json is used instead.
245 except Exception as e:
247 print("(Exception) Payload does not contain any json, setting empty json as payload")
248 traceback.print_exc()
251 remote_host_logging(request)
253 if (storeas is None):
256 # Convert to compact json without whitespace between parameter and value...
257 # It seems that whitespace is added somewhere along the way to this server
259 msg=json.dumps(msg, separators=(',', ':'))
# The hash is computed over the UTF-8 bytes of the compact JSON text.
262 md5msg["md5"]=md5(msg.encode('utf-8')).hexdigest()
# NOTE(review): the subscript below requires msg to have been rebound to
# md5msg - that rebinding is not among the lines shown here; confirm.
264 print("msg (json converted to md5 hash): "+str(msg["md5"]))
265 cntr_msg_callbacks += 1
# Dict messages are stamped with the local receive time (naive datetime,
# stored as a string).
266 if (isinstance(msg, dict)):
267 msg[TIME_STAMP]=str(datetime.now())
# Store the message under its id.
268 if (id in msg_callbacks.keys()):
269 msg_callbacks[id].append(msg)
272 msg_callbacks[id].append(msg)
# Per-id counters are a two-element list [received, fetched]; a new id
# starts at [1, 0].
274 if (id in cntr_callbacks.keys()):
275 cntr_callbacks[id][0] += 1
277 cntr_callbacks[id]=[]
278 cntr_callbacks[id].append(1)
279 cntr_callbacks[id].append(0)
# Failures are printed together with their stack trace.
281 except Exception as e:
282 print(CAUGHT_EXCEPTION+str(e))
283 traceback.print_exc()
288 # Receive a callback message of a single text message (content type ignored)
289 # or a json array of strings (content type json)
290 # URI and payload, (PUT or POST): /callbacks-text/<id> <text message>
291 # response: OK 200 or 500 for other errors
# Handler for PUT/POST on /callbacks-text/<id>: stores either a single text
# message (request body decoded as UTF-8) or, for a JSON content type, the
# items of a JSON array of strings.
#   id: path parameter naming the message list.
#   query parameter `storeas`: the value "md5" makes the handler store an
#   md5 hash of each message instead of the message itself.
292 @app.route(CALLBACK_TEXT_URL,
293 methods=['PUT','POST'])
294 def events_write_text(id):
296 global cntr_msg_callbacks
298 storeas=request.args.get('storeas') # If set, store the payload as an md5 hash code and do not log the payload
299 # Large payloads will otherwise overload the server
301 print("Received callback for id: "+id +", content-type="+request.content_type)
302 remote_host_logging(request)
# The raw payload is only logged when storeas is not given.
303 if (storeas is None):
304 print("raw data: str(request.data): "+str(request.data))
# Substring match, so a content type with extra parameters is accepted as JSON.
309 if (MIME_JSON in request.content_type): # JSON array of strings
310 msg_list=json.loads(request.data)
312 data=request.data.decode("utf-8") # Assuming the body is a string
314 msg_list.append(data)
317 if (storeas == "md5"):
319 print("msg: "+str(msg))
320 print("msg (endcode str): "+str(msg.encode('utf-8')))
# The hash is computed over the UTF-8 bytes of the message text.
321 md5msg["md5"]=md5(msg.encode('utf-8')).hexdigest()
# NOTE(review): the subscript below requires msg to have been rebound to
# md5msg - that rebinding is not among the lines shown here; confirm.
323 print("msg (data converted to md5 hash): "+str(msg["md5"]))
# Dict messages are stamped with the local receive time (naive datetime,
# stored as a string).
325 if (isinstance(msg, dict)):
326 msg[TIME_STAMP]=str(datetime.now())
329 cntr_msg_callbacks += 1
# Store the message under its id.
330 if (id in msg_callbacks.keys()):
331 msg_callbacks[id].append(msg)
334 msg_callbacks[id].append(msg)
# Per-id counters are a two-element list [received, fetched]; a new id
# starts at [1, 0].
336 if (id in cntr_callbacks.keys()):
337 cntr_callbacks[id][0] += 1
339 cntr_callbacks[id]=[]
340 cntr_callbacks[id].append(1)
341 cntr_callbacks[id].append(0)
# Two handlers follow; each prints the failure together with its stack trace.
342 except Exception as e:
343 print(CAUGHT_EXCEPTION+str(e))
344 traceback.print_exc()
348 except Exception as e:
349 print(CAUGHT_EXCEPTION+str(e))
350 traceback.print_exc()
355 ### Functions for test ###
357 # Dump the whole db of current callbacks
358 # URI and parameter, (GET): /db
359 # response: message + 200
360 @app.route(DUMP_ALL_URL,
363 return json.dumps(msg_callbacks),200
365 ### Functions for metrics read out ###
# Metrics endpoint /counter/received_callbacks: number of received callback
# messages, returned as plain text with status 200.
# The query parameter `id` selects the per-id received count
# (cntr_callbacks[id][0]); an id without a counter entry yields "0".
367 @app.route('/counter/received_callbacks',
369 def requests_submitted():
370 req_id = request.args.get('id')
# Total over all ids - presumably the answer when no id is supplied; confirm.
372 return Response(str(cntr_msg_callbacks), status=200, mimetype=MIME_TEXT)
374 if (req_id in cntr_callbacks.keys()):
375 return Response(str(cntr_callbacks[req_id][0]), status=200, mimetype=MIME_TEXT)
377 return Response(str("0"), status=200, mimetype=MIME_TEXT)
# Metrics endpoint /counter/fetched_callbacks: number of callback messages
# that have been fetched, returned as plain text with status 200.
# The query parameter `id` selects the per-id fetched count
# (cntr_callbacks[id][1]); an id without a counter entry yields "0".
379 @app.route('/counter/fetched_callbacks',
381 def requests_fetched():
382 req_id = request.args.get('id')
# Total over all ids - presumably the answer when no id is supplied; confirm.
384 return Response(str(cntr_msg_fetched), status=200, mimetype=MIME_TEXT)
386 if (req_id in cntr_callbacks.keys()):
387 return Response(str(cntr_callbacks[req_id][1]), status=200, mimetype=MIME_TEXT)
389 return Response(str("0"), status=200, mimetype=MIME_TEXT)
# Metrics endpoint /counter/current_messages: received minus fetched count,
# returned as plain text with status 200.
# The query parameter `id` selects the per-id difference
# (cntr_callbacks[id][0] - cntr_callbacks[id][1]); an id without a counter
# entry yields "0".
391 @app.route('/counter/current_messages',
393 def current_messages():
394 req_id = request.args.get('id')
# Difference of the global counters - presumably the answer when no id is
# supplied; confirm.
396 return Response(str(cntr_msg_callbacks-cntr_msg_fetched), status=200, mimetype=MIME_TEXT)
398 if (req_id in cntr_callbacks.keys()):
399 return Response(str(cntr_callbacks[req_id][0]-cntr_callbacks[req_id][1]), status=200, mimetype=MIME_TEXT)
401 return Response(str("0"), status=200, mimetype=MIME_TEXT)
403 @app.route('/counter/remote_hosts',
408 hosts=",".join(hosts_set)
409 return Response(str(hosts), status=200, mimetype=MIME_TEXT)
412 #Set force delay response, in seconds, for all callbacks
413 #/forcedelay?delay=<seconds>
414 @app.route('/forcedelay', methods=['POST'])
418 forced_settings['delay']=int(request.args.get('delay'))
420 forced_settings['delay']=None
421 return Response("Force delay: " + str(forced_settings['delay']) + " sec set for all callback responses", 200, mimetype=MIME_TEXT)
423 # Helper: Delay if delayed response code is set
425 if (forced_settings['delay'] is not None):
427 val=int(forced_settings['delay'])
429 return Response("Force delay too short: " + str(forced_settings['delay']) + " sec", 500, mimetype=MIME_TEXT)
430 print("Delaying "+str(val)+ " sec.")
433 return Response("Force delay : " + str(forced_settings['delay']) + " sec failed", 500, mimetype=MIME_TEXT)
436 # Reset all messages and counters
438 methods=['GET', 'POST', 'PUT'])
441 global cntr_msg_fetched
442 global cntr_msg_callbacks
443 global cntr_callbacks
444 global forced_settings
451 forced_settings['delay']=None
453 return Response('OK', status=200, mimetype=MIME_TEXT)
455 ### Main function ###
if __name__ == "__main__":
    # Serve the app on the configured address and port when the file is
    # executed as a script (not when it is imported).
    app.run(host=HOST_IP, port=HOST_PORT)