from flask import Flask, request, Response
from time import sleep
import time
-import datetime
+from datetime import datetime
import json
import traceback
+import logging
+import socket
+from threading import RLock
+
+# Disable all logging of GET on reading counters and db
+class AjaxFilter(logging.Filter):
+ def filter(self, record):
+ return ("/counter/" not in record.getMessage()) and ("/db" not in record.getMessage())
+
+log = logging.getLogger('werkzeug')
+log.addFilter(AjaxFilter())
app = Flask(__name__)
+lock = RLock()
+
# list of callback messages
msg_callbacks={}
cntr_msg_callbacks=0
cntr_msg_fetched=0
cntr_callbacks={}
+hosts_set=set()
# Request and response constants
CALLBACK_URL="/callbacks/<string:id>"
+CALLBACK_MR_URL="/callbacks-mr/<string:id>" #Json list with string encoded items
APP_READ_URL="/get-event/<string:id>"
APP_READ_ALL_URL="/get-all-events/<string:id>"
DUMP_ALL_URL="/db"
MIME_JSON="application/json"
CAUGHT_EXCEPTION="Caught exception: "
SERVER_ERROR="Server error :"
+TIME_STAMP="cr-timestamp"
+
+forced_settings={}
+forced_settings['delay']=None
+
+
+# Remote host lookup and print host name
+def remote_host_logging(request):
+
+ if request.environ.get('HTTP_X_FORWARDED_FOR') is None:
+ host_ip=str(request.environ['REMOTE_ADDR'])
+ else:
+ host_ip=str(request.environ['HTTP_X_FORWARDED_FOR'])
+ prefix='::ffff:'
+ if (host_ip.startswith('::ffff:')):
+ host_ip=host_ip[len(prefix):]
+ try:
+ name, alias, addresslist = socket.gethostbyaddr(host_ip)
+ print("Calling host: "+str(name))
+ hosts_set.add(name)
+ except Exception:
+ print("Calling host not possible to retrieve IP: "+str(host_ip))
+ hosts_set.add(host_ip)
+
#I'm alive function
@app.route('/',
global msg_callbacks
global cntr_msg_fetched
- try:
- if ((id in msg_callbacks.keys()) and (len(msg_callbacks[id]) > 0)):
- cntr_msg_fetched+=1
- cntr_callbacks[id][1]+=1
- msg=msg_callbacks[id][0]
- print("Fetching msg for id: "+id+", msg="+str(msg))
- del msg_callbacks[id][0]
- return json.dumps(msg),200
- print("No messages for id: "+id)
- except Exception as e:
- print(CAUGHT_EXCEPTION+str(e))
- traceback.print_exc()
- return "",500
+ with lock:
+ try:
+ if ((id in msg_callbacks.keys()) and (len(msg_callbacks[id]) > 0)):
+ cntr_msg_fetched+=1
+ cntr_callbacks[id][1]+=1
+ msg=msg_callbacks[id][0]
+ print("Fetching msg for id: "+id+", msg="+str(msg))
+ del msg[TIME_STAMP]
+ del msg_callbacks[id][0]
+ return json.dumps(msg),200
+ print("No messages for id: "+id)
+ except Exception as e:
+ print(CAUGHT_EXCEPTION+str(e))
+ traceback.print_exc()
+ return "",500
- return "",204
+ return "",204
# Fetch all callback message for an id in an array
# URI and parameter, (GET): /get-all-events/<id>
global msg_callbacks
global cntr_msg_fetched
- try:
- if ((id in msg_callbacks.keys()) and (len(msg_callbacks[id]) > 0)):
- cntr_msg_fetched+=len(msg_callbacks[id])
- cntr_callbacks[id][1]+=len(msg_callbacks[id])
- msg=msg_callbacks[id]
- print("Fetching all msgs for id: "+id+", msg="+str(msg))
- del msg_callbacks[id]
- return json.dumps(msg),200
- print("No messages for id: "+id)
- except Exception as e:
- print(CAUGHT_EXCEPTION+str(e))
- traceback.print_exc()
- return "",500
+ with lock:
+ try:
+ if ((id in msg_callbacks.keys()) and (len(msg_callbacks[id]) > 0)):
+ cntr_msg_fetched+=len(msg_callbacks[id])
+ cntr_callbacks[id][1]+=len(msg_callbacks[id])
+ msg=msg_callbacks[id]
+ print("Fetching all msgs for id: "+id+", msg="+str(msg))
+ for sub_msg in msg:
+ del sub_msg[TIME_STAMP]
+ del msg_callbacks[id]
+ return json.dumps(msg),200
+ print("No messages for id: "+id)
+ except Exception as e:
+ print(CAUGHT_EXCEPTION+str(e))
+ traceback.print_exc()
+ return "",500
- msg=[]
- return json.dumps(msg),200
+ msg=[]
+ return json.dumps(msg),200
# Receive a callback message
# URI and payload, (PUT or POST): /callbacks/<id> <json messages>
try:
print("Received callback for id: "+id +", content-type="+request.content_type)
+ remote_host_logging(request)
+ print("raw data: str(request.data): "+str(request.data))
+ do_delay()
try:
if (request.content_type == MIME_JSON):
data = request.data
print("(Exception) Payload does not contain any json, setting empty json as payload")
traceback.print_exc()
- cntr_msg_callbacks += 1
- if (id in msg_callbacks.keys()):
- msg_callbacks[id].append(msg)
- else:
- msg_callbacks[id]=[]
- msg_callbacks[id].append(msg)
+ with lock:
+ cntr_msg_callbacks += 1
+ msg[TIME_STAMP]=str(datetime.now())
+ if (id in msg_callbacks.keys()):
+ msg_callbacks[id].append(msg)
+ else:
+ msg_callbacks[id]=[]
+ msg_callbacks[id].append(msg)
+
+ if (id in cntr_callbacks.keys()):
+ cntr_callbacks[id][0] += 1
+ else:
+ cntr_callbacks[id]=[]
+ cntr_callbacks[id].append(1)
+ cntr_callbacks[id].append(0)
+
+ except Exception as e:
+ print(CAUGHT_EXCEPTION+str(e))
+ traceback.print_exc()
+ return 'NOTOK',500
+
+ return 'OK',200
+
+
+# Receive a json callback message with payload fromatted accoirding to output frm the message router
+# URI and payload, (PUT or POST): /callbacks/<id> <json messages>
+# json is a list of string encoded json items
+# response: OK 200 or 500 for other errors
+@app.route(CALLBACK_MR_URL,
+ methods=['PUT','POST'])
+def events_write_mr(id):
+ global msg_callbacks
+ global cntr_msg_callbacks
+
+ try:
+ print("Received callback (mr) for id: "+id +", content-type="+request.content_type)
+ remote_host_logging(request)
+ print("raw data: str(request.data): "+str(request.data))
+ do_delay()
+ try:
+ #if (request.content_type == MIME_JSON):
+ if (MIME_JSON in request.content_type):
+ data = request.data
+ msg_list = json.loads(data)
+ print("Payload(json): "+str(msg_list))
+ else:
+ msg_list=[]
+ print("Payload(content-type="+request.content_type+"). Setting empty json as payload")
+ except Exception as e:
+ msg_list=[]
+ print("(Exception) Payload does not contain any json, setting empty json as payload")
+ traceback.print_exc()
- if (id in cntr_callbacks.keys()):
- cntr_callbacks[id][0] += 1
- else:
- cntr_callbacks[id]=[]
- cntr_callbacks[id].append(1)
- cntr_callbacks[id].append(0)
+ with lock:
+ remote_host_logging(request)
+ for msg in msg_list:
+ print("msg (str): "+str(msg))
+ msg=json.loads(msg)
+ print("msg (json): "+str(msg))
+ cntr_msg_callbacks += 1
+ msg[TIME_STAMP]=str(datetime.now())
+ if (id in msg_callbacks.keys()):
+ msg_callbacks[id].append(msg)
+ else:
+ msg_callbacks[id]=[]
+ msg_callbacks[id].append(msg)
+
+ if (id in cntr_callbacks.keys()):
+ cntr_callbacks[id][0] += 1
+ else:
+ cntr_callbacks[id]=[]
+ cntr_callbacks[id].append(1)
+ cntr_callbacks[id].append(0)
except Exception as e:
print(CAUGHT_EXCEPTION+str(e))
else:
return Response(str("0"), status=200, mimetype=MIME_TEXT)
+@app.route('/counter/remote_hosts',
+ methods=['GET'])
+def remote_hosts():
+ global hosts_set
+
+ hosts=",".join(hosts_set)
+ return Response(str(hosts), status=200, mimetype=MIME_TEXT)
+
+#Set force delay response, in seconds, for all callbacks
+#/froceesponse?delay=<seconds>
+@app.route('/forcedelay', methods=['POST'])
+def forcedelay():
+
+ try:
+ forced_settings['delay']=int(request.args.get('delay'))
+ except Exception:
+ forced_settings['delay']=None
+ return Response("Force delay: " + str(forced_settings['delay']) + " sec set for all callback responses", 200, mimetype=MIME_TEXT)
+
+# Helper: Delay if delayed response code is set
+def do_delay():
+ if (forced_settings['delay'] is not None):
+ try:
+ val=int(forced_settings['delay'])
+ if (val < 1):
+ return Response("Force delay too short: " + str(forced_settings['delay']) + " sec", 500, mimetype=MIME_TEXT)
+ print("Delaying "+str(val)+ " sec.")
+ time.sleep(val)
+ except Exception:
+ return Response("Force delay : " + str(forced_settings['delay']) + " sec failed", 500, mimetype=MIME_TEXT)
### Admin ###
# Reset all messsages and counters
global msg_callbacks
global cntr_msg_fetched
global cntr_msg_callbacks
+ global cntr_callbacks
+ global forced_settings
- msg_callbacks={}
- cntr_msg_fetched=0
- cntr_msg_callbacks=0
+ with lock:
+ msg_callbacks={}
+ cntr_msg_fetched=0
+ cntr_msg_callbacks=0
+ cntr_callbacks={}
+ forced_settings['delay']=None
- return Response('OK', status=200, mimetype=MIME_TEXT)
+ return Response('OK', status=200, mimetype=MIME_TEXT)
### Main function ###