# ============LICENSE_START=============================================== # Copyright (C) 2020 Nordix Foundation. All rights reserved. # ======================================================================== # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============LICENSE_END================================================= # from flask import Flask, request from time import sleep import time import datetime import json from flask import Flask from flask import Response import traceback from threading import RLock app = Flask(__name__) lock = RLock() # list of messages to/from Dmaap msg_requests=[] msg_responses={} # Server info HOST_IP = "::" HOST_PORT = 2222 # Metrics vars cntr_msg_requests_submitted=0 cntr_msg_requests_fetched=0 cntr_msg_responses_submitted=0 cntr_msg_responses_fetched=0 # Request and response constants AGENT_WRITE_URL="/events/A1-POLICY-AGENT-WRITE" AGENT_READ_URL="/events/A1-POLICY-AGENT-READ/users/policy-agent" APP_WRITE_URL="/send-request" APP_READ_URL="/receive-response" MIME_TEXT="text/plain" MIME_JSON="application/json" CAUGHT_EXCEPTION="Caught exception: " SERVER_ERROR="Server error :" #I'm alive function @app.route('/', methods=['GET']) def index(): return 'OK', 200 # Helper function to create a Dmaap request message # args : # response: json formatted string of a complete Dmaap message def create_message(operation, correlation_id, payload, url): if (payload is None): payload="{}" time_stamp=datetime.datetime.utcnow() msg = '{\"apiVersion\":\"1.0\",\"operation\":\"'+operation+'\",\"correlationId\":\"'+correlation_id+'\",\"originatorId\": \"849e6c6b420\",' msg = msg + '\"payload\":'+payload+',\"requestId\":\"23343221\", \"target\":\"policy-agent\", \"timestamp\":\"'+str(time_stamp)+'\", \"type\":\"request\",\"url\":\"'+url+'\"}' return msg ### MR-stub interface, for MR control # Send a message to MR # URI and parameters (GET): /send-request?operation=&url= # response: (http 200) o4 400 for parameter error or 500 for other errors @app.route(APP_WRITE_URL, methods=['PUT','POST']) def sendrequest(): global msg_requests global cntr_msg_requests_submitted with lock: print("APP_WRITE_URL lock") try: oper=request.args.get('operation') if (oper is None): print(APP_WRITE_URL+" parameter 'operation' missing") return Response('Parameter operation missing in request', status=400, mimetype=MIME_TEXT) url=request.args.get('url') if (url is None): print(APP_WRITE_URL+" parameter 'url' missing") return Response('Parameter url missing in request', status=400, mimetype=MIME_TEXT) if (oper != "GET" and oper != "PUT" and oper != "POST" and oper != "DELETE"): print(APP_WRITE_URL+" parameter 'operation' need to be: DEL|PUT|POST|DELETE") return Response('Parameter operation does not contain DEL|PUT|POST|DELETE in request', status=400, mimetype=MIME_TEXT) print(APP_WRITE_URL+" operation="+oper+" url="+url) correlation_id=str(time.time_ns()) payload=None if (oper == "PUT") and (request.json is not None): payload=json.dumps(request.json) msg=create_message(oper, correlation_id, payload, url) print(msg) print(APP_WRITE_URL+" MSG(correlationid = "+correlation_id+"): " + json.dumps(json.loads(msg), indent=2)) msg_requests.append(msg) cntr_msg_requests_submitted += 1 return Response(correlation_id, status=200, mimetype=MIME_TEXT) except Exception as e: print(APP_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc()) return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT) # Receive a message response for MR for the included correlation id # URI and parameter, (GET): /receive-response?correlationid= # response: 200 or empty 204 or other errors 500 @app.route(APP_READ_URL, methods=['GET']) def receiveresponse(): global msg_responses global cntr_msg_responses_fetched with lock: print("APP_READ_URL lock") try: id=request.args.get('correlationid') if (id is None): print(APP_READ_URL+" parameter 'correclationid' missing") return Response('Parameter correlationid missing in json', status=500, mimetype=MIME_TEXT) if (id in msg_responses): answer=msg_responses[id] del msg_responses[id] print(APP_READ_URL+" response (correlationid="+id+"): " + answer) cntr_msg_responses_fetched += 1 return Response(answer, status=200, mimetype=MIME_JSON) print(APP_READ_URL+" - no messages (correlationid="+id+"): ") return Response('', status=204, mimetype=MIME_JSON) except Exception as e: print(APP_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc()) return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT) ### Dmaap interface ### # Read messages stream. URI according to agent configuration. # URI, (GET): /events/A1-POLICY-AGENT-READ/users/policy-agent # response: 200 , or 500 for other errors @app.route(AGENT_READ_URL, methods=['GET']) def events_read(): global msg_requests global cntr_msg_requests_fetched limit=request.args.get('limit') if (limit is None): limit=4096 else: limit=int(limit) if (limit<0): limit=0 if (limit>4096): limit=4096 print("Limting number of returned messages to: "+str(limit)) timeout=request.args.get('timeout') if (timeout is None): timeout=10000 else: timeout=min(int(timeout),60000) startTime=int(round(time.time() * 1000)) currentTime=int(round(time.time() * 1000)) while(currentTime0): try: msgs='' cntr=0 while(cntr0): if (len(msgs)>1): msgs=msgs+',' msgs=msgs+msg_requests.pop(0) cntr_msg_requests_fetched += 1 cntr=cntr+1 msgs='['+msgs+']' print(AGENT_READ_URL+" MSGs: "+json.dumps(json.loads(msgs), indent=2)) return Response(msgs, status=200, mimetype=MIME_JSON) except Exception as e: print(AGENT_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc()) return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT) sleep(0.025) # sleep 25 milliseconds currentTime=int(round(time.time() * 1000)) print("timeout: "+str(timeout)+", startTime: "+str(startTime)+", currentTime: "+str(currentTime)) return Response("[]", status=200, mimetype=MIME_JSON) # Write messages stream. URI according to agent configuration. # URI and payload, (PUT or POST): /events/A1-POLICY-AGENT-WRITE # response: OK 200 or 400 for missing json parameters, 500 for other errors @app.route(AGENT_WRITE_URL, methods=['PUT','POST']) def events_write(): global msg_responses global cntr_msg_responses_submitted with lock: print("AGENT_WRITE_URL lock") try: answer=request.json print(AGENT_WRITE_URL+ " json=" + json.dumps(answer, indent=2)) if isinstance(answer, dict): #Create a an array if the answer is a dict (single message) answer_list=[] answer_list.append(answer) answer=answer_list for item in answer: id=item['correlationId'] if (id is None): print(AGENT_WRITE_URL+" parameter 'correlatonid' missing") return Response('Parameter missing in json', status=400, mimetype=MIME_TEXT) msg=item['message'] if (msg is None): print(AGENT_WRITE_URL+" parameter 'msgs' missing") return Response('Parameter >message> missing in json', status=400, mimetype=MIME_TEXT) status=item['status'] if (status is None): print(AGENT_WRITE_URL+" parameter 'status' missing") return Response('Parameter missing in json', status=400, mimetype=MIME_TEXT) if isinstance(msg, list) or isinstance(msg, dict): msg_str=json.dumps(msg)+status[0:3] else: msg_str=msg+status[0:3] msg_responses[id]=msg_str cntr_msg_responses_submitted += 1 print(AGENT_WRITE_URL+ " msg+status (correlationid="+id+") :" + str(msg_str)) except Exception as e: print(AGENT_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc()) return Response('{"message": "' + SERVER_ERROR + ' ' + str(e) + '","status":"500"}', status=200, mimetype=MIME_JSON) return Response('{}', status=200, mimetype=MIME_JSON) ### Functions for metrics read out ### @app.route('/counter/requests_submitted', methods=['GET']) def requests_submitted(): return Response(str(cntr_msg_requests_submitted), status=200, mimetype=MIME_TEXT) @app.route('/counter/requests_fetched', methods=['GET']) def requests_fetched(): return Response(str(cntr_msg_requests_fetched), status=200, mimetype=MIME_TEXT) @app.route('/counter/responses_submitted', methods=['GET']) def responses_submitted(): return Response(str(cntr_msg_responses_submitted), status=200, mimetype=MIME_TEXT) @app.route('/counter/responses_fetched', methods=['GET']) def responses_fetched(): return Response(str(cntr_msg_responses_fetched), status=200, mimetype=MIME_TEXT) @app.route('/counter/current_requests', methods=['GET']) def current_requests(): return Response(str(len(msg_requests)), status=200, mimetype=MIME_TEXT) @app.route('/counter/current_responses', methods=['GET']) def current_responses(): return Response(str(len(msg_responses)), status=200, mimetype=MIME_TEXT) ### Admin ### # Reset all messsages and counters @app.route('/reset', methods=['GET', 'POST', 'PUT']) def reset(): global cntr_msg_requests_submitted global cntr_msg_requests_fetched global cntr_msg_responses_submitted global cntr_msg_responses_fetched global msg_requests global msg_responses cntr_msg_requests_submitted=0 cntr_msg_requests_fetched=0 cntr_msg_responses_submitted=0 cntr_msg_responses_fetched=0 msg_requests=[] msg_responses={} return Response('OK', status=200, mimetype=MIME_TEXT) if __name__ == "__main__": app.run(port=HOST_PORT, host=HOST_IP)