Merge "Update O-RU Fronthaul Recovery apex version"
[nonrtric.git] / test / cr / app / cr.py
index bc6e28f..2066e14 100644 (file)
 #  ============LICENSE_END=================================================
 #
 
-from flask import Flask, request
+from flask import Flask, request, Response
 from time import sleep
 import time
-import datetime
+from datetime import datetime
 import json
-from flask import Flask
-from flask import Response
 import traceback
+import logging
+import socket
+from threading import RLock
+from hashlib import md5
+
+# Disable all logging of GET on reading counters and db
+class AjaxFilter(logging.Filter):
+    def filter(self, record):
+        return ("/counter/" not in record.getMessage()) and ("/db" not in record.getMessage())
+
+log = logging.getLogger('werkzeug')
+log.addFilter(AjaxFilter())
 
 app = Flask(__name__)
 
+lock = RLock()
+
 # list of callback messages
 msg_callbacks={}
 
@@ -36,16 +48,48 @@ HOST_PORT = 2222
 
 # Metrics vars
 cntr_msg_callbacks=0
+cntr_batch_callbacks=0
 cntr_msg_fetched=0
+cntr_callbacks={}
+hosts_set=set()
 
 # Request and response constants
 CALLBACK_URL="/callbacks/<string:id>"
+CALLBACK_MR_URL="/callbacks-mr/<string:id>" #Json list with string encoded items
+CALLBACK_TEXT_URL="/callbacks-text/<string:id>" # Callback for string of text
 APP_READ_URL="/get-event/<string:id>"
+APP_READ_ALL_URL="/get-all-events/<string:id>"
+DUMP_ALL_URL="/db"
+NULL_URL="/callbacks-null"  # Url for ignored callback. Callbacks are not checked, counted or stored
 
 MIME_TEXT="text/plain"
 MIME_JSON="application/json"
 CAUGHT_EXCEPTION="Caught exception: "
 SERVER_ERROR="Server error :"
