# ============LICENSE_START=============================================== # Copyright (C) 2020 Nordix Foundation. All rights reserved. # ======================================================================== # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============LICENSE_END================================================= # from flask import Flask, request, Response from time import sleep import time from datetime import datetime import json import traceback import logging import socket from threading import RLock # Disable all logging of GET on reading counters and db class AjaxFilter(logging.Filter): def filter(self, record): return ("/counter/" not in record.getMessage()) and ("/db" not in record.getMessage()) log = logging.getLogger('werkzeug') log.addFilter(AjaxFilter()) app = Flask(__name__) lock = RLock() # list of callback messages msg_callbacks={} # Server info HOST_IP = "::" HOST_PORT = 2222 # Metrics vars cntr_msg_callbacks=0 cntr_msg_fetched=0 cntr_callbacks={} hosts_set=set() # Request and response constants CALLBACK_URL="/callbacks/" CALLBACK_MR_URL="/callbacks-mr/" #Json list with string encoded items APP_READ_URL="/get-event/" APP_READ_ALL_URL="/get-all-events/" DUMP_ALL_URL="/db" MIME_TEXT="text/plain" MIME_JSON="application/json" CAUGHT_EXCEPTION="Caught exception: " SERVER_ERROR="Server error :" TIME_STAMP="cr-timestamp" forced_settings={} forced_settings['delay']=None # Remote host lookup and print host name def remote_host_logging(request): if request.environ.get('HTTP_X_FORWARDED_FOR') is None: host_ip=str(request.environ['REMOTE_ADDR']) else: host_ip=str(request.environ['HTTP_X_FORWARDED_FOR']) prefix='::ffff:' if (host_ip.startswith('::ffff:')): host_ip=host_ip[len(prefix):] try: name, alias, addresslist = socket.gethostbyaddr(host_ip) print("Calling host: "+str(name)) hosts_set.add(name) except Exception: print("Calling host not possible to retrieve IP: "+str(host_ip)) hosts_set.add(host_ip) #I'm alive function @app.route('/', methods=['GET']) def index(): return 'OK', 200 ### Callback interface, for control # Fetch the oldest callback message for an id # URI and parameter, (GET): /get-event/ # response: message + 200 or just 204 or just 500(error) @app.route(APP_READ_URL, methods=['GET']) def receiveresponse(id): global msg_callbacks global cntr_msg_fetched with lock: try: if ((id in msg_callbacks.keys()) and (len(msg_callbacks[id]) > 0)): cntr_msg_fetched+=1 cntr_callbacks[id][1]+=1 msg=msg_callbacks[id][0] print("Fetching msg for id: "+id+", msg="+str(msg)) del msg[TIME_STAMP] del msg_callbacks[id][0] return json.dumps(msg),200 print("No messages for id: "+id) except Exception as e: print(CAUGHT_EXCEPTION+str(e)) traceback.print_exc() return "",500 return "",204 # Fetch all callback message for an id in an array # URI and parameter, (GET): /get-all-events/ # response: message + 200 or just 500(error) @app.route(APP_READ_ALL_URL, methods=['GET']) def receiveresponse_all(id): global msg_callbacks global cntr_msg_fetched with lock: try: if ((id in msg_callbacks.keys()) and (len(msg_callbacks[id]) > 0)): cntr_msg_fetched+=len(msg_callbacks[id]) cntr_callbacks[id][1]+=len(msg_callbacks[id]) msg=msg_callbacks[id] print("Fetching all msgs for id: "+id+", msg="+str(msg)) for sub_msg in msg: del sub_msg[TIME_STAMP] del msg_callbacks[id] return json.dumps(msg),200 print("No messages for id: "+id) except Exception as e: print(CAUGHT_EXCEPTION+str(e)) traceback.print_exc() return "",500 msg=[] return json.dumps(msg),200 # Receive a callback message # URI and payload, (PUT or POST): /callbacks/ # response: OK 200 or 500 for other errors @app.route(CALLBACK_URL, methods=['PUT','POST']) def events_write(id): global msg_callbacks global cntr_msg_callbacks try: print("Received callback for id: "+id +", content-type="+request.content_type) remote_host_logging(request) print("raw data: str(request.data): "+str(request.data)) do_delay() try: if (request.content_type == MIME_JSON): data = request.data msg = json.loads(data) print("Payload(json): "+str(msg)) else: msg={} print("Payload(content-type="+request.content_type+"). Setting empty json as payload") except Exception as e: msg={} print("(Exception) Payload does not contain any json, setting empty json as payload") traceback.print_exc() with lock: cntr_msg_callbacks += 1 msg[TIME_STAMP]=str(datetime.now()) if (id in msg_callbacks.keys()): msg_callbacks[id].append(msg) else: msg_callbacks[id]=[] msg_callbacks[id].append(msg) if (id in cntr_callbacks.keys()): cntr_callbacks[id][0] += 1 else: cntr_callbacks[id]=[] cntr_callbacks[id].append(1) cntr_callbacks[id].append(0) except Exception as e: print(CAUGHT_EXCEPTION+str(e)) traceback.print_exc() return 'NOTOK',500 return 'OK',200 # Receive a json callback message with payload fromatted accoirding to output frm the message router # URI and payload, (PUT or POST): /callbacks/ # json is a list of string encoded json items # response: OK 200 or 500 for other errors @app.route(CALLBACK_MR_URL, methods=['PUT','POST']) def events_write_mr(id): global msg_callbacks global cntr_msg_callbacks try: print("Received callback (mr) for id: "+id +", content-type="+request.content_type) remote_host_logging(request) print("raw data: str(request.data): "+str(request.data)) do_delay() try: #if (request.content_type == MIME_JSON): if (MIME_JSON in request.content_type): data = request.data msg_list = json.loads(data) print("Payload(json): "+str(msg_list)) else: msg_list=[] print("Payload(content-type="+request.content_type+"). Setting empty json as payload") except Exception as e: msg_list=[] print("(Exception) Payload does not contain any json, setting empty json as payload") traceback.print_exc() with lock: remote_host_logging(request) for msg in msg_list: print("msg (str): "+str(msg)) msg=json.loads(msg) print("msg (json): "+str(msg)) cntr_msg_callbacks += 1 msg[TIME_STAMP]=str(datetime.now()) if (id in msg_callbacks.keys()): msg_callbacks[id].append(msg) else: msg_callbacks[id]=[] msg_callbacks[id].append(msg) if (id in cntr_callbacks.keys()): cntr_callbacks[id][0] += 1 else: cntr_callbacks[id]=[] cntr_callbacks[id].append(1) cntr_callbacks[id].append(0) except Exception as e: print(CAUGHT_EXCEPTION+str(e)) traceback.print_exc() return 'NOTOK',500 return 'OK',200 ### Functions for test ### # Dump the whole db of current callbacks # URI and parameter, (GET): /db # response: message + 200 @app.route(DUMP_ALL_URL, methods=['GET']) def dump_db(): return json.dumps(msg_callbacks),200 ### Functions for metrics read out ### @app.route('/counter/received_callbacks', methods=['GET']) def requests_submitted(): req_id = request.args.get('id') if (req_id is None): return Response(str(cntr_msg_callbacks), status=200, mimetype=MIME_TEXT) if (req_id in cntr_callbacks.keys()): return Response(str(cntr_callbacks[req_id][0]), status=200, mimetype=MIME_TEXT) else: return Response(str("0"), status=200, mimetype=MIME_TEXT) @app.route('/counter/fetched_callbacks', methods=['GET']) def requests_fetched(): req_id = request.args.get('id') if (req_id is None): return Response(str(cntr_msg_fetched), status=200, mimetype=MIME_TEXT) if (req_id in cntr_callbacks.keys()): return Response(str(cntr_callbacks[req_id][1]), status=200, mimetype=MIME_TEXT) else: return Response(str("0"), status=200, mimetype=MIME_TEXT) @app.route('/counter/current_messages', methods=['GET']) def current_messages(): req_id = request.args.get('id') if (req_id is None): return Response(str(cntr_msg_callbacks-cntr_msg_fetched), status=200, mimetype=MIME_TEXT) if (req_id in cntr_callbacks.keys()): return Response(str(cntr_callbacks[req_id][0]-cntr_callbacks[req_id][1]), status=200, mimetype=MIME_TEXT) else: return Response(str("0"), status=200, mimetype=MIME_TEXT) @app.route('/counter/remote_hosts', methods=['GET']) def remote_hosts(): global hosts_set hosts=",".join(hosts_set) return Response(str(hosts), status=200, mimetype=MIME_TEXT) #Set force delay response, in seconds, for all callbacks #/froceesponse?delay= @app.route('/forcedelay', methods=['POST']) def forcedelay(): try: forced_settings['delay']=int(request.args.get('delay')) except Exception: forced_settings['delay']=None return Response("Force delay: " + str(forced_settings['delay']) + " sec set for all callback responses", 200, mimetype=MIME_TEXT) # Helper: Delay if delayed response code is set def do_delay(): if (forced_settings['delay'] is not None): try: val=int(forced_settings['delay']) if (val < 1): return Response("Force delay too short: " + str(forced_settings['delay']) + " sec", 500, mimetype=MIME_TEXT) print("Delaying "+str(val)+ " sec.") time.sleep(val) except Exception: return Response("Force delay : " + str(forced_settings['delay']) + " sec failed", 500, mimetype=MIME_TEXT) ### Admin ### # Reset all messsages and counters @app.route('/reset', methods=['GET', 'POST', 'PUT']) def reset(): global msg_callbacks global cntr_msg_fetched global cntr_msg_callbacks global cntr_callbacks global forced_settings with lock: msg_callbacks={} cntr_msg_fetched=0 cntr_msg_callbacks=0 cntr_callbacks={} forced_settings['delay']=None return Response('OK', status=200, mimetype=MIME_TEXT) ### Main function ### if __name__ == "__main__": app.run(port=HOST_PORT, host=HOST_IP)