X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=test%2Fmrstub%2Fapp%2Fmain.py;h=fb6d67482be6f6aa0fa2b211352b28281ca4110e;hb=663566c28930429775ea9921f0e32ddf5253da28;hp=40707d62ff5935385ecc8052c6d859f53b31b425;hpb=7fbe885e3adfb8257d78ac500dba0b53a14f5d52;p=nonrtric.git diff --git a/test/mrstub/app/main.py b/test/mrstub/app/main.py index 40707d62..fb6d6748 100644 --- a/test/mrstub/app/main.py +++ b/test/mrstub/app/main.py @@ -43,6 +43,8 @@ lock = RLock() msg_requests=[] msg_responses={} +generic_messages={} + # Server info HOST_IP = "::" HOST_PORT = 2222 @@ -54,6 +56,8 @@ cntr_msg_responses_submitted=0 cntr_msg_responses_fetched=0 # Request and response constants +ORU_WRITE_URL="/events/unauthenticated.SEC_FAULT_OUTPUT" +ORU_READ_URL="/events/unauthenticated.SEC_FAULT_OUTPUT/users/test/" AGENT_WRITE_URL="/events/A1-POLICY-AGENT-WRITE" AGENT_READ_URL="/events/A1-POLICY-AGENT-READ/users/policy-agent" APP_WRITE_URL="/send-request" @@ -103,7 +107,7 @@ def dmaap_uploader(): sleep(0.01) -# Function to download messages from dmaap +# Function to upload messages to dmaap def dmaap_downloader(): global msg_responses global cntr_msg_responses_submitted @@ -344,6 +348,115 @@ def events_write(): return Response('{}', status=200, mimetype=MIME_JSON) +@app.route(ORU_WRITE_URL, + methods=['PUT','POST']) +def oru_write(): + global msg_requests + msg=json.dumps(request.json) + msg_requests.append(msg) + return Response('{}', status=200, mimetype=MIME_JSON) + +@app.route(ORU_READ_URL, + methods=['GET']) +def oru_read(): + global msg_requests + if(len(msg_requests)>0): + rsp=msg_requests.pop(0) + res=[] + res.append(rsp) + return Response(json.dumps(res), status=200, mimetype=MIME_JSON) + return Response("[]", status=200, mimetype=MIME_JSON) + +# Generic POST/PUT catching all urls starting with /events/. +# Writes the message in a que for that topic +@app.route("/events/", + methods=['PUT','POST']) +def generic_write(path): + global generic_messages + global cntr_msg_responses_submitted + urlkey="/events/"+str(path) + write_method=str(request.method) + with lock: + try: + payload=request.json + print(write_method+" on "+urlkey+" json=" + json.dumps(payload)) + topicmsgs=[] + if (urlkey in generic_messages.keys()): + topicmsgs=generic_messages[urlkey] + else: + generic_messages[urlkey]=topicmsgs + + if isinstance(payload, list): + for listitem in payload: + topicmsgs.append(listitem) + else: + topicmsgs.append(payload) + + cntr_msg_responses_submitted += 1 + except Exception as e: + print(write_method + "on "+urlkey+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc()) + return Response('{"message": "' + SERVER_ERROR + ' ' + str(e) + '","status":"500"}', status=200, mimetype=MIME_JSON) + + return Response('{}', status=200, mimetype=MIME_JSON) + +# Generic GET catching all urls starting with /events/. Returns max 4096 json msgs in an array. +# Returns only the messages previously written to the same urls +@app.route("/events/", + methods=['GET']) +def generic_read(path): + global generic_messages + global cntr_msg_requests_fetched + + urlpath="/events/"+str(path) + urlkey="/events/"+str(path).split("/")[0] #Extract topic + print("GET on topic"+urlkey) + limit=request.args.get('limit') + if (limit is None): + limit=4096 + else: + limit=int(limit) + if (limit<0): + limit=0 + if (limit>4096): + limit=4096 + print("Limting number of returned messages to: "+str(limit)) + + timeout=request.args.get('timeout') + if (timeout is None): + timeout=10000 + else: + timeout=min(int(timeout),60000) + + start_time=int(round(time.time() * 1000)) + current_time=int(round(time.time() * 1000)) + topicmsgs=[] + if (urlkey in generic_messages.keys()): + topicmsgs=generic_messages[urlkey] + + while(current_time0): + try: + msgs='' + cntr=0 + while(cntr0): + if (len(msgs)>1): + msgs=msgs+',' + msgs=msgs+json.dumps(json.dumps(topicmsgs.pop(0))) + cntr_msg_requests_fetched += 1 + cntr=cntr+1 + msgs='['+msgs+']' + print("GET on "+urlpath+" MSGs: "+msgs) + return Response(msgs, status=200, mimetype=MIME_JSON) + except Exception as e: + print("GET on "+urlpath+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc()) + return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT) + sleep(0.025) # sleep 25 milliseconds + current_time=int(round(time.time() * 1000)) + + print("timeout: "+str(timeout)+", start_time: "+str(start_time)+", current_time: "+str(current_time)) + return Response("[]", status=200, mimetype=MIME_JSON) + ### Functions for metrics read out ###