X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=near-rt-ric-simulator%2Fmrstub%2Fmr.py;fp=near-rt-ric-simulator%2Fmrstub%2Fmr.py;h=45f8cfe58cef5544efa2d7bc059dd6ad75208f8f;hb=80a9200dc1dfc342d5debfaa5df3d28c14bc5a68;hp=0000000000000000000000000000000000000000;hpb=f217fffad3db72759708ea68892357b46b8ed4a0;p=nonrtric.git diff --git a/near-rt-ric-simulator/mrstub/mr.py b/near-rt-ric-simulator/mrstub/mr.py new file mode 100644 index 00000000..45f8cfe5 --- /dev/null +++ b/near-rt-ric-simulator/mrstub/mr.py @@ -0,0 +1,265 @@ + +# ============LICENSE_START=============================================== +# Copyright (C) 2020 Nordix Foundation. All rights reserved. +# ======================================================================== +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END================================================= +# + +from flask import Flask, request +from time import sleep +import time +import datetime +import json +from flask import Flask +from flask import Response +import traceback + +app = Flask(__name__) + +# list of messages to/from Dmaap +msg_requests={} +msg_responses={} + +# Server info +HOST_IP = "0.0.0.0" +HOST_PORT = 3905 + +# Metrics vars +cntr_msg_requests_submitted=0 +cntr_msg_requests_fetched=0 +cntr_msg_responses_submitted=0 +cntr_msg_responses_fetched=0 + +# Request and response constants +AGENT_WRITE_URL="/events/A1-POLICY-AGENT-WRITE" +AGENT_READ_URL="/events/A1-POLICY-AGENT-READ/users/policy-agent" +APP_WRITE_URL="/send-request" +APP_READ_URL="/receive-response" +MIME_TEXT="text/plain" +MIME_JSON="application/json" +CAUGHT_EXCEPTION="Caught exception: " +SERVER_ERROR="Server error :" + +#I'm alive function +@app.route('/', + methods=['GET']) +def index(): + return 'OK', 200 + + +# Helper function to create a Dmaap request message +# args : +# response: json formatted string of a complete Dmaap message +def create_message(operation, correlation_id, payload, url): + if (payload is None): + payload="{}" + time_stamp=datetime.datetime.utcnow() + msg = '{\"apiVersion\":\"1.0\",\"operation\":\"'+operation+'\",\"correlationId\":\"'+correlation_id+'\",\"originatorId\": \"849e6c6b420\",' + msg = msg + '\"payload\":'+payload+',\"requestId\":\"23343221\", \"target\":\"policy-agent\", \"timestamp\":\"'+str(time_stamp)+'\", \"type\":\"request\",\"url\":\"'+url+'\"}' + return msg + + +### MR-stub interface, for MR control + +# Send a message to MR +# URI and parameters (GET): /send-request?operation=&url= +# response: (http 200) o4 400 for parameter error or 500 for other errors +@app.route(APP_WRITE_URL, + methods=['PUT','POST']) +def sendrequest(): + global msg_requests + global cntr_msg_requests_submitted + + try: + + oper=request.args.get('operation') + if (oper is None): + print(APP_WRITE_URL+" parameter 'operation' missing") + return Response('Parameter operation missing in request', status=400, mimetype=MIME_TEXT) + + url=request.args.get('url') + if (url is None): + print(APP_WRITE_URL+" parameter 'url' missing") + return Response('Parameter url missing in request', status=400, mimetype=MIME_TEXT) + + if (oper != "GET" and oper != "PUT" and oper != "POST" and oper != "DELETE"): + print(APP_WRITE_URL+" parameter 'operation' need to be: DEL|PUT|POST|DELETE") + return Response('Parameter operation does not contain DEL|PUT|POST|DELETE in request', status=400, mimetype=MIME_TEXT) + + print(APP_WRITE_URL+" operation="+oper+" url="+url) + correlation_id=str(time.time_ns()) + payload=None + if (oper == "PUT") and (request.json is not None): + payload=json.dumps(request.json) + + msg=create_message(oper, correlation_id, payload, url) + print(msg) + print(APP_WRITE_URL+" MSG(correlationid = "+correlation_id+"): " + json.dumps(json.loads(msg), indent=2)) + msg_requests[correlation_id]=msg + cntr_msg_requests_submitted += 1 + return Response(correlation_id, status=200, mimetype=MIME_TEXT) + except Exception as e: + print(APP_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc()) + return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT) + +# Receive a message response for MR for the included correlation id +# URI and parameter, (GET): /receive-response?correlationid= +# response: 200 or empty 204 or other errors 500 +@app.route(APP_READ_URL, + methods=['GET']) +def receiveresponse(): + global msg_responses + global cntr_msg_responses_fetched + + try: + id=request.args.get('correlationid') + if (id is None): + print(APP_READ_URL+" parameter 'correclationid' missing") + return Response('Parameter correlationid missing in json', status=500, mimetype=MIME_TEXT) + + if (id in msg_responses): + answer=msg_responses[id] + del msg_responses[id] + print(APP_READ_URL+" response (correlationid="+id+"): " + answer) + cntr_msg_responses_fetched += 1 + return Response(answer, status=200, mimetype=MIME_JSON) + + print(APP_READ_URL+" - no messages (correlationid="+id+"): ") + return Response('', status=204, mimetype=MIME_JSON) + except Exception as e: + print(APP_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc()) + return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT) + +### Dmaap interface ### + +# Read messages stream. URI according to agent configuration. +# URI, (GET): /events/A1-POLICY-AGENT-READ/users/policy-agent +# response: 200 , or 500 for other errors +@app.route(AGENT_READ_URL, + methods=['GET']) +def events_read(): + global msg_requests + global cntr_msg_requests_fetched + try: + msgs='' + for item in msg_requests: + if (len(msgs)>1): + msgs=msgs+',' + msgs=msgs+msg_requests[item] + cntr_msg_requests_fetched += 1 + + msg_requests={} + msgs='['+msgs+']' + print(AGENT_READ_URL+" MSGs: "+json.dumps(json.loads(msgs), indent=2)) + return Response(msgs, status=200, mimetype=MIME_JSON) + except Exception as e: + print(AGENT_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc()) + return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT) + +# Write messages stream. URI according to agent configuration. +# URI and payload, (PUT or POST): /events/A1-POLICY-AGENT-WRITE +# response: OK 200 or 400 for missing json parameters, 500 for other errors +@app.route(AGENT_WRITE_URL, + methods=['PUT','POST']) +def events_write(): + global msg_responses + global cntr_msg_responses_submitted + + try: + answer=request.json + print(AGENT_WRITE_URL+ " json=" + json.dumps(answer, indent=2)) + for item in answer: + id=item['correlationId'] + if (id is None): + print(AGENT_WRITE_URL+" parameter 'correlatonid' missing") + return Response('Parameter missing in json', status=400, mimetype=MIME_TEXT) + msg=item['message'] + if (msg is None): + print(AGENT_WRITE_URL+" parameter 'msgs' missing") + return Response('Parameter >message> missing in json', status=400, mimetype=MIME_TEXT) + status=item['status'] + if (status is None): + print(AGENT_WRITE_URL+" parameter 'status' missing") + return Response('Parameter missing in json', status=400, mimetype=MIME_TEXT) + if isinstance(msg, list) or isinstance(msg, dict): + msg_str=json.dumps(msg)+status[0:3] + else: + msg_str=msg+status[0:3] + msg_responses[id]=msg_str + cntr_msg_responses_submitted += 1 + print(AGENT_WRITE_URL+ " msg+status (correlationid="+id+") :" + str(msg_str)) + except Exception as e: + print(AGENT_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc()) + return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT) + + return Response('OK', status=200, mimetype=MIME_TEXT) + + +### Functions for metrics read out ### + +@app.route('/counter/requests_submitted', + methods=['GET']) +def requests_submitted(): + return Response(str(cntr_msg_requests_submitted), status=200, mimetype=MIME_TEXT) + +@app.route('/counter/requests_fetched', + methods=['GET']) +def requests_fetched(): + return Response(str(cntr_msg_requests_fetched), status=200, mimetype=MIME_TEXT) + +@app.route('/counter/responses_submitted', + methods=['GET']) +def responses_submitted(): + return Response(str(cntr_msg_responses_submitted), status=200, mimetype=MIME_TEXT) + +@app.route('/counter/responses_fetched', + methods=['GET']) +def responses_fetched(): + return Response(str(cntr_msg_responses_fetched), status=200, mimetype=MIME_TEXT) + +@app.route('/counter/current_requests', + methods=['GET']) +def current_requests(): + return Response(str(len(msg_requests)), status=200, mimetype=MIME_TEXT) + +@app.route('/counter/current_responses', + methods=['GET']) +def current_responses(): + return Response(str(len(msg_responses)), status=200, mimetype=MIME_TEXT) + +### Admin ### + +# Reset all messsages and counters +@app.route('/reset', + methods=['GET', 'POST', 'PUT']) +def reset(): + global cntr_msg_requests_submitted + global cntr_msg_requests_fetched + global cntr_msg_responses_submitted + global cntr_msg_responses_fetched + global msg_requests + global msg_responses + + cntr_msg_requests_submitted=0 + cntr_msg_requests_fetched=0 + cntr_msg_responses_submitted=0 + cntr_msg_responses_fetched=0 + msg_requests={} + msg_responses={} + return Response('OK', status=200, mimetype=MIME_TEXT) + +### Main function ### + +if __name__ == "__main__": + app.run(port=HOST_PORT, host=HOST_IP)