X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=test%2Fcr%2Fapp%2Fcr.py;h=2066e148ec79fb00287b2582e3b564d596759907;hb=7dc96e95bb2354ac4cfd4b2947960437ce20b72e;hp=4b4d8dafe3f2afdb1e30358dd4b605ccc88884c1;hpb=d0c7f9207203ce9a502fc15c09f9938eebfd44f7;p=nonrtric.git diff --git a/test/cr/app/cr.py b/test/cr/app/cr.py index 4b4d8daf..2066e148 100644 --- a/test/cr/app/cr.py +++ b/test/cr/app/cr.py @@ -25,6 +25,7 @@ import traceback import logging import socket from threading import RLock +from hashlib import md5 # Disable all logging of GET on reading counters and db class AjaxFilter(logging.Filter): @@ -47,6 +48,7 @@ HOST_PORT = 2222 # Metrics vars cntr_msg_callbacks=0 +cntr_batch_callbacks=0 cntr_msg_fetched=0 cntr_callbacks={} hosts_set=set() @@ -54,9 +56,11 @@ hosts_set=set() # Request and response constants CALLBACK_URL="/callbacks/" CALLBACK_MR_URL="/callbacks-mr/" #Json list with string encoded items +CALLBACK_TEXT_URL="/callbacks-text/" # Callback for string of text APP_READ_URL="/get-event/" APP_READ_ALL_URL="/get-all-events/" DUMP_ALL_URL="/db" +NULL_URL="/callbacks-null" # Url for ignored callback. Callbacks are not checked, counted or stored MIME_TEXT="text/plain" MIME_JSON="application/json" @@ -111,7 +115,14 @@ def receiveresponse(id): cntr_callbacks[id][1]+=1 msg=msg_callbacks[id][0] print("Fetching msg for id: "+id+", msg="+str(msg)) - del msg[TIME_STAMP] + + if (isinstance(msg,dict)): + del msg[TIME_STAMP] + if ("md5" in msg.keys()): + print("EXTRACTED MD5") + msg=msg["md5"] + print("MD5: "+str(msg)) + del msg_callbacks[id][0] return json.dumps(msg),200 print("No messages for id: "+id) @@ -139,7 +150,8 @@ def receiveresponse_all(id): msg=msg_callbacks[id] print("Fetching all msgs for id: "+id+", msg="+str(msg)) for sub_msg in msg: - del sub_msg[TIME_STAMP] + if (isinstance(sub_msg, dict)): + del sub_msg[TIME_STAMP] del msg_callbacks[id] return json.dumps(msg),200 print("No messages for id: "+id) @@ -180,7 +192,8 @@ def events_write(id): with lock: cntr_msg_callbacks += 1 - msg[TIME_STAMP]=str(datetime.now()) + if (isinstance(msg, dict)): + msg[TIME_STAMP]=str(datetime.now()) if (id in msg_callbacks.keys()): msg_callbacks[id].append(msg) else: @@ -189,10 +202,12 @@ def events_write(id): if (id in cntr_callbacks.keys()): cntr_callbacks[id][0] += 1 + cntr_callbacks[id][2] += 1 else: cntr_callbacks[id]=[] cntr_callbacks[id].append(1) cntr_callbacks[id].append(0) + cntr_callbacks[id].append(0) except Exception as e: print(CAUGHT_EXCEPTION+str(e)) @@ -202,8 +217,9 @@ def events_write(id): return 'OK',200 -# Receive a json callback message with payload fromatted accoirding to output frm the message router -# URI and payload, (PUT or POST): /callbacks/ +# Receive a json callback message with payload formatted according to output from the message router +# Array of stringified json objects +# URI and payload, (PUT or POST): /callbacks-mr/ # json is a list of string encoded json items # response: OK 200 or 500 for other errors @app.route(CALLBACK_MR_URL, @@ -211,18 +227,25 @@ def events_write(id): def events_write_mr(id): global msg_callbacks global cntr_msg_callbacks + global cntr_batch_callbacks + storeas=request.args.get('storeas') #If set, store payload as a md5 hascode and dont log the payload + #Large payloads will otherwise overload the server try: print("Received callback (mr) for id: "+id +", content-type="+request.content_type) - remote_host_logging(request) print("raw data: str(request.data): "+str(request.data)) + if (storeas is None): + print("raw data: str(request.data): "+str(request.data)) do_delay() + list_data=False try: #if (request.content_type == MIME_JSON): if (MIME_JSON in request.content_type): data = request.data msg_list = json.loads(data) - print("Payload(json): "+str(msg_list)) + if (storeas is None): + print("Payload(json): "+str(msg_list)) + list_data=True else: msg_list=[] print("Payload(content-type="+request.content_type+"). Setting empty json as payload") @@ -233,12 +256,24 @@ def events_write_mr(id): with lock: remote_host_logging(request) + if (list_data): + cntr_batch_callbacks += 1 for msg in msg_list: - print("msg (str): "+str(msg)) - msg=json.loads(msg) - print("msg (json): "+str(msg)) + if (storeas is None): + msg=json.loads(msg) + else: + #Convert to compact json without ws between parameter and value... + #It seem that ws is added somewhere along to way to this server + msg=json.loads(msg) + msg=json.dumps(msg, separators=(',', ':')) + + md5msg={} + md5msg["md5"]=md5(msg.encode('utf-8')).hexdigest() + msg=md5msg + print("msg (json converted to md5 hash): "+str(msg["md5"])) cntr_msg_callbacks += 1 - msg[TIME_STAMP]=str(datetime.now()) + if (isinstance(msg, dict)): + msg[TIME_STAMP]=str(datetime.now()) if (id in msg_callbacks.keys()): msg_callbacks[id].append(msg) else: @@ -251,6 +286,82 @@ def events_write_mr(id): cntr_callbacks[id]=[] cntr_callbacks[id].append(1) cntr_callbacks[id].append(0) + cntr_callbacks[id].append(0) + if (id in msg_callbacks.keys() and list_data): + cntr_callbacks[id][2] += 1 + + except Exception as e: + print(CAUGHT_EXCEPTION+str(e)) + traceback.print_exc() + return 'NOTOK',500 + + return 'OK',200 + +# Receive a callback message of a single text message (content type ignored) +# or a json array of strings (content type json) +# URI and payload, (PUT or POST): /callbacks-text/ +# response: OK 200 or 500 for other errors +@app.route(CALLBACK_TEXT_URL, + methods=['PUT','POST']) +def events_write_text(id): + global msg_callbacks + global cntr_msg_callbacks + global cntr_batch_callbacks + + storeas=request.args.get('storeas') #If set, store payload as a md5 hascode and dont log the payload + #Large payloads will otherwise overload the server + try: + print("Received callback for id: "+id +", content-type="+request.content_type) + remote_host_logging(request) + if (storeas is None): + print("raw data: str(request.data): "+str(request.data)) + do_delay() + + try: + msg_list=None + list_data=False + if (MIME_JSON in request.content_type): #Json array of strings + msg_list=json.loads(request.data) + list_data=True + else: + data=request.data.decode("utf-8") #Assuming string + msg_list=[] + msg_list.append(data) + with lock: + cntr_batch_callbacks += 1 + for msg in msg_list: + if (storeas == "md5"): + md5msg={} + print("msg: "+str(msg)) + print("msg (endcode str): "+str(msg.encode('utf-8'))) + md5msg["md5"]=md5(msg.encode('utf-8')).hexdigest() + msg=md5msg + print("msg (data converted to md5 hash): "+str(msg["md5"])) + + if (isinstance(msg, dict)): + msg[TIME_STAMP]=str(datetime.now()) + + cntr_msg_callbacks += 1 + if (id in msg_callbacks.keys()): + msg_callbacks[id].append(msg) + else: + msg_callbacks[id]=[] + msg_callbacks[id].append(msg) + + if (id in cntr_callbacks.keys()): + cntr_callbacks[id][0] += 1 + else: + cntr_callbacks[id]=[] + cntr_callbacks[id].append(1) + cntr_callbacks[id].append(0) + cntr_callbacks[id].append(0) + if (id in cntr_callbacks.keys() and list_data): + cntr_callbacks[id][2] += 1 + except Exception as e: + print(CAUGHT_EXCEPTION+str(e)) + traceback.print_exc() + return 'NOTOK',500 + except Exception as e: print(CAUGHT_EXCEPTION+str(e)) @@ -259,7 +370,13 @@ def events_write_mr(id): return 'OK',200 -### Functions for test ### +# Receive a callback message but ignore contents and return 200 +# URI and payload, (PUT or POST): /callbacks-text/ +# response: OK 200 +@app.route(NULL_URL, + methods=['PUT','POST']) +def null_url(id): + return 'OK',200 # Dump the whole db of current callbacks # URI and parameter, (GET): /db @@ -283,6 +400,18 @@ def requests_submitted(): else: return Response(str("0"), status=200, mimetype=MIME_TEXT) +@app.route('/counter/received_callback_batches', + methods=['GET']) +def batches_submitted(): + req_id = request.args.get('id') + if (req_id is None): + return Response(str(cntr_batch_callbacks), status=200, mimetype=MIME_TEXT) + + if (req_id in cntr_callbacks.keys()): + return Response(str(cntr_callbacks[req_id][2]), status=200, mimetype=MIME_TEXT) + else: + return Response(str("0"), status=200, mimetype=MIME_TEXT) + @app.route('/counter/fetched_callbacks', methods=['GET']) def requests_fetched(): @@ -347,6 +476,7 @@ def reset(): global msg_callbacks global cntr_msg_fetched global cntr_msg_callbacks + global cntr_batch_callbacks global cntr_callbacks global forced_settings @@ -354,6 +484,7 @@ def reset(): msg_callbacks={} cntr_msg_fetched=0 cntr_msg_callbacks=0 + cntr_batch_callbacks=0 cntr_callbacks={} forced_settings['delay']=None