X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=test%2Fmrstub%2Fapp%2Fmain.py;h=09954286bb74dbed4bf697fc24af00a540c3c30c;hb=refs%2Fchanges%2F83%2F8083%2F3;hp=8804a0448abed9cd3dd783476f7b8b5830b23f6a;hpb=bee5b1771393f08c55011f3ed3efc924a5878e70;p=nonrtric.git diff --git a/test/mrstub/app/main.py b/test/mrstub/app/main.py index 8804a044..09954286 100644 --- a/test/mrstub/app/main.py +++ b/test/mrstub/app/main.py @@ -24,7 +24,18 @@ import json from flask import Flask from flask import Response import traceback -from threading import RLock +from threading import RLock, Thread +import logging +import os +import requests + +# Disable all logging of GET on reading counters +class AjaxFilter(logging.Filter): + def filter(self, record): + return ("/counter/" not in record.getMessage()) + +log = logging.getLogger('werkzeug') +log.addFilter(AjaxFilter()) app = Flask(__name__) lock = RLock() @@ -32,6 +43,8 @@ lock = RLock() msg_requests=[] msg_responses={} +generic_messages={} + # Server info HOST_IP = "::" HOST_PORT = 2222 @@ -43,8 +56,10 @@ cntr_msg_responses_submitted=0 cntr_msg_responses_fetched=0 # Request and response constants -AGENT_WRITE_URL="/events/A1-POLICY-AGENT-WRITE" -AGENT_READ_URL="/events/A1-POLICY-AGENT-READ/users/policy-agent" +ORU_WRITE_URL="/events/unauthenticated.SEC_FAULT_OUTPUT" +ORU_READ_URL="/events/unauthenticated.SEC_FAULT_OUTPUT/users/test/" +A1PMS_WRITE_URL="/events/A1-POLICY-AGENT-WRITE" +A1PMS_READ_URL="/events/A1-POLICY-AGENT-READ/users/policy-agent" APP_WRITE_URL="/send-request" APP_READ_URL="/receive-response" MIME_TEXT="text/plain" @@ -52,6 +67,134 @@ MIME_JSON="application/json" CAUGHT_EXCEPTION="Caught exception: " SERVER_ERROR="Server error :" +topic_write="" +topic_read="" +generic_topics_upload_baseurl="" + +uploader_thread=None +downloader_thread=None +generic_uploader_thread=None + +# Function to upload A1PMS messages to dmaap +def dmaap_uploader(): + global msg_requests + global cntr_msg_requests_fetched + + print("Starting uploader") + + headers = {'Content-type': 'application/json', 'Accept': '*/*'} + #url="http://"+topic_host+"/events/"+topic_read + url=topic_read + + while True: + while (len(msg_requests)>0): + msg=msg_requests[0] + if msg is not None: + try: + print("Sending to dmaap : "+ url) + print("Sending to dmaap : "+ msg) + resp=requests.post(url, data=msg, headers=headers, timeout=10) + if (resp.status_code<199 & resp.status_code > 299): + print("Failed, response code: " + str(resp.status_code)) + sleep(1) + else: + print("Dmaap response code: " + str(resp.status_code)) + print("Dmaap response text: " + str(resp.text)) + with lock: + msg_requests.pop(0) + cntr_msg_requests_fetched += 1 + except Exception as e: + print("Failed, exception: "+ str(e)) + sleep(1) + sleep(0.01) + + +# Function to download A1PMS messages from dmaap +def dmaap_downloader(): + global msg_responses + global cntr_msg_responses_submitted + + print("Starting uploader") + + while True: + + try : + #url="http://"+topic_host+"/events/"+topic_write+"/users/mr-stub?timeout=15000&limit=100" + url=topic_write + headers = {'Accept': 'application/json'} + print("Reading from dmaap: " + url) + resp=requests.get(url, headers=headers) + if (resp.status_code<199 & resp.status_code > 299): + print("Failed, response code: " + resp.status_code) + sleep(1) + else: + print("Recieved data from dmaap mr") + try: + data=resp.json() + print("Recieved data (raw): " + str(resp.text)) + if isinstance(data, list): + for item in data: + item=json.loads(item) + corrid=str(item["correlationId"]) + status=str(item["status"]) + msg=str(item["message"]) + item_str=msg+status[0:3] + with lock: + msg_responses[corrid]=item_str + cntr_msg_responses_submitted += 1 + else: + print("Data from dmaap is not json array: " + str(resp.text)) + sleep(1) + except Exception as e: + print("Corrupt data from dmaap mr - dropping " + str(data)) + print("CAUGHT_EXCEPTION" + str(e) + " "+traceback.format_exc()) + sleep(1) + except Exception as e: + sleep(1) + +# Function to upload generic messages to dmaap +def dmaap_generic_uploader(): + global msg_requests + global cntr_msg_requests_fetched + + print("Starting generic uploader") + + headers_json = {'Content-type': 'application/json', 'Accept': '*/*'} + headers_text = {'Content-type': 'text/plain', 'Accept': '*/*'} + + while True: + if (len(generic_messages)): + keys_copy = list(generic_messages.keys()) + for topicname in keys_copy: #topicname contains the path of the topics, eg. "/event/" + topic_queue=generic_messages[topicname] + if (len(topic_queue)>0): + if (topicname.endswith(".text")): + msg=topic_queue[0] + headers=headers_text + else: + msg=topic_queue[0] + msg=json.dumps(msg) + headers=headers_json + url=generic_topics_upload_baseurl+topicname + print("Sending to dmaap : "+ url) + print("Sending to dmaap : "+ msg) + print("Sending to dmaap : "+ str(headers)) + try: + resp=requests.post(url, data=msg, headers=headers, timeout=10) + if (resp.status_code<199 & resp.status_code > 299): + print("Failed, response code: " + str(resp.status_code)) + sleep(1) + else: + print("Dmaap response code: " + str(resp.status_code)) + print("Dmaap response text: " + str(resp.text)) + with lock: + topic_queue.pop(0) + cntr_msg_requests_fetched += 1 + except Exception as e: + print("Failed, exception: "+ str(e)) + sleep(1) + sleep(0.01) + #I'm alive function @app.route('/', methods=['GET']) @@ -59,7 +202,7 @@ def index(): return 'OK', 200 -# Helper function to create a Dmaap request message +# Helper function to create a Dmaap A1PMS request message # args : # response: json formatted string of a complete Dmaap message def create_message(operation, correlation_id, payload, url): @@ -73,8 +216,8 @@ def create_message(operation, correlation_id, payload, url): ### MR-stub interface, for MR control -# Send a message to MR -# URI and parameters (GET): /send-request?operation=&url= +# Send a A1PMS message to MR +# URI and parameters (PUT or POST): /send-request?operation=&url= # response: (http 200) o4 400 for parameter error or 500 for other errors @app.route(APP_WRITE_URL, methods=['PUT','POST']) @@ -101,7 +244,7 @@ def sendrequest(): print(APP_WRITE_URL+" operation="+oper+" url="+url) correlation_id=str(time.time_ns()) payload=None - if (oper == "PUT") and (request.json is not None): + if (oper == "PUT") and len(request.data) > 0: payload=json.dumps(request.json) msg=create_message(oper, correlation_id, payload, url) @@ -114,7 +257,7 @@ def sendrequest(): print(APP_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc()) return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT) -# Receive a message response for MR for the included correlation id +# Receive a A1PMS message response for MR for the included correlation id # URI and parameter, (GET): /receive-response?correlationid= # response: 200 or empty 204 or other errors 500 @app.route(APP_READ_URL, @@ -125,19 +268,19 @@ def receiveresponse(): with lock: print("APP_READ_URL lock") try: - id=request.args.get('correlationid') - if (id is None): + cid=request.args.get('correlationid') + if (cid is None): print(APP_READ_URL+" parameter 'correclationid' missing") return Response('Parameter correlationid missing in json', status=500, mimetype=MIME_TEXT) - if (id in msg_responses): - answer=msg_responses[id] - del msg_responses[id] - print(APP_READ_URL+" response (correlationid="+id+"): " + answer) + if (cid in msg_responses): + answer=msg_responses[cid] + del msg_responses[cid] + print(APP_READ_URL+" response (correlationid="+cid+"): " + answer) cntr_msg_responses_fetched += 1 return Response(answer, status=200, mimetype=MIME_JSON) - print(APP_READ_URL+" - no messages (correlationid="+id+"): ") + print(APP_READ_URL+" - no messages (correlationid="+cid+"): ") return Response('', status=204, mimetype=MIME_JSON) except Exception as e: print(APP_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc()) @@ -145,15 +288,18 @@ def receiveresponse(): ### Dmaap interface ### -# Read messages stream. URI according to agent configuration. +# Read A1PMS messages stream. URI according to agent configuration. # URI, (GET): /events/A1-POLICY-AGENT-READ/users/policy-agent # response: 200 , or 500 for other errors -@app.route(AGENT_READ_URL, +@app.route(A1PMS_READ_URL, methods=['GET']) def events_read(): global msg_requests global cntr_msg_requests_fetched + if topic_write or topic_read: + return Response('Url not available when running as mrstub frontend', status=404, mimetype=MIME_TEXT) + limit=request.args.get('limit') if (limit is None): limit=4096 @@ -171,10 +317,10 @@ def events_read(): else: timeout=min(int(timeout),60000) - startTime=int(round(time.time() * 1000)) - currentTime=int(round(time.time() * 1000)) + start_time=int(round(time.time() * 1000)) + current_time=int(round(time.time() * 1000)) - while(currentTime0): try: @@ -187,30 +333,34 @@ def events_read(): cntr_msg_requests_fetched += 1 cntr=cntr+1 msgs='['+msgs+']' - print(AGENT_READ_URL+" MSGs: "+json.dumps(json.loads(msgs), indent=2)) + print(A1PMS_READ_URL+" MSGs: "+json.dumps(json.loads(msgs), indent=2)) return Response(msgs, status=200, mimetype=MIME_JSON) except Exception as e: - print(AGENT_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc()) + print(A1PMS_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc()) return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT) sleep(0.025) # sleep 25 milliseconds - currentTime=int(round(time.time() * 1000)) + current_time=int(round(time.time() * 1000)) - print("timeout: "+str(timeout)+", startTime: "+str(startTime)+", currentTime: "+str(currentTime)) + print("timeout: "+str(timeout)+", start_time: "+str(start_time)+", current_time: "+str(current_time)) return Response("[]", status=200, mimetype=MIME_JSON) -# Write messages stream. URI according to agent configuration. +# Write A1PMS messages stream. URI according to agent configuration. # URI and payload, (PUT or POST): /events/A1-POLICY-AGENT-WRITE # response: OK 200 or 400 for missing json parameters, 500 for other errors -@app.route(AGENT_WRITE_URL, +@app.route(A1PMS_WRITE_URL, methods=['PUT','POST']) def events_write(): global msg_responses global cntr_msg_responses_submitted + + if topic_write or topic_read: + return Response('Url not available when running as mrstub frontend', status=404, mimetype=MIME_TEXT) + with lock: - print("AGENT_WRITE_URL lock") + print("A1PMS_WRITE_URL lock") try: answer=request.json - print(AGENT_WRITE_URL+ " json=" + json.dumps(answer, indent=2)) + print(A1PMS_WRITE_URL+ " json=" + json.dumps(answer, indent=2)) if isinstance(answer, dict): #Create a an array if the answer is a dict (single message) answer_list=[] @@ -218,31 +368,147 @@ def events_write(): answer=answer_list for item in answer: - id=item['correlationId'] - if (id is None): - print(AGENT_WRITE_URL+" parameter 'correlatonid' missing") + cid=item['correlationId'] + if (cid is None): + print(A1PMS_WRITE_URL+" parameter 'correlatonid' missing") return Response('Parameter missing in json', status=400, mimetype=MIME_TEXT) msg=item['message'] if (msg is None): - print(AGENT_WRITE_URL+" parameter 'msgs' missing") + print(A1PMS_WRITE_URL+" parameter 'msgs' missing") return Response('Parameter >message> missing in json', status=400, mimetype=MIME_TEXT) status=item['status'] if (status is None): - print(AGENT_WRITE_URL+" parameter 'status' missing") + print(A1PMS_WRITE_URL+" parameter 'status' missing") return Response('Parameter missing in json', status=400, mimetype=MIME_TEXT) if isinstance(msg, list) or isinstance(msg, dict): msg_str=json.dumps(msg)+status[0:3] else: msg_str=msg+status[0:3] - msg_responses[id]=msg_str + msg_responses[cid]=msg_str cntr_msg_responses_submitted += 1 - print(AGENT_WRITE_URL+ " msg+status (correlationid="+id+") :" + str(msg_str)) + print(A1PMS_WRITE_URL+ " msg+status (correlationid="+cid+") :" + str(msg_str)) + except Exception as e: + print(A1PMS_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc()) + return Response('{"message": "' + SERVER_ERROR + ' ' + str(e) + '","status":"500"}', status=200, mimetype=MIME_JSON) + + return Response('{}', status=200, mimetype=MIME_JSON) + +@app.route(ORU_WRITE_URL, + methods=['PUT','POST']) +def oru_write(): + global msg_requests + msg=json.dumps(request.json) + msg_requests.append(msg) + return Response('{}', status=200, mimetype=MIME_JSON) + +@app.route(ORU_READ_URL, + methods=['GET']) +def oru_read(): + global msg_requests + if(len(msg_requests)>0): + rsp=msg_requests.pop(0) + res=[] + res.append(rsp) + return Response(json.dumps(res), status=200, mimetype=MIME_JSON) + return Response("[]", status=200, mimetype=MIME_JSON) + +# Generic POST catching all urls starting with /events/. +# Writes the message in a que for that topic +@app.route("/events/", + methods=['POST']) +def generic_write(path): + global generic_messages + global cntr_msg_responses_submitted + urlkey="/events/"+str(path) + write_method=str(request.method) + with lock: + try: + if (urlkey.endswith(".text")): + payload=str(request.data.decode('UTF-8')) + print(write_method+" on "+urlkey+" text=" + payload) + else: + payload=request.json + print(write_method+" on "+urlkey+" json=" + json.dumps(payload)) + topicmsgs=[] + if (urlkey in generic_messages.keys()): + topicmsgs=generic_messages[urlkey] + else: + generic_messages[urlkey]=topicmsgs + + if isinstance(payload, list): + for listitem in payload: + topicmsgs.append(listitem) + else: + topicmsgs.append(payload) + + cntr_msg_responses_submitted += 1 except Exception as e: - print(AGENT_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc()) + print(write_method + "on "+urlkey+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc()) return Response('{"message": "' + SERVER_ERROR + ' ' + str(e) + '","status":"500"}', status=200, mimetype=MIME_JSON) return Response('{}', status=200, mimetype=MIME_JSON) +# Generic GET catching all urls starting with /events/. Returns max 4096 json msgs in an array. +# Returns only the messages previously written to the same urls +@app.route("/events/", + methods=['GET']) +def generic_read(path): + global generic_messages + global cntr_msg_requests_fetched + + if generic_topics_upload_baseurl: + return Response('Url not available when running as mrstub frontend', status=404, mimetype=MIME_TEXT) + + urlpath="/events/"+str(path) + urlkey="/events/"+str(path).split("/")[0] #Extract topic + print("GET on topic"+urlkey) + limit=request.args.get('limit') + if (limit is None): + limit=4096 + else: + limit=int(limit) + if (limit<0): + limit=0 + if (limit>4096): + limit=4096 + print("Limting number of returned messages to: "+str(limit)) + + timeout=request.args.get('timeout') + if (timeout is None): + timeout=10000 + else: + timeout=min(int(timeout),60000) + + start_time=int(round(time.time() * 1000)) + current_time=int(round(time.time() * 1000)) + topicmsgs=[] + if (urlkey in generic_messages.keys()): + topicmsgs=generic_messages[urlkey] + + while(current_time0): + try: + msgs='' + cntr=0 + while(cntr0): + if (len(msgs)>1): + msgs=msgs+',' + msgs=msgs+json.dumps(json.dumps(topicmsgs.pop(0))) + cntr_msg_requests_fetched += 1 + cntr=cntr+1 + msgs='['+msgs+']' + print("GET on "+urlpath+" MSGs: "+msgs) + return Response(msgs, status=200, mimetype=MIME_JSON) + except Exception as e: + print("GET on "+urlpath+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc()) + return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT) + sleep(0.025) # sleep 25 milliseconds + current_time=int(round(time.time() * 1000)) + + print("timeout: "+str(timeout)+", start_time: "+str(start_time)+", current_time: "+str(current_time)) + return Response("[]", status=200, mimetype=MIME_JSON) + ### Functions for metrics read out ### @@ -297,5 +563,34 @@ def reset(): msg_responses={} return Response('OK', status=200, mimetype=MIME_TEXT) +# Get env vars, if present +if os.getenv("TOPIC_READ") is not None: + + print("Env variables:") + print("TOPIC_READ:"+os.environ['TOPIC_READ']) + print("TOPIC_WRITE:"+os.environ['TOPIC_WRITE']) + + topic_read=os.environ['TOPIC_READ'] + topic_write=os.environ['TOPIC_WRITE'] + + + if topic_read and downloader_thread is None: + downloader_thread=Thread(target=dmaap_downloader) + downloader_thread.start() + + if topic_write and uploader_thread is None: + uploader_thread=Thread(target=dmaap_uploader) + uploader_thread.start() + +if 'GENERIC_TOPICS_UPLOAD_BASEURL' in os.environ: + print("GENERIC_TOPICS_UPLOAD_BASEURL:"+os.environ['GENERIC_TOPICS_UPLOAD_BASEURL']) + generic_topics_upload_baseurl=os.environ['GENERIC_TOPICS_UPLOAD_BASEURL'] + if generic_topics_upload_baseurl and generic_uploader_thread is None: + generic_uploader_thread=Thread(target=dmaap_generic_uploader) + generic_uploader_thread.start() + +if os.getenv("TOPIC_READ") is None or os.environ['GENERIC_TOPICS_UPLOAD_BASEURL'] is None: + print("No env variables - OK") + if __name__ == "__main__": - app.run(port=HOST_PORT, host=HOST_IP) \ No newline at end of file + app.run(port=HOST_PORT, host=HOST_IP)