# ============LICENSE_START=============================================== # Copyright (C) 2020 Nordix Foundation. All rights reserved. # ======================================================================== # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============LICENSE_END================================================= # from flask import Flask, request from time import sleep import time import datetime import json from flask import Flask from flask import Response import traceback from threading import RLock import logging # Disable all logging of GET on reading counters class AjaxFilter(logging.Filter): def filter(self, record): return ("/counter/" not in record.getMessage()) log = logging.getLogger('werkzeug') log.addFilter(AjaxFilter()) app = Flask(__name__) lock = RLock() # list of messages to/from Dmaap msg_requests=[] msg_responses={} # Server info HOST_IP = "::" HOST_PORT = 2222 # Metrics vars cntr_msg_requests_submitted=0 cntr_msg_requests_fetched=0 cntr_msg_responses_submitted=0 cntr_msg_responses_fetched=0 # Request and response constants AGENT_WRITE_URL="/events/A1-POLICY-AGENT-WRITE" AGENT_READ_URL="/events/A1-POLICY-AGENT-READ/users/policy-agent" APP_WRITE_URL="/send-request" APP_READ_URL="/receive-response" MIME_TEXT="text/plain" MIME_JSON="application/json" CAUGHT_EXCEPTION="Caught exception: " SERVER_ERROR="Server error :" #I'm alive function @app.route('/', methods=['GET']) def index(): return 'OK', 200 # Helper function to create a Dmaap request message # args : # response: json formatted string of a complete Dmaap message def create_message(operation, correlation_id, payload, url): if (payload is None): payload="{}" time_stamp=datetime.datetime.utcnow() msg = '{\"apiVersion\":\"1.0\",\"operation\":\"'+operation+'\",\"correlationId\":\"'+correlation_id+'\",\"originatorId\": \"849e6c6b420\",' msg = msg + '\"payload\":'+payload+',\"requestId\":\"23343221\", \"target\":\"policy-agent\", \"timestamp\":\"'+str(time_stamp)+'\", \"type\":\"request\",\"url\":\"'+url+'\"}' return msg ### MR-stub interface, for MR control # Send a message to MR # URI and parameters (GET): /send-request?operation=&url= # response: (http 200) o4 400 for parameter error or 500 for other errors @app.route(APP_WRITE_URL, methods=['PUT','POST']) def sendrequest(): global msg_requests global cntr_msg_requests_submitted with lock: print("APP_WRITE_URL lock") try: oper=request.args.get('operation') if (oper is None): print(APP_WRITE_URL+" parameter 'operation' missing") return Response('Parameter operation missing in request', status=400, mimetype=MIME_TEXT) url=request.args.get('url') if (url is None): print(APP_WRITE_URL+" parameter 'url' missing") return Response('Parameter url missing in request', status=400, mimetype=MIME_TEXT) if (oper != "GET" and oper != "PUT" and oper != "POST" and oper != "DELETE"): print(APP_WRITE_URL+" parameter 'operation' need to be: DEL|PUT|POST|DELETE") return Response('Parameter operation does not contain DEL|PUT|POST|DELETE in request', status=400, mimetype=MIME_TEXT) print(APP_WRITE_URL+" operation="+oper+" url="+url) correlation_id=str(time.time_ns()) payload=None if (oper == "PUT") and (request.json is not None): payload=json.dumps(request.json) msg=create_message(oper, correlation_id, payload, url) print(msg) print(APP_WRITE_URL+" MSG(correlationid = "+correlation_id+"): " + json.dumps(json.loads(msg), indent=2)) msg_requests.append(msg) cntr_msg_requests_submitted += 1 return Response(correlation_id, status=200, mimetype=MIME_TEXT) except Exception as e: print(APP_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc()) return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT) # Receive a message response for MR for the included correlation id # URI and parameter, (GET): /receive-response?correlationid= # response: 200 or empty 204 or other errors 500 @app.route(APP_READ_URL, methods=['GET']) def receiveresponse(): global msg_responses global cntr_msg_responses_fetched with lock: print("APP_READ_URL lock") try: id=request.args.get('correlationid') if (id is None): print(APP_READ_URL+" parameter 'correclationid' missing") return Response('Parameter correlationid missing in json', status=500, mimetype=MIME_TEXT) if (id in msg_responses): answer=msg_responses[id] del msg_responses[id] print(APP_READ_URL+" response (correlationid="+id+"): " + answer) cntr_msg_responses_fetched += 1 return Response(answer, status=200, mimetype=MIME_JSON) print(APP_READ_URL+" - no messages (correlationid="+id+"): ") return Response('', status=204, mimetype=MIME_JSON) except Exception as e: print(APP_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc()) return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT) ### Dmaap interface ### # Read messages stream. URI according to agent configuration. # URI, (GET): /events/A1-POLICY-AGENT-READ/users/policy-agent # response: 200 , or 500 for other errors @app.route(AGENT_READ_URL, methods=['GET']) def events_read(): global msg_requests global cntr_msg_requests_fetched limit=request.args.get('limit') if (limit is None): limit=4096 else: limit=int(limit) if (limit<0): limit=0 if (limit>4096): limit=4096 print("Limting number of returned messages to: "+str(limit)) timeout=request.args.get('timeout') if (timeout is None): timeout=10000 else: timeout=min(int(timeout),60000) startTime=int(round(time.time() * 1000)) currentTime=int(round(time.time() * 1000)) while(currentTime0): try: msgs='' cntr=0 while(cntr0): if (len(msgs)>1): msgs=msgs+',' msgs=msgs+msg_requests.pop(0) cntr_msg_requests_fetched += 1 cntr=cntr+1 msgs='['+msgs+']' print(AGENT_READ_URL+" MSGs: "+json.dumps(json.loads(msgs), indent=2)) return Response(msgs, status=200, mimetype=MIME_JSON) except Exception as e: print(AGENT_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc()) return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT) sleep(0.025) # sleep 25 milliseconds currentTime=int(round(time.time() * 1000)) print("timeout: "+str(timeout)+", startTime: "+str(startTime)+", currentTime: "+str(currentTime)) return Response("[]", status=200, mimetype=MIME_JSON) # Write messages stream. URI according to agent configuration. # URI and payload, (PUT or POST): /events/A1-POLICY-AGENT-WRITE # response: OK 200 or 400 for missing json parameters, 500 for other errors @app.route(AGENT_WRITE_URL, methods=['PUT','POST']) def events_write(): global msg_responses global cntr_msg_responses_submitted with lock: print("AGENT_WRITE_URL lock") try: answer=request.json print(AGENT_WRITE_URL+ " json=" + json.dumps(answer, indent=2)) if isinstance(answer, dict): #Create a an array if the answer is a dict (single message) answer_list=[] answer_list.append(answer) answer=answer_list for item in answer: id=item['correlationId'] if (id is None): print(AGENT_WRITE_URL+" parameter 'correlatonid' missing") return Response('Parameter missing in json', status=400, mimetype=MIME_TEXT) msg=item['message'] if (msg is None): print(AGENT_WRITE_URL+" parameter 'msgs' missing") return Response('Parameter >message> missing in json', status=400, mimetype=MIME_TEXT) status=item['status'] if (status is None): print(AGENT_WRITE_URL+" parameter 'status' missing") return Response('Parameter missing in json', status=400, mimetype=MIME_TEXT) if isinstance(msg, list) or isinstance(msg, dict): msg_str=json.dumps(msg)+status[0:3] else: msg_str=msg+status[0:3] msg_responses[id]=msg_str cntr_msg_responses_submitted += 1 print(AGENT_WRITE_URL+ " msg+status (correlationid="+id+") :" + str(msg_str)) except Exception as e: print(AGENT_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc()) return Response('{"message": "' + SERVER_ERROR + ' ' + str(e) + '","status":"500"}', status=200, mimetype=MIME_JSON) return Response('{}', status=200, mimetype=MIME_JSON) ### Functions for metrics read out ### @app.route('/counter/requests_submitted', methods=['GET']) def requests_submitted(): return Response(str(cntr_msg_requests_submitted), status=200, mimetype=MIME_TEXT) @app.route('/counter/requests_fetched', methods=['GET']) def requests_fetched(): return Response(str(cntr_msg_requests_fetched), status=200, mimetype=MIME_TEXT) @app.route('/counter/responses_submitted', methods=['GET']) def responses_submitted(): return Response(str(cntr_msg_responses_submitted), status=200, mimetype=MIME_TEXT) @app.route('/counter/responses_fetched', methods=['GET']) def responses_fetched(): return Response(str(cntr_msg_responses_fetched), status=200, mimetype=MIME_TEXT) @app.route('/counter/current_requests', methods=['GET']) def current_requests(): return Response(str(len(msg_requests)), status=200, mimetype=MIME_TEXT) @app.route('/counter/current_responses', methods=['GET']) def current_responses(): return Response(str(len(msg_responses)), status=200, mimetype=MIME_TEXT) ### Admin ### # Reset all messsages and counters @app.route('/reset', methods=['GET', 'POST', 'PUT']) def reset(): global cntr_msg_requests_submitted global cntr_msg_requests_fetched global cntr_msg_responses_submitted global cntr_msg_responses_fetched global msg_requests global msg_responses cntr_msg_requests_submitted=0 cntr_msg_requests_fetched=0 cntr_msg_responses_submitted=0 cntr_msg_responses_fetched=0 msg_requests=[] msg_responses={} return Response('OK', status=200, mimetype=MIME_TEXT) if __name__ == "__main__": app.run(port=HOST_PORT, host=HOST_IP)