X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=test%2Fcr%2Fapp%2Fcr.py;h=94ef606d533115a74e304193c82b9df6de12beb2;hb=afe34e22e0be919b8e054826c23c551b4e493f79;hp=fa42c20478863ec4bbc3c99349096f1b9376928f;hpb=f0a158c3c32b6547c839c559548e4e4b729802e1;p=nonrtric.git diff --git a/test/cr/app/cr.py b/test/cr/app/cr.py index fa42c204..94ef606d 100644 --- a/test/cr/app/cr.py +++ b/test/cr/app/cr.py @@ -23,6 +23,9 @@ from datetime import datetime import json import traceback import logging +import socket +from threading import RLock +from hashlib import md5 # Disable all logging of GET on reading counters and db class AjaxFilter(logging.Filter): @@ -34,6 +37,8 @@ log.addFilter(AjaxFilter()) app = Flask(__name__) +lock = RLock() + # list of callback messages msg_callbacks={} @@ -45,9 +50,12 @@ HOST_PORT = 2222 cntr_msg_callbacks=0 cntr_msg_fetched=0 cntr_callbacks={} +hosts_set=set() # Request and response constants CALLBACK_URL="/callbacks/" +CALLBACK_MR_URL="/callbacks-mr/" #Json list with string encoded items +CALLBACK_TEXT_URL="/callbacks-text/" # Callback for string of text APP_READ_URL="/get-event/" APP_READ_ALL_URL="/get-all-events/" DUMP_ALL_URL="/db" @@ -58,6 +66,29 @@ CAUGHT_EXCEPTION="Caught exception: " SERVER_ERROR="Server error :" TIME_STAMP="cr-timestamp" +forced_settings={} +forced_settings['delay']=None + + +# Remote host lookup and print host name +def remote_host_logging(request): + + if request.environ.get('HTTP_X_FORWARDED_FOR') is None: + host_ip=str(request.environ['REMOTE_ADDR']) + else: + host_ip=str(request.environ['HTTP_X_FORWARDED_FOR']) + prefix='::ffff:' + if (host_ip.startswith('::ffff:')): + host_ip=host_ip[len(prefix):] + try: + name, alias, addresslist = socket.gethostbyaddr(host_ip) + print("Calling host: "+str(name)) + hosts_set.add(name) + except Exception: + print("Calling host not possible to retrieve IP: "+str(host_ip)) + hosts_set.add(host_ip) + + #I'm alive function @app.route('/', methods=['GET']) @@ -75,22 +106,30 @@ def receiveresponse(id): global msg_callbacks global cntr_msg_fetched - try: - if ((id in msg_callbacks.keys()) and (len(msg_callbacks[id]) > 0)): - cntr_msg_fetched+=1 - cntr_callbacks[id][1]+=1 - msg=msg_callbacks[id][0] - print("Fetching msg for id: "+id+", msg="+str(msg)) - del msg[TIME_STAMP] - del msg_callbacks[id][0] - return json.dumps(msg),200 - print("No messages for id: "+id) - except Exception as e: - print(CAUGHT_EXCEPTION+str(e)) - traceback.print_exc() - return "",500 + with lock: + try: + if ((id in msg_callbacks.keys()) and (len(msg_callbacks[id]) > 0)): + cntr_msg_fetched+=1 + cntr_callbacks[id][1]+=1 + msg=msg_callbacks[id][0] + print("Fetching msg for id: "+id+", msg="+str(msg)) + + if (isinstance(msg,dict)): + del msg[TIME_STAMP] + if ("md5" in msg.keys()): + print("EXTRACTED MD5") + msg=msg["md5"] + print("MD5: "+str(msg)) + + del msg_callbacks[id][0] + return json.dumps(msg),200 + print("No messages for id: "+id) + except Exception as e: + print(CAUGHT_EXCEPTION+str(e)) + traceback.print_exc() + return "",500 - return "",204 + return "",204 # Fetch all callback message for an id in an array # URI and parameter, (GET): /get-all-events/ @@ -101,24 +140,26 @@ def receiveresponse_all(id): global msg_callbacks global cntr_msg_fetched - try: - if ((id in msg_callbacks.keys()) and (len(msg_callbacks[id]) > 0)): - cntr_msg_fetched+=len(msg_callbacks[id]) - cntr_callbacks[id][1]+=len(msg_callbacks[id]) - msg=msg_callbacks[id] - print("Fetching all msgs for id: "+id+", msg="+str(msg)) - for sub_msg in msg: - del sub_msg[TIME_STAMP] - del msg_callbacks[id] - return json.dumps(msg),200 - print("No messages for id: "+id) - except Exception as e: - print(CAUGHT_EXCEPTION+str(e)) - traceback.print_exc() - return "",500 + with lock: + try: + if ((id in msg_callbacks.keys()) and (len(msg_callbacks[id]) > 0)): + cntr_msg_fetched+=len(msg_callbacks[id]) + cntr_callbacks[id][1]+=len(msg_callbacks[id]) + msg=msg_callbacks[id] + print("Fetching all msgs for id: "+id+", msg="+str(msg)) + for sub_msg in msg: + if (isinstance(sub_msg, dict)): + del sub_msg[TIME_STAMP] + del msg_callbacks[id] + return json.dumps(msg),200 + print("No messages for id: "+id) + except Exception as e: + print(CAUGHT_EXCEPTION+str(e)) + traceback.print_exc() + return "",500 - msg=[] - return json.dumps(msg),200 + msg=[] + return json.dumps(msg),200 # Receive a callback message # URI and payload, (PUT or POST): /callbacks/ @@ -131,6 +172,9 @@ def events_write(id): try: print("Received callback for id: "+id +", content-type="+request.content_type) + remote_host_logging(request) + print("raw data: str(request.data): "+str(request.data)) + do_delay() try: if (request.content_type == MIME_JSON): data = request.data @@ -144,20 +188,162 @@ def events_write(id): print("(Exception) Payload does not contain any json, setting empty json as payload") traceback.print_exc() - cntr_msg_callbacks += 1 - msg[TIME_STAMP]=str(datetime.now()) - if (id in msg_callbacks.keys()): - msg_callbacks[id].append(msg) - else: - msg_callbacks[id]=[] - msg_callbacks[id].append(msg) - - if (id in cntr_callbacks.keys()): - cntr_callbacks[id][0] += 1 - else: - cntr_callbacks[id]=[] - cntr_callbacks[id].append(1) - cntr_callbacks[id].append(0) + with lock: + cntr_msg_callbacks += 1 + if (isinstance(msg, dict)): + msg[TIME_STAMP]=str(datetime.now()) + if (id in msg_callbacks.keys()): + msg_callbacks[id].append(msg) + else: + msg_callbacks[id]=[] + msg_callbacks[id].append(msg) + + if (id in cntr_callbacks.keys()): + cntr_callbacks[id][0] += 1 + else: + cntr_callbacks[id]=[] + cntr_callbacks[id].append(1) + cntr_callbacks[id].append(0) + + except Exception as e: + print(CAUGHT_EXCEPTION+str(e)) + traceback.print_exc() + return 'NOTOK',500 + + return 'OK',200 + + +# Receive a json callback message with payload formatted according to output from the message router +# Array of stringified json objects +# URI and payload, (PUT or POST): /callbacks-mr/ +# json is a list of string encoded json items +# response: OK 200 or 500 for other errors +@app.route(CALLBACK_MR_URL, + methods=['PUT','POST']) +def events_write_mr(id): + global msg_callbacks + global cntr_msg_callbacks + + storeas=request.args.get('storeas') #If set, store payload as a md5 hascode and dont log the payload + #Large payloads will otherwise overload the server + try: + print("Received callback (mr) for id: "+id +", content-type="+request.content_type) + print("raw data: str(request.data): "+str(request.data)) + if (storeas is None): + print("raw data: str(request.data): "+str(request.data)) + do_delay() + try: + #if (request.content_type == MIME_JSON): + if (MIME_JSON in request.content_type): + data = request.data + msg_list = json.loads(data) + if (storeas is None): + print("Payload(json): "+str(msg_list)) + else: + msg_list=[] + print("Payload(content-type="+request.content_type+"). Setting empty json as payload") + except Exception as e: + msg_list=[] + print("(Exception) Payload does not contain any json, setting empty json as payload") + traceback.print_exc() + + with lock: + remote_host_logging(request) + for msg in msg_list: + if (storeas is None): + msg=json.loads(msg) + else: + #Convert to compact json without ws between parameter and value... + #It seem that ws is added somewhere along to way to this server + msg=json.loads(msg) + msg=json.dumps(msg, separators=(',', ':')) + + md5msg={} + md5msg["md5"]=md5(msg.encode('utf-8')).hexdigest() + msg=md5msg + print("msg (json converted to md5 hash): "+str(msg["md5"])) + cntr_msg_callbacks += 1 + if (isinstance(msg, dict)): + msg[TIME_STAMP]=str(datetime.now()) + if (id in msg_callbacks.keys()): + msg_callbacks[id].append(msg) + else: + msg_callbacks[id]=[] + msg_callbacks[id].append(msg) + + if (id in cntr_callbacks.keys()): + cntr_callbacks[id][0] += 1 + else: + cntr_callbacks[id]=[] + cntr_callbacks[id].append(1) + cntr_callbacks[id].append(0) + + except Exception as e: + print(CAUGHT_EXCEPTION+str(e)) + traceback.print_exc() + return 'NOTOK',500 + + return 'OK',200 + +# Receive a callback message of a single text message (content type ignored) +# or a json array of strings (content type json) +# URI and payload, (PUT or POST): /callbacks-text/ +# response: OK 200 or 500 for other errors +@app.route(CALLBACK_TEXT_URL, + methods=['PUT','POST']) +def events_write_text(id): + global msg_callbacks + global cntr_msg_callbacks + + storeas=request.args.get('storeas') #If set, store payload as a md5 hascode and dont log the payload + #Large payloads will otherwise overload the server + try: + print("Received callback for id: "+id +", content-type="+request.content_type) + remote_host_logging(request) + if (storeas is None): + print("raw data: str(request.data): "+str(request.data)) + do_delay() + + try: + msg_list=None + if (MIME_JSON in request.content_type): #Json array of strings + msg_list=json.loads(request.data) + else: + data=request.data.decode("utf-8") #Assuming string + msg_list=[] + msg_list.append(data) + + for msg in msg_list: + if (storeas == "md5"): + md5msg={} + print("msg: "+str(msg)) + print("msg (endcode str): "+str(msg.encode('utf-8'))) + md5msg["md5"]=md5(msg.encode('utf-8')).hexdigest() + msg=md5msg + print("msg (data converted to md5 hash): "+str(msg["md5"])) + + if (isinstance(msg, dict)): + msg[TIME_STAMP]=str(datetime.now()) + + with lock: + cntr_msg_callbacks += 1 + if (id in msg_callbacks.keys()): + msg_callbacks[id].append(msg) + else: + msg_callbacks[id]=[] + msg_callbacks[id].append(msg) + + if (id in cntr_callbacks.keys()): + cntr_callbacks[id][0] += 1 + else: + cntr_callbacks[id]=[] + cntr_callbacks[id].append(1) + cntr_callbacks[id].append(0) + except Exception as e: + print(CAUGHT_EXCEPTION+str(e)) + traceback.print_exc() + return 'NOTOK',500 + except Exception as e: print(CAUGHT_EXCEPTION+str(e)) @@ -214,7 +400,37 @@ def current_messages(): else: return Response(str("0"), status=200, mimetype=MIME_TEXT) +@app.route('/counter/remote_hosts', + methods=['GET']) +def remote_hosts(): + global hosts_set + + hosts=",".join(hosts_set) + return Response(str(hosts), status=200, mimetype=MIME_TEXT) + +#Set force delay response, in seconds, for all callbacks +#/froceesponse?delay= +@app.route('/forcedelay', methods=['POST']) +def forcedelay(): + + try: + forced_settings['delay']=int(request.args.get('delay')) + except Exception: + forced_settings['delay']=None + return Response("Force delay: " + str(forced_settings['delay']) + " sec set for all callback responses", 200, mimetype=MIME_TEXT) + +# Helper: Delay if delayed response code is set +def do_delay(): + if (forced_settings['delay'] is not None): + try: + val=int(forced_settings['delay']) + if (val < 1): + return Response("Force delay too short: " + str(forced_settings['delay']) + " sec", 500, mimetype=MIME_TEXT) + print("Delaying "+str(val)+ " sec.") + time.sleep(val) + except Exception: + return Response("Force delay : " + str(forced_settings['delay']) + " sec failed", 500, mimetype=MIME_TEXT) ### Admin ### # Reset all messsages and counters @@ -225,13 +441,16 @@ def reset(): global cntr_msg_fetched global cntr_msg_callbacks global cntr_callbacks + global forced_settings - msg_callbacks={} - cntr_msg_fetched=0 - cntr_msg_callbacks=0 - cntr_callbacks={} + with lock: + msg_callbacks={} + cntr_msg_fetched=0 + cntr_msg_callbacks=0 + cntr_callbacks={} + forced_settings['delay']=None - return Response('OK', status=200, mimetype=MIME_TEXT) + return Response('OK', status=200, mimetype=MIME_TEXT) ### Main function ###