+TIME_STAMP="cr-timestamp"
+
+forced_settings={}
+forced_settings['delay']=None
+
+
+# Remote host lookup and print host name
+def remote_host_logging(request):
+
+    if request.environ.get('HTTP_X_FORWARDED_FOR') is None:
+        host_ip=str(request.environ['REMOTE_ADDR'])
+    else:
+        host_ip=str(request.environ['HTTP_X_FORWARDED_FOR'])
+    prefix='::ffff:'
+    if (host_ip.startswith('::ffff:')):
+        host_ip=host_ip[len(prefix):]
+    try:
+        name, alias, addresslist = socket.gethostbyaddr(host_ip)
+        print("Calling host: "+str(name))
+        hosts_set.add(name)
+    except Exception:
+        print("Calling host not possible to retrieve IP: "+str(host_ip))
+        hosts_set.add(host_ip)
+
 
 #I'm alive function
 @app.route('/',
@@ -57,29 +101,70 @@ def index():
 
 # Fetch the oldest callback message for an id
 # URI and parameter, (GET): /get-event/<id>
-# response: message + 200 or 204
+# response: message + 200 or just 204 or just 500(error)
 @app.route(APP_READ_URL,
     methods=['GET'])
 def receiveresponse(id):
     global msg_callbacks
     global cntr_msg_fetched
 
-    try:
-        if ((id in msg_callbacks.keys()) and (len(msg_callbacks[id]) > 0)):
-            cntr_msg_fetched+=1
-            msg=str(msg_callbacks[id][0])
-            print("Fetching msg for id: "+id+", msg="+msg)
-            del msg_callbacks[id][0]
-            return msg,200
-        print("No messages for id: "+id)
-    except Exception as e:
-        print(CAUGHT_EXCEPTION+str(e))
+    with lock:
+        try:
+            if ((id in msg_callbacks.keys()) and (len(msg_callbacks[id]) > 0)):
+                cntr_msg_fetched+=1
+                cntr_callbacks[id][1]+=1
+                msg=msg_callbacks[id][0]
+                print("Fetching msg for id: "+id+", msg="+str(msg))
+
+                if (isinstance(msg,dict)):
+                    del msg[TIME_STAMP]
+                    if ("md5" in msg.keys()):
+                        print("EXTRACTED MD5")
+                        msg=msg["md5"]
+                        print("MD5: "+str(msg))
+
+                del msg_callbacks[id][0]
+                return json.dumps(msg),200
+            print("No messages for id: "+id)
+        except Exception as e:
+            print(CAUGHT_EXCEPTION+str(e))
+            traceback.print_exc()
+            return "",500
+
+        return "",204
+
+# Fetch all callback message for an id in an array
+# URI and parameter, (GET): /get-all-events/<id>
+# response: message + 200 or just 500(error)
+@app.route(APP_READ_ALL_URL,
+    methods=['GET'])
+def receiveresponse_all(id):
+    global msg_callbacks
+    global cntr_msg_fetched
 
-    return "",204
+    with lock:
+        try:
+            if ((id in msg_callbacks.keys()) and (len(msg_callbacks[id]) > 0)):
+                cntr_msg_fetched+=len(msg_callbacks[id])
+                cntr_callbacks[id][1]+=len(msg_callbacks[id])
+                msg=msg_callbacks[id]
+                print("Fetching all msgs for id: "+id+", msg="+str(msg))
+                for sub_msg in msg:
+                    if (isinstance(sub_msg, dict)):
+                        del sub_msg[TIME_STAMP]
+                del msg_callbacks[id]
+                return json.dumps(msg),200
+            print("No messages for id: "+id)
+        except Exception as e:
+            print(CAUGHT_EXCEPTION+str(e))
+            traceback.print_exc()
+            return "",500
 
+        msg=[]
+        return json.dumps(msg),200
 
 # Receive a callback message
-# URI and payload, (PUT or POST): /callbacks/<id> <json array of response messages>
+# URI and payload, (PUT or POST): /callbacks/<id> <json messages>
 # response: OK 200 or 500 for other errors
 @app.route(CALLBACK_URL,
     methods=['PUT','POST'])
@@ -89,52 +174,299 @@ def events_write(id):
 
     try:
         print("Received callback for id: "+id +", content-type="+request.content_type)
+        remote_host_logging(request)
+        print("raw data: str(request.data): "+str(request.data))
+        do_delay()
         try:
             if (request.content_type == MIME_JSON):
-                msg = request.json
+                data = request.data
+                msg = json.loads(data)
                 print("Payload(json): "+str(msg))
-            elif (request.content_type == MIME_TEXT):
-                msg= request.form
-                print("Payload(text): "+str(msg))
             else:
-                msg="\"\""
-                print("Payload(content-type="+request.content_type+"). Setting data to empty, quoted, string")
-        except:
-            msg="\"\""
-            print("(Exception) Payload does not contain any json or text data, setting empty string as payload")
-
-        cntr_msg_callbacks += 1
-        if (id in msg_callbacks.keys()):
-            msg_callbacks[id].append(msg)
-        else:
-            msg_callbacks[id]=[]
-            msg_callbacks[id].append(msg)
+                msg={}
+                print("Payload(content-type="+request.content_type+"). Setting empty json as payload")
+        except Exception as e:
+            msg={}
+            print("(Exception) Payload does not contain any json, setting empty json as payload")
+            traceback.print_exc()
+
+        with lock:
+            cntr_msg_callbacks += 1
+            if (isinstance(msg, dict)):
+                msg[TIME_STAMP]=str(datetime.now())
+            if (id in msg_callbacks.keys()):
+                msg_callbacks[id].append(msg)
+            else:
+                msg_callbacks[id]=[]
+                msg_callbacks[id].append(msg)
+
+            if (id in cntr_callbacks.keys()):
+                cntr_callbacks[id][0] += 1
+                cntr_callbacks[id][2] += 1
+            else:
+                cntr_callbacks[id]=[]
+                cntr_callbacks[id].append(1)
+                cntr_callbacks[id].append(0)
+                cntr_callbacks[id].append(0)
+
     except Exception as e:
         print(CAUGHT_EXCEPTION+str(e))
-        return 'OK',500
+        traceback.print_exc()
+        return 'NOTOK',500
 
     return 'OK',200
 
 
+# Receive a json callback message with payload formatted according to output from the message router
+# Array of stringified json objects
+# URI and payload, (PUT or POST): /callbacks-mr/<id> <json messages>
+# json is a list of string encoded json items
+# response: OK 200 or 500 for other errors
+@app.route(CALLBACK_MR_URL,
+    methods=['PUT','POST'])
+def events_write_mr(id):
+    global msg_callbacks
+    global cntr_msg_callbacks
+    global cntr_batch_callbacks
+
+    storeas=request.args.get('storeas') #If set, store payload as a md5 hascode and dont log the payload
+                                        #Large payloads will otherwise overload the server
+    try:
+        print("Received callback (mr) for id: "+id +", content-type="+request.content_type)
+        print("raw data: str(request.data): "+str(request.data))
+        if (storeas is None):
+            print("raw data: str(request.data): "+str(request.data))
+        do_delay()
+        list_data=False
+        try:
+            #if (request.content_type == MIME_JSON):
+            if (MIME_JSON in request.content_type):
+                data = request.data
+                msg_list = json.loads(data)
+                if (storeas is None):
+                    print("Payload(json): "+str(msg_list))
+                list_data=True
+            else:
+                msg_list=[]
+                print("Payload(content-type="+request.content_type+"). Setting empty json as payload")
+        except Exception as e:
+            msg_list=[]
+            print("(Exception) Payload does not contain any json, setting empty json as payload")
+            traceback.print_exc()
+
+        with lock:
+            remote_host_logging(request)
+            if (list_data):
+                cntr_batch_callbacks += 1
+            for msg in msg_list:
+                if (storeas is None):
+                    msg=json.loads(msg)
+                else:
+                    #Convert to compact json without ws between parameter and value...
+                    #It seem that ws is added somewhere along to way to this server
+                    msg=json.loads(msg)
+                    msg=json.dumps(msg, separators=(',', ':'))
+
+                    md5msg={}
+                    md5msg["md5"]=md5(msg.encode('utf-8')).hexdigest()
+                    msg=md5msg
+                    print("msg (json converted to md5 hash): "+str(msg["md5"]))
+                cntr_msg_callbacks += 1
+                if (isinstance(msg, dict)):
+                    msg[TIME_STAMP]=str(datetime.now())
+                if (id in msg_callbacks.keys()):
+                    msg_callbacks[id].append(msg)
+                else:
+                    msg_callbacks[id]=[]
+                    msg_callbacks[id].append(msg)
+
+                if (id in cntr_callbacks.keys()):
+                    cntr_callbacks[id][0] += 1
+                else:
+                    cntr_callbacks[id]=[]
+                    cntr_callbacks[id].append(1)
+                    cntr_callbacks[id].append(0)
+                    cntr_callbacks[id].append(0)
+            if (id in msg_callbacks.keys() and list_data):
+                cntr_callbacks[id][2] += 1
+
+    except Exception as e:
+        print(CAUGHT_EXCEPTION+str(e))
+        traceback.print_exc()
+        return 'NOTOK',500
+
+    return 'OK',200
+
+# Receive a callback message of a single text message (content type ignored)
+# or a json array of strings (content type json)
+# URI and payload, (PUT or POST): /callbacks-text/<id> <text message>
+# response: OK 200 or 500 for other errors
+@app.route(CALLBACK_TEXT_URL,
+    methods=['PUT','POST'])
+def events_write_text(id):
+    global msg_callbacks
+    global cntr_msg_callbacks
+    global cntr_batch_callbacks
+
+    storeas=request.args.get('storeas') #If set, store payload as a md5 hascode and dont log the payload
+                                        #Large payloads will otherwise overload the server
+    try:
+        print("Received callback for id: "+id +", content-type="+request.content_type)
+        remote_host_logging(request)
+        if (storeas is None):
+            print("raw data: str(request.data): "+str(request.data))
+        do_delay()
+
+        try:
+            msg_list=None
+            list_data=False
+            if (MIME_JSON in request.content_type):  #Json array of strings
+                msg_list=json.loads(request.data)
+                list_data=True
+            else:
+                data=request.data.decode("utf-8")    #Assuming string
+                msg_list=[]
+                msg_list.append(data)
+            with lock:
+                cntr_batch_callbacks += 1
+                for msg in msg_list:
+                    if (storeas == "md5"):
+                        md5msg={}
+                        print("msg: "+str(msg))
+                        print("msg (endcode str): "+str(msg.encode('utf-8')))
+                        md5msg["md5"]=md5(msg.encode('utf-8')).hexdigest()
+                        msg=md5msg
+                        print("msg (data converted to md5 hash): "+str(msg["md5"]))
+
+                    if (isinstance(msg, dict)):
+                        msg[TIME_STAMP]=str(datetime.now())
+
+                    cntr_msg_callbacks += 1
+                    if (id in msg_callbacks.keys()):
+                        msg_callbacks[id].append(msg)
+                    else:
+                        msg_callbacks[id]=[]
+                        msg_callbacks[id].append(msg)
+
+                    if (id in cntr_callbacks.keys()):
+                        cntr_callbacks[id][0] += 1
+                    else:
+                        cntr_callbacks[id]=[]
+                        cntr_callbacks[id].append(1)
+                        cntr_callbacks[id].append(0)
+                        cntr_callbacks[id].append(0)
+                if (id in cntr_callbacks.keys() and list_data):
+                    cntr_callbacks[id][2] += 1
+        except Exception as e:
+            print(CAUGHT_EXCEPTION+str(e))
+            traceback.print_exc()
+            return 'NOTOK',500
+
+
+    except Exception as e:
+        print(CAUGHT_EXCEPTION+str(e))
+        traceback.print_exc()
+        return 'NOTOK',500
+
+    return 'OK',200
+
+# Receive a callback message but ignore contents and return 200
+# URI and payload, (PUT or POST): /callbacks-text/<id> <text message>
+# response: OK 200
+@app.route(NULL_URL,
+    methods=['PUT','POST'])
+def null_url(id):
+    return 'OK',200
+
+# Dump the whole db of current callbacks
+# URI and parameter, (GET): /db
+# response: message + 200
+@app.route(DUMP_ALL_URL,
+    methods=['GET'])
+def dump_db():
+    return json.dumps(msg_callbacks),200
+
 ### Functions for metrics read out ###
 
 @app.route('/counter/received_callbacks',
     methods=['GET'])
 def requests_submitted():
-    return Response(str(cntr_msg_callbacks), status=200, mimetype=MIME_TEXT)
+    req_id = request.args.get('id')
+    if (req_id is None):
+        return Response(str(cntr_msg_callbacks), status=200, mimetype=MIME_TEXT)
+
+    if (req_id in cntr_callbacks.keys()):
+        return Response(str(cntr_callbacks[req_id][0]), status=200, mimetype=MIME_TEXT)
+    else:
+        return Response(str("0"), status=200, mimetype=MIME_TEXT)
+
+@app.route('/counter/received_callback_batches',
+    methods=['GET'])
+def batches_submitted():
+    req_id = request.args.get('id')
+    if (req_id is None):
+        return Response(str(cntr_batch_callbacks), status=200, mimetype=MIME_TEXT)
+
+    if (req_id in cntr_callbacks.keys()):
+        return Response(str(cntr_callbacks[req_id][2]), status=200, mimetype=MIME_TEXT)
+    else:
+        return Response(str("0"), status=200, mimetype=MIME_TEXT)
 
 @app.route('/counter/fetched_callbacks',
     methods=['GET'])
 def requests_fetched():
-    return Response(str(cntr_msg_fetched), status=200, mimetype=MIME_TEXT)
+    req_id = request.args.get('id')
+    if (req_id is None):
+        return Response(str(cntr_msg_fetched), status=200, mimetype=MIME_TEXT)
+
+    if (req_id in cntr_callbacks.keys()):
+        return Response(str(cntr_callbacks[req_id][1]), status=200, mimetype=MIME_TEXT)
+    else:
+        return Response(str("0"), status=200, mimetype=MIME_TEXT)
 
 @app.route('/counter/current_messages',
     methods=['GET'])
 def current_messages():
-    return Response(str(cntr_msg_callbacks-cntr_msg_fetched), status=200, mimetype=MIME_TEXT)
+    req_id = request.args.get('id')
+    if (req_id is None):
+        return Response(str(cntr_msg_callbacks-cntr_msg_fetched), status=200, mimetype=MIME_TEXT)
 
+    if (req_id in cntr_callbacks.keys()):
+        return Response(str(cntr_callbacks[req_id][0]-cntr_callbacks[req_id][1]), status=200, mimetype=MIME_TEXT)
+    else:
+        return Response(str("0"), status=200, mimetype=MIME_TEXT)
+
+@app.route('/counter/remote_hosts',
+    methods=['GET'])
+def remote_hosts():
+    global hosts_set
 
+    hosts=",".join(hosts_set)
+    return Response(str(hosts), status=200, mimetype=MIME_TEXT)
 
+
+#Set force delay response, in seconds, for all callbacks
+#/froceesponse?delay=<seconds>
+@app.route('/forcedelay', methods=['POST'])
+def forcedelay():
+
+  try:
+    forced_settings['delay']=int(request.args.get('delay'))
+  except Exception:
+    forced_settings['delay']=None
+  return Response("Force delay: " + str(forced_settings['delay']) + " sec set for all callback responses", 200, mimetype=MIME_TEXT)
+
+# Helper: Delay if delayed response code is set
+def do_delay():
+  if (forced_settings['delay'] is not None):
+    try:
+      val=int(forced_settings['delay'])
+      if (val < 1):
+          return Response("Force delay too short: " + str(forced_settings['delay']) + " sec", 500, mimetype=MIME_TEXT)
+      print("Delaying "+str(val)+ " sec.")
+      time.sleep(val)
+    except Exception:
+      return Response("Force delay : " + str(forced_settings['delay']) + " sec failed", 500, mimetype=MIME_TEXT)
 ### Admin ###
 
 # Reset all messsages and counters
@@ -144,12 +476,19 @@ def reset():
     global msg_callbacks
     global cntr_msg_fetched
     global cntr_msg_callbacks
+    global cntr_batch_callbacks
+    global cntr_callbacks
+    global forced_settings
 
-    msg_callbacks={}
-    cntr_msg_fetched=0
-    cntr_msg_callbacks=0
+    with lock:
+        msg_callbacks={}
+        cntr_msg_fetched=0
+        cntr_msg_callbacks=0
+        cntr_batch_callbacks=0
+        cntr_callbacks={}
+        forced_settings['delay']=None
 
-    return Response('OK', status=200, mimetype=MIME_TEXT)
+        return Response('OK', status=200, mimetype=MIME_TEXT)
 
 ### Main function ###