msg_requests=[]
msg_responses={}
+generic_messages={}
+
# Server info
HOST_IP = "::"
HOST_PORT = 2222
sleep(0.01)
-# Function to download messages from dmaap
+# Function to upload messages to dmaap
def dmaap_downloader():
global msg_responses
global cntr_msg_responses_submitted
return Response(json.dumps(res), status=200, mimetype=MIME_JSON)
return Response("[]", status=200, mimetype=MIME_JSON)
+# Generic POST/PUT catching all urls starting with /events/<topic>.
+# Writes the message in a que for that topic
+@app.route("/events/<path>",
+ methods=['PUT','POST'])
+def generic_write(path):
+ global generic_messages
+ global cntr_msg_responses_submitted
+ urlkey="/events/"+str(path)
+ write_method=str(request.method)
+ with lock:
+ try:
+ payload=request.json
+ print(write_method+" on "+urlkey+" json=" + json.dumps(payload))
+ topicmsgs=[]
+ if (urlkey in generic_messages.keys()):
+ topicmsgs=generic_messages[urlkey]
+ else:
+ generic_messages[urlkey]=topicmsgs
+
+ if isinstance(payload, list):
+ for listitem in payload:
+ topicmsgs.append(listitem)
+ else:
+ topicmsgs.append(payload)
+
+ cntr_msg_responses_submitted += 1
+ except Exception as e:
+ print(write_method + "on "+urlkey+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
+ return Response('{"message": "' + SERVER_ERROR + ' ' + str(e) + '","status":"500"}', status=200, mimetype=MIME_JSON)
+
+ return Response('{}', status=200, mimetype=MIME_JSON)
+
+# Generic GET catching all urls starting with /events/. Returns max 4096 json msgs in an array.
+# Returns only the messages previously written to the same urls
+@app.route("/events/<path:path>",
+ methods=['GET'])
+def generic_read(path):
+ global generic_messages
+ global cntr_msg_requests_fetched
+
+ urlpath="/events/"+str(path)
+ urlkey="/events/"+str(path).split("/")[0] #Extract topic
+ print("GET on topic"+urlkey)
+ limit=request.args.get('limit')
+ if (limit is None):
+ limit=4096
+ else:
+ limit=int(limit)
+ if (limit<0):
+ limit=0
+ if (limit>4096):
+ limit=4096
+ print("Limting number of returned messages to: "+str(limit))
+
+ timeout=request.args.get('timeout')
+ if (timeout is None):
+ timeout=10000
+ else:
+ timeout=min(int(timeout),60000)
+
+ start_time=int(round(time.time() * 1000))
+ current_time=int(round(time.time() * 1000))
+ topicmsgs=[]
+ if (urlkey in generic_messages.keys()):
+ topicmsgs=generic_messages[urlkey]
+
+ while(current_time<start_time+int(timeout)):
+ with lock:
+ if(len(topicmsgs)>0):
+ try:
+ msgs=''
+ cntr=0
+ while(cntr<limit and len(topicmsgs)>0):
+ if (len(msgs)>1):
+ msgs=msgs+','
+ msgs=msgs+json.dumps(json.dumps(topicmsgs.pop(0)))
+ cntr_msg_requests_fetched += 1
+ cntr=cntr+1
+ msgs='['+msgs+']'
+ print("GET on "+urlpath+" MSGs: "+msgs)
+ return Response(msgs, status=200, mimetype=MIME_JSON)
+ except Exception as e:
+ print("GET on "+urlpath+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
+ return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
+ sleep(0.025) # sleep 25 milliseconds
+ current_time=int(round(time.time() * 1000))
+
+ print("timeout: "+str(timeout)+", start_time: "+str(start_time)+", current_time: "+str(current_time))
+ return Response("[]", status=200, mimetype=MIME_JSON)
+
+
### Functions for metrics read out ###
@app.route('/counter/requests_submitted',