Adaptation of test env to helm chart
[nonrtric.git] / test / mrstub / app / main.py
index 40707d6..fb6d674 100644 (file)
@@ -43,6 +43,8 @@ lock = RLock()
 msg_requests=[]
 msg_responses={}
 
+generic_messages={}
+
 # Server info
 HOST_IP = "::"
 HOST_PORT = 2222
@@ -54,6 +56,8 @@ cntr_msg_responses_submitted=0
 cntr_msg_responses_fetched=0
 
 # Request and response constants
+ORU_WRITE_URL="/events/unauthenticated.SEC_FAULT_OUTPUT"
+ORU_READ_URL="/events/unauthenticated.SEC_FAULT_OUTPUT/users/test/"
 AGENT_WRITE_URL="/events/A1-POLICY-AGENT-WRITE"
 AGENT_READ_URL="/events/A1-POLICY-AGENT-READ/users/policy-agent"
 APP_WRITE_URL="/send-request"
@@ -103,7 +107,7 @@ def dmaap_uploader():
         sleep(0.01)
 
 
-# Function to download messages from dmaap
+# Function to upload messages to dmaap
 def dmaap_downloader():
     global msg_responses
     global cntr_msg_responses_submitted
@@ -344,6 +348,115 @@ def events_write():
 
         return Response('{}', status=200, mimetype=MIME_JSON)
 
+@app.route(ORU_WRITE_URL,
+    methods=['PUT','POST'])
+def oru_write():
+    global msg_requests
+    msg=json.dumps(request.json)
+    msg_requests.append(msg)
+    return Response('{}', status=200, mimetype=MIME_JSON)
+
+@app.route(ORU_READ_URL,
+    methods=['GET'])
+def oru_read():
+    global msg_requests
+    if(len(msg_requests)>0):
+        rsp=msg_requests.pop(0)
+        res=[]
+        res.append(rsp)
+        return Response(json.dumps(res), status=200, mimetype=MIME_JSON)
+    return Response("[]", status=200, mimetype=MIME_JSON)
+
+# Generic POST/PUT catching all urls starting with /events/<topic>.
+# Writes the message in a que for that topic
+@app.route("/events/<path>",
+    methods=['PUT','POST'])
+def generic_write(path):
+    global generic_messages
+    global cntr_msg_responses_submitted
+    urlkey="/events/"+str(path)
+    write_method=str(request.method)
+    with lock:
+        try:
+            payload=request.json
+            print(write_method+" on "+urlkey+" json=" + json.dumps(payload))
+            topicmsgs=[]
+            if (urlkey in generic_messages.keys()):
+                topicmsgs=generic_messages[urlkey]
+            else:
+                generic_messages[urlkey]=topicmsgs
+
+            if isinstance(payload, list):
+                for listitem in payload:
+                    topicmsgs.append(listitem)
+            else:
+                topicmsgs.append(payload)
+
+            cntr_msg_responses_submitted += 1
+        except Exception as e:
+            print(write_method + "on "+urlkey+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
+            return Response('{"message": "' + SERVER_ERROR + ' ' + str(e) + '","status":"500"}', status=200, mimetype=MIME_JSON)
+
+        return Response('{}', status=200, mimetype=MIME_JSON)
+
+# Generic GET catching all urls starting with /events/. Returns max 4096 json msgs in an array.
+# Returns only the messages previously written to the same urls
+@app.route("/events/<path:path>",
+    methods=['GET'])
+def generic_read(path):
+    global generic_messages
+    global cntr_msg_requests_fetched
+
+    urlpath="/events/"+str(path)
+    urlkey="/events/"+str(path).split("/")[0] #Extract topic
+    print("GET on topic"+urlkey)
+    limit=request.args.get('limit')
+    if (limit is None):
+        limit=4096
+    else:
+        limit=int(limit)
+    if (limit<0):
+        limit=0
+    if (limit>4096):
+        limit=4096
+    print("Limting number of returned messages to: "+str(limit))
+
+    timeout=request.args.get('timeout')
+    if (timeout is None):
+        timeout=10000
+    else:
+        timeout=min(int(timeout),60000)
+
+    start_time=int(round(time.time() * 1000))
+    current_time=int(round(time.time() * 1000))
+    topicmsgs=[]
+    if (urlkey in generic_messages.keys()):
+        topicmsgs=generic_messages[urlkey]
+
+    while(current_time<start_time+int(timeout)):
+        with lock:
+            if(len(topicmsgs)>0):
+                try:
+                    msgs=''
+                    cntr=0
+                    while(cntr<limit and len(topicmsgs)>0):
+                        if (len(msgs)>1):
+                            msgs=msgs+','
+                        msgs=msgs+json.dumps(json.dumps(topicmsgs.pop(0)))
+                        cntr_msg_requests_fetched += 1
+                        cntr=cntr+1
+                    msgs='['+msgs+']'
+                    print("GET on "+urlpath+" MSGs: "+msgs)
+                    return Response(msgs, status=200, mimetype=MIME_JSON)
+                except Exception as e:
+                    print("GET on "+urlpath+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
+                    return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
+        sleep(0.025) # sleep 25 milliseconds
+        current_time=int(round(time.time() * 1000))
+
+    print("timeout: "+str(timeout)+", start_time: "+str(start_time)+", current_time: "+str(current_time))
+    return Response("[]", status=200, mimetype=MIME_JSON)
+
 
 ### Functions for metrics read out ###