X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=test%2Fmrstub%2Fapp%2Fmain.py;h=db13fc02c48cdbd89b30f8f9a4dc8bc1ab4d604c;hb=e0b665e3ff544bb78411bdc7f6b3ba2818fdeed4;hp=75b23f125f9510510c3bf3cfbc160cbadb8135e7;hpb=b83e54c2bdab21d7f6ff5c9a4c340e801c93e1c2;p=nonrtric.git diff --git a/test/mrstub/app/main.py b/test/mrstub/app/main.py index 75b23f12..db13fc02 100644 --- a/test/mrstub/app/main.py +++ b/test/mrstub/app/main.py @@ -24,8 +24,10 @@ import json from flask import Flask from flask import Response import traceback -from threading import RLock +from threading import RLock, Thread import logging +import os +import requests # Disable all logging of GET on reading counters class AjaxFilter(logging.Filter): @@ -61,6 +63,89 @@ MIME_JSON="application/json" CAUGHT_EXCEPTION="Caught exception: " SERVER_ERROR="Server error :" +topic_write="" +topic_read="" + +uploader_thread=None +downloader_thread=None + +# Function to download messages from dmaap +def dmaap_uploader(): + global msg_requests + global cntr_msg_requests_fetched + + print("Starting uploader") + + headers = {'Content-type': 'application/json', 'Accept': '*/*'} + #url="http://"+topic_host+"/events/"+topic_read + url=topic_read + + while True: + while (len(msg_requests)>0): + msg=msg_requests[0] + if msg is not None: + try: + print("Sending to dmaap : "+ url) + print("Sending to dmaap : "+ msg) + resp=requests.post(url, data=msg, headers=headers, timeout=10) + if (resp.status_code<199 & resp.status_code > 299): + print("Failed, response code: " + str(resp.status_code)) + sleep(1) + else: + print("Dmaap response code: " + str(resp.status_code)) + print("Dmaap response text: " + str(resp.text)) + with lock: + msg_requests.pop(0) + cntr_msg_requests_fetched += 1 + except Exception as e: + print("Failed, exception: "+ str(e)) + sleep(1) + sleep(0.01) + + +# Function to download messages from dmaap +def dmaap_downloader(): + global msg_responses + global cntr_msg_responses_submitted + + print("Starting uploader") + + while True: + + try : + #url="http://"+topic_host+"/events/"+topic_write+"/users/mr-stub?timeout=15000&limit=100" + url=topic_write + headers = {'Accept': 'application/json'} + print("Reading from dmaap: " + url) + resp=requests.get(url, headers=headers) + if (resp.status_code<199 & resp.status_code > 299): + print("Failed, response code: " + resp.status_code) + sleep(1) + else: + print("Recieved data from dmaap mr") + try: + data=resp.json() + print("Recieved data (raw): " + str(resp.text)) + if isinstance(data, list): + for item in data: + item=json.loads(item) + corrid=str(item["correlationId"]) + status=str(item["status"]) + msg=str(item["message"]) + item_str=msg+status[0:3] + with lock: + msg_responses[corrid]=item_str + cntr_msg_responses_submitted += 1 + else: + print("Data from dmaap is not json array: " + str(resp.text)) + sleep(1) + except Exception as e: + print("Corrupt data from dmaap mr - dropping " + str(data)) + print("CAUGHT_EXCEPTION" + str(e) + " "+traceback.format_exc()) + sleep(1) + except Exception as e: + sleep(1) + #I'm alive function @app.route('/', methods=['GET']) @@ -134,19 +219,19 @@ def receiveresponse(): with lock: print("APP_READ_URL lock") try: - id=request.args.get('correlationid') - if (id is None): + cid=request.args.get('correlationid') + if (cid is None): print(APP_READ_URL+" parameter 'correclationid' missing") return Response('Parameter correlationid missing in json', status=500, mimetype=MIME_TEXT) - if (id in msg_responses): - answer=msg_responses[id] - del msg_responses[id] - print(APP_READ_URL+" response (correlationid="+id+"): " + answer) + if (cid in msg_responses): + answer=msg_responses[cid] + del msg_responses[cid] + print(APP_READ_URL+" response (correlationid="+cid+"): " + answer) cntr_msg_responses_fetched += 1 return Response(answer, status=200, mimetype=MIME_JSON) - print(APP_READ_URL+" - no messages (correlationid="+id+"): ") + print(APP_READ_URL+" - no messages (correlationid="+cid+"): ") return Response('', status=204, mimetype=MIME_JSON) except Exception as e: print(APP_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc()) @@ -163,6 +248,9 @@ def events_read(): global msg_requests global cntr_msg_requests_fetched + if topic_write or topic_read: + return Response('Url not available when running as mrstub frontend', status=404, mimetype=MIME_TEXT) + limit=request.args.get('limit') if (limit is None): limit=4096 @@ -180,10 +268,10 @@ def events_read(): else: timeout=min(int(timeout),60000) - startTime=int(round(time.time() * 1000)) - currentTime=int(round(time.time() * 1000)) + start_time=int(round(time.time() * 1000)) + current_time=int(round(time.time() * 1000)) - while(currentTime0): try: @@ -202,9 +290,9 @@ def events_read(): print(AGENT_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc()) return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT) sleep(0.025) # sleep 25 milliseconds - currentTime=int(round(time.time() * 1000)) + current_time=int(round(time.time() * 1000)) - print("timeout: "+str(timeout)+", startTime: "+str(startTime)+", currentTime: "+str(currentTime)) + print("timeout: "+str(timeout)+", start_time: "+str(start_time)+", current_time: "+str(current_time)) return Response("[]", status=200, mimetype=MIME_JSON) # Write messages stream. URI according to agent configuration. @@ -215,6 +303,10 @@ def events_read(): def events_write(): global msg_responses global cntr_msg_responses_submitted + + if topic_write or topic_read: + return Response('Url not available when running as mrstub frontend', status=404, mimetype=MIME_TEXT) + with lock: print("AGENT_WRITE_URL lock") try: @@ -227,8 +319,8 @@ def events_write(): answer=answer_list for item in answer: - id=item['correlationId'] - if (id is None): + cid=item['correlationId'] + if (cid is None): print(AGENT_WRITE_URL+" parameter 'correlatonid' missing") return Response('Parameter missing in json', status=400, mimetype=MIME_TEXT) msg=item['message'] @@ -243,9 +335,9 @@ def events_write(): msg_str=json.dumps(msg)+status[0:3] else: msg_str=msg+status[0:3] - msg_responses[id]=msg_str + msg_responses[cid]=msg_str cntr_msg_responses_submitted += 1 - print(AGENT_WRITE_URL+ " msg+status (correlationid="+id+") :" + str(msg_str)) + print(AGENT_WRITE_URL+ " msg+status (correlationid="+cid+") :" + str(msg_str)) except Exception as e: print(AGENT_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc()) return Response('{"message": "' + SERVER_ERROR + ' ' + str(e) + '","status":"500"}', status=200, mimetype=MIME_JSON) @@ -306,5 +398,27 @@ def reset(): msg_responses={} return Response('OK', status=200, mimetype=MIME_TEXT) +# Get env vars, if present +if os.getenv("TOPIC_READ") is not None: + + print("Env variables:") + print("TOPIC_READ:"+os.environ['TOPIC_READ']) + print("TOPIC_WRITE:"+os.environ['TOPIC_WRITE']) + + topic_read=os.environ['TOPIC_READ'] + topic_write=os.environ['TOPIC_WRITE'] + + + if topic_read and downloader_thread is None: + downloader_thread=Thread(target=dmaap_downloader) + downloader_thread.start() + + if topic_write and uploader_thread is None: + uploader_thread=Thread(target=dmaap_uploader) + uploader_thread.start() + +else: + print("No env variables - OK") + if __name__ == "__main__": app.run(port=HOST_PORT, host=HOST_IP) \ No newline at end of file