X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=test%2Fmrstub%2Fapp%2Fmain.py;h=75364354fb97201d8aafdc64c7d3d63b69c8adec;hb=c8f92e94bb2638aa66a1d7c2ba63375bbfcdc8c9;hp=fb6d67482be6f6aa0fa2b211352b28281ca4110e;hpb=968b89731a192c2ee3f3393d00519879ad89ce56;p=nonrtric.git diff --git a/test/mrstub/app/main.py b/test/mrstub/app/main.py index fb6d6748..75364354 100644 --- a/test/mrstub/app/main.py +++ b/test/mrstub/app/main.py @@ -69,11 +69,13 @@ SERVER_ERROR="Server error :" topic_write="" topic_read="" +generic_topics_upload_baseurl="" uploader_thread=None downloader_thread=None +generic_uploader_thread=None -# Function to download messages from dmaap +# Function to upload PMS messages to dmaap def dmaap_uploader(): global msg_requests global cntr_msg_requests_fetched @@ -107,7 +109,7 @@ def dmaap_uploader(): sleep(0.01) -# Function to upload messages to dmaap +# Function to download PMS messages from dmaap def dmaap_downloader(): global msg_responses global cntr_msg_responses_submitted @@ -150,6 +152,49 @@ def dmaap_downloader(): except Exception as e: sleep(1) +# Function to upload generic messages to dmaap +def dmaap_generic_uploader(): + global msg_requests + global cntr_msg_requests_fetched + + print("Starting generic uploader") + + headers_json = {'Content-type': 'application/json', 'Accept': '*/*'} + headers_text = {'Content-type': 'text/plain', 'Accept': '*/*'} + + while True: + if (len(generic_messages)): + keys_copy = list(generic_messages.keys()) + for topicname in keys_copy: #topicname contains the path of the topics, eg. "/event/" + topic_queue=generic_messages[topicname] + if (len(topic_queue)>0): + if (topicname.endswith(".text")): + msg=topic_queue[0] + headers=headers_text + else: + msg=topic_queue[0] + msg=json.dumps(msg) + headers=headers_json + url=generic_topics_upload_baseurl+topicname + print("Sending to dmaap : "+ url) + print("Sending to dmaap : "+ msg) + print("Sending to dmaap : "+ str(headers)) + try: + resp=requests.post(url, data=msg, headers=headers, timeout=10) + if (resp.status_code<199 & resp.status_code > 299): + print("Failed, response code: " + str(resp.status_code)) + sleep(1) + else: + print("Dmaap response code: " + str(resp.status_code)) + print("Dmaap response text: " + str(resp.text)) + with lock: + topic_queue.pop(0) + cntr_msg_requests_fetched += 1 + except Exception as e: + print("Failed, exception: "+ str(e)) + sleep(1) + sleep(0.01) + #I'm alive function @app.route('/', methods=['GET']) @@ -157,7 +202,7 @@ def index(): return 'OK', 200 -# Helper function to create a Dmaap request message +# Helper function to create a Dmaap PMS request message # args : # response: json formatted string of a complete Dmaap message def create_message(operation, correlation_id, payload, url): @@ -171,7 +216,7 @@ def create_message(operation, correlation_id, payload, url): ### MR-stub interface, for MR control -# Send a message to MR +# Send a PMS message to MR # URI and parameters (PUT or POST): /send-request?operation=&url= # response: (http 200) o4 400 for parameter error or 500 for other errors @app.route(APP_WRITE_URL, @@ -212,7 +257,7 @@ def sendrequest(): print(APP_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc()) return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT) -# Receive a message response for MR for the included correlation id +# Receive a PMS message response for MR for the included correlation id # URI and parameter, (GET): /receive-response?correlationid= # response: 200 or empty 204 or other errors 500 @app.route(APP_READ_URL, @@ -243,7 +288,7 @@ def receiveresponse(): ### Dmaap interface ### -# Read messages stream. URI according to agent configuration. +# Read PMS messages stream. URI according to agent configuration. # URI, (GET): /events/A1-POLICY-AGENT-READ/users/policy-agent # response: 200 , or 500 for other errors @app.route(AGENT_READ_URL, @@ -299,7 +344,7 @@ def events_read(): print("timeout: "+str(timeout)+", start_time: "+str(start_time)+", current_time: "+str(current_time)) return Response("[]", status=200, mimetype=MIME_JSON) -# Write messages stream. URI according to agent configuration. +# Write PMS messages stream. URI according to agent configuration. # URI and payload, (PUT or POST): /events/A1-POLICY-AGENT-WRITE # response: OK 200 or 400 for missing json parameters, 500 for other errors @app.route(AGENT_WRITE_URL, @@ -367,10 +412,10 @@ def oru_read(): return Response(json.dumps(res), status=200, mimetype=MIME_JSON) return Response("[]", status=200, mimetype=MIME_JSON) -# Generic POST/PUT catching all urls starting with /events/. +# Generic POST catching all urls starting with /events/. # Writes the message in a que for that topic @app.route("/events/", - methods=['PUT','POST']) + methods=['POST']) def generic_write(path): global generic_messages global cntr_msg_responses_submitted @@ -378,8 +423,12 @@ def generic_write(path): write_method=str(request.method) with lock: try: - payload=request.json - print(write_method+" on "+urlkey+" json=" + json.dumps(payload)) + if (urlkey.endswith(".text")): + payload=str(request.data.decode('UTF-8')) + print(write_method+" on "+urlkey+" text=" + payload) + else: + payload=request.json + print(write_method+" on "+urlkey+" json=" + json.dumps(payload)) topicmsgs=[] if (urlkey in generic_messages.keys()): topicmsgs=generic_messages[urlkey] @@ -407,6 +456,9 @@ def generic_read(path): global generic_messages global cntr_msg_requests_fetched + if generic_topics_upload_baseurl: + return Response('Url not available when running as mrstub frontend', status=404, mimetype=MIME_TEXT) + urlpath="/events/"+str(path) urlkey="/events/"+str(path).split("/")[0] #Extract topic print("GET on topic"+urlkey) @@ -530,7 +582,14 @@ if os.getenv("TOPIC_READ") is not None: uploader_thread=Thread(target=dmaap_uploader) uploader_thread.start() -else: +if 'GENERIC_TOPICS_UPLOAD_BASEURL' in os.environ: + print("GENERIC_TOPICS_UPLOAD_BASEURL:"+os.environ['GENERIC_TOPICS_UPLOAD_BASEURL']) + generic_topics_upload_baseurl=os.environ['GENERIC_TOPICS_UPLOAD_BASEURL'] + if generic_topics_upload_baseurl and generic_uploader_thread is None: + generic_uploader_thread=Thread(target=dmaap_generic_uploader) + generic_uploader_thread.start() + +if os.getenv("TOPIC_READ") is None or os.environ['GENERIC_TOPICS_UPLOAD_BASEURL'] is None: print("No env variables - OK") if __name__ == "__main__":