X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=test%2Fmrstub%2Fapp%2Fmain.py;h=09954286bb74dbed4bf697fc24af00a540c3c30c;hb=f4a7e420842aa8cde747aa71180f7b47bb1010dc;hp=4b1913f720e2c170eda162e38d6a7a636ca3d9a4;hpb=afe34e22e0be919b8e054826c23c551b4e493f79;p=nonrtric.git diff --git a/test/mrstub/app/main.py b/test/mrstub/app/main.py index 4b1913f7..09954286 100644 --- a/test/mrstub/app/main.py +++ b/test/mrstub/app/main.py @@ -58,8 +58,8 @@ cntr_msg_responses_fetched=0 # Request and response constants ORU_WRITE_URL="/events/unauthenticated.SEC_FAULT_OUTPUT" ORU_READ_URL="/events/unauthenticated.SEC_FAULT_OUTPUT/users/test/" -AGENT_WRITE_URL="/events/A1-POLICY-AGENT-WRITE" -AGENT_READ_URL="/events/A1-POLICY-AGENT-READ/users/policy-agent" +A1PMS_WRITE_URL="/events/A1-POLICY-AGENT-WRITE" +A1PMS_READ_URL="/events/A1-POLICY-AGENT-READ/users/policy-agent" APP_WRITE_URL="/send-request" APP_READ_URL="/receive-response" MIME_TEXT="text/plain" @@ -75,7 +75,7 @@ uploader_thread=None downloader_thread=None generic_uploader_thread=None -# Function to upload PMS messages to dmaap +# Function to upload A1PMS messages to dmaap def dmaap_uploader(): global msg_requests global cntr_msg_requests_fetched @@ -109,7 +109,7 @@ def dmaap_uploader(): sleep(0.01) -# Function to download PMS messages from dmaap +# Function to download A1PMS messages from dmaap def dmaap_downloader(): global msg_responses global cntr_msg_responses_submitted @@ -164,7 +164,8 @@ def dmaap_generic_uploader(): while True: if (len(generic_messages)): - for topicname in generic_messages.keys(): #topicname contains the path of the topics, eg. "/event/" + keys_copy = list(generic_messages.keys()) + for topicname in keys_copy: #topicname contains the path of the topics, eg. "/event/" topic_queue=generic_messages[topicname] if (len(topic_queue)>0): if (topicname.endswith(".text")): @@ -201,7 +202,7 @@ def index(): return 'OK', 200 -# Helper function to create a Dmaap PMS request message +# Helper function to create a Dmaap A1PMS request message # args : # response: json formatted string of a complete Dmaap message def create_message(operation, correlation_id, payload, url): @@ -215,7 +216,7 @@ def create_message(operation, correlation_id, payload, url): ### MR-stub interface, for MR control -# Send a PMS message to MR +# Send a A1PMS message to MR # URI and parameters (PUT or POST): /send-request?operation=&url= # response: (http 200) o4 400 for parameter error or 500 for other errors @app.route(APP_WRITE_URL, @@ -243,7 +244,7 @@ def sendrequest(): print(APP_WRITE_URL+" operation="+oper+" url="+url) correlation_id=str(time.time_ns()) payload=None - if (oper == "PUT") and (request.json is not None): + if (oper == "PUT") and len(request.data) > 0: payload=json.dumps(request.json) msg=create_message(oper, correlation_id, payload, url) @@ -256,7 +257,7 @@ def sendrequest(): print(APP_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc()) return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT) -# Receive a PMS message response for MR for the included correlation id +# Receive a A1PMS message response for MR for the included correlation id # URI and parameter, (GET): /receive-response?correlationid= # response: 200 or empty 204 or other errors 500 @app.route(APP_READ_URL, @@ -287,10 +288,10 @@ def receiveresponse(): ### Dmaap interface ### -# Read PMS messages stream. URI according to agent configuration. +# Read A1PMS messages stream. URI according to agent configuration. # URI, (GET): /events/A1-POLICY-AGENT-READ/users/policy-agent # response: 200 , or 500 for other errors -@app.route(AGENT_READ_URL, +@app.route(A1PMS_READ_URL, methods=['GET']) def events_read(): global msg_requests @@ -332,10 +333,10 @@ def events_read(): cntr_msg_requests_fetched += 1 cntr=cntr+1 msgs='['+msgs+']' - print(AGENT_READ_URL+" MSGs: "+json.dumps(json.loads(msgs), indent=2)) + print(A1PMS_READ_URL+" MSGs: "+json.dumps(json.loads(msgs), indent=2)) return Response(msgs, status=200, mimetype=MIME_JSON) except Exception as e: - print(AGENT_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc()) + print(A1PMS_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc()) return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT) sleep(0.025) # sleep 25 milliseconds current_time=int(round(time.time() * 1000)) @@ -343,10 +344,10 @@ def events_read(): print("timeout: "+str(timeout)+", start_time: "+str(start_time)+", current_time: "+str(current_time)) return Response("[]", status=200, mimetype=MIME_JSON) -# Write PMS messages stream. URI according to agent configuration. +# Write A1PMS messages stream. URI according to agent configuration. # URI and payload, (PUT or POST): /events/A1-POLICY-AGENT-WRITE # response: OK 200 or 400 for missing json parameters, 500 for other errors -@app.route(AGENT_WRITE_URL, +@app.route(A1PMS_WRITE_URL, methods=['PUT','POST']) def events_write(): global msg_responses @@ -356,10 +357,10 @@ def events_write(): return Response('Url not available when running as mrstub frontend', status=404, mimetype=MIME_TEXT) with lock: - print("AGENT_WRITE_URL lock") + print("A1PMS_WRITE_URL lock") try: answer=request.json - print(AGENT_WRITE_URL+ " json=" + json.dumps(answer, indent=2)) + print(A1PMS_WRITE_URL+ " json=" + json.dumps(answer, indent=2)) if isinstance(answer, dict): #Create a an array if the answer is a dict (single message) answer_list=[] @@ -369,15 +370,15 @@ def events_write(): for item in answer: cid=item['correlationId'] if (cid is None): - print(AGENT_WRITE_URL+" parameter 'correlatonid' missing") + print(A1PMS_WRITE_URL+" parameter 'correlatonid' missing") return Response('Parameter missing in json', status=400, mimetype=MIME_TEXT) msg=item['message'] if (msg is None): - print(AGENT_WRITE_URL+" parameter 'msgs' missing") + print(A1PMS_WRITE_URL+" parameter 'msgs' missing") return Response('Parameter >message> missing in json', status=400, mimetype=MIME_TEXT) status=item['status'] if (status is None): - print(AGENT_WRITE_URL+" parameter 'status' missing") + print(A1PMS_WRITE_URL+" parameter 'status' missing") return Response('Parameter missing in json', status=400, mimetype=MIME_TEXT) if isinstance(msg, list) or isinstance(msg, dict): msg_str=json.dumps(msg)+status[0:3] @@ -385,9 +386,9 @@ def events_write(): msg_str=msg+status[0:3] msg_responses[cid]=msg_str cntr_msg_responses_submitted += 1 - print(AGENT_WRITE_URL+ " msg+status (correlationid="+cid+") :" + str(msg_str)) + print(A1PMS_WRITE_URL+ " msg+status (correlationid="+cid+") :" + str(msg_str)) except Exception as e: - print(AGENT_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc()) + print(A1PMS_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc()) return Response('{"message": "' + SERVER_ERROR + ' ' + str(e) + '","status":"500"}', status=200, mimetype=MIME_JSON) return Response('{}', status=200, mimetype=MIME_JSON) @@ -581,7 +582,7 @@ if os.getenv("TOPIC_READ") is not None: uploader_thread=Thread(target=dmaap_uploader) uploader_thread.start() -if os.environ['GENERIC_TOPICS_UPLOAD_BASEURL'] is not None: +if 'GENERIC_TOPICS_UPLOAD_BASEURL' in os.environ: print("GENERIC_TOPICS_UPLOAD_BASEURL:"+os.environ['GENERIC_TOPICS_UPLOAD_BASEURL']) generic_topics_upload_baseurl=os.environ['GENERIC_TOPICS_UPLOAD_BASEURL'] if generic_topics_upload_baseurl and generic_uploader_thread is None: