X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=test%2Fprodstub%2Fapp%2Fprodstub.py;h=ea8b914d09d780961d2ddd680665f81df1343aab;hb=f0a158c3c32b6547c839c559548e4e4b729802e1;hp=c21a7ab3df4d9d7a1964255e9aa6465a51382d65;hpb=b5cb68ea0e77d0a1421b4f17cc58b981628c29f7;p=nonrtric.git diff --git a/test/prodstub/app/prodstub.py b/test/prodstub/app/prodstub.py index c21a7ab3..ea8b914d 100644 --- a/test/prodstub/app/prodstub.py +++ b/test/prodstub/app/prodstub.py @@ -19,9 +19,24 @@ from flask import Flask from flask import request +import requests + import json from jsonschema import validate +import threading +import time +import datetime +import logging + +# Disable all logging of GET on reading counters and status +class AjaxFilter(logging.Filter): + def filter(self, record): + return ("/counter/" not in record.getMessage()) and ("/status" not in record.getMessage()) + +log = logging.getLogger('werkzeug') +log.addFilter(AjaxFilter()) + app = Flask(__name__) # # list of callback messages @@ -31,20 +46,15 @@ app = Flask(__name__) HOST_IP = "::" HOST_PORT = 2222 -# # Metrics vars -# cntr_msg_callbacks=0 -# cntr_msg_fetched=0 - # Request and response constants -CALLBACK_CREATE_URL="/callbacks/create/" -CALLBACK_DELETE_URL="/callbacks/delete/" +CALLBACK_CREATE_URL="/callbacks/job/" +CALLBACK_DELETE_URL="/callbacks/job//" CALLBACK_SUPERVISION_URL="/callbacks/supervision/" ARM_CREATE_RESPONSE="/arm/create//" ARM_DELETE_RESPONSE="/arm/delete//" ARM_SUPERVISION_RESPONSE="/arm/supervision/" ARM_TYPE="/arm/type//" - COUNTER_SUPERVISION="/counter/supervision/" COUNTER_CREATE="/counter/create//" COUNTER_DELETE="/counter/delete//" @@ -62,6 +72,8 @@ PRODUCER_OR_JOB_NOT_FOUND="producer or job not found" PRODUCER_NOT_FOUND="producer not found" TYPE_NOT_FOUND="type not found" TYPE_IN_USE="type is in use in a job" +JOB_NOT_FOUND="job not found" +JOB_DATA_NOT_FOUND="job data not found" JSON_CORRUPT="json in request is corrupt or missing" #Producer and job db, including armed responses @@ -72,10 +84,17 @@ db={} # supervision counter # job # job json +# target_type # armed response for create # armed response for delete # create counter # delete counter +# delivering status + +# disable warning about unverified https requests +from requests.packages import urllib3 + +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # Helper function to populate a callback dict with the basic structure # if job_id is None then only the producer level is setup and the producer dict is returned @@ -109,6 +128,8 @@ def setup_callback_dict(producer_id, job_id): job_dict['json']=None job_dict['create_counter']=0 job_dict['delete_counter']=0 + job_dict['delivering']="stopped" + job_dict['delivery_attempts']=0 return job_dict @@ -121,6 +142,9 @@ def get_callback_dict(producer_id, job_id): if (producer_id in db.keys()): producer_dict=db[producer_id] + if (producer_dict is None): + return None + if (job_id is None): return producer_dict @@ -141,6 +165,17 @@ def recursive_search(s_dict, s_key, s_id): return False +# Helper function to find all job dicts +def get_all_jobs(): + job_dicts={} + for producer_key in db: + producer_dict = db[producer_key] + for job_key in producer_dict: + job_dict = producer_dict[job_key] + if (isinstance(job_dict, dict)): + job_dicts[job_key]=job_dict + return job_dicts + # I'm alive function # response: always 200 @app.route('/', @@ -150,7 +185,7 @@ def index(): # Arm the create callback with a response code # Omitting the query parameter switch to response back to the standard 200/201 response -# URI and parameters (PUT): /arm/create//[?response=] +# URI and parameters (PUT): /arm/create//[?response=] # Setting # response: 200 (400 if incorrect query params) @app.route(ARM_CREATE_RESPONSE, @@ -166,6 +201,7 @@ def arm_create(producer_id, job_id): if (len(request.args) != 1): return UNKNOWN_QUERY_PARAMETERS,400 + print("Arm create received for producer: "+str(producer_id)+" and job: "+str(job_id)+" and response: "+str(arm_response)) job_dict=setup_callback_dict(producer_id, job_id) @@ -176,7 +212,7 @@ def arm_create(producer_id, job_id): else: job_dict['create_response']=200 else: - job_dict['create_response']=arm_response + job_dict['create_response']=int(arm_response) return "",200 @@ -209,7 +245,7 @@ def arm_delete(producer_id, job_id): else: job_dict['delete_response']=204 else: - job_dict['delete_response']=arm_response + job_dict['delete_response']=int(arm_response) return "",200 @@ -236,7 +272,7 @@ def arm_supervision(producer_id): if (arm_response is None): producer_dict['supervision_response']=200 else: - producer_dict['supervision_response']=arm_response + producer_dict['supervision_response']=int(arm_response) return "",200 @@ -282,8 +318,9 @@ def disarm_type(producer_id, type_id): return "",200 + # Callback for create job -# URI and parameters (POST): /callbacks/create/ +# URI and parameters (POST): /callbacks/job/ # response 201 at create, 200 at update or other configured response code @app.route(CALLBACK_CREATE_URL, methods=['POST']) @@ -304,7 +341,7 @@ def callback_create(producer_id): type_list=producer_dict['types'] type_id=req_json_dict['ei_type_identity'] if (type_id not in type_list): - return TYPE_NOT_FOUND + return TYPE_NOT_FOUND, 400 job_id=req_json_dict['ei_job_identity'] job_dict=get_callback_dict(producer_id, job_id) @@ -317,55 +354,46 @@ def callback_create(producer_id): return_code=job_dict['create_response'] if ((job_dict['create_response'] == 200) or (job_dict['create_response'] == 201)): job_dict['json']=req_json_dict + job_dict['delivering']="delivering" if (job_dict['create_response'] == 201): #Set up next response code if create was ok job_dict['create_response'] = 200 if (job_dict['delete_response'] == 404): job_dict['delete_response'] = 204 else: + if(job_dict['delivering'] == "delivering"): + job_dict['delivering']="hold" return_msg=RETURNING_CONFIGURED_RESP job_dict['create_counter']=job_dict['create_counter']+1 else: return JOBID_NO_MATCH, 400 - return return_msg,return_code + return return_msg, return_code # Callback for delete job -# URI and parameters (POST): /callbacks/delete/ +# URI and parameters (DELETE): /callbacks/job// # response: 204 at delete or other configured response code @app.route(CALLBACK_DELETE_URL, - methods=['POST']) -def callback_delete(producer_id): + methods=['DELETE']) +def callback_delete(producer_id, job_id): - req_json_dict=None - try: - req_json_dict = json.loads(request.data) - with open('job-schema.json') as f: - schema = json.load(f) - validate(instance=req_json_dict, schema=schema) - except Exception: - return JSON_CORRUPT,400 - - job_id=req_json_dict['ei_job_identity'] job_dict=get_callback_dict(producer_id, job_id) if (job_dict is None): return PRODUCER_OR_JOB_NOT_FOUND,400 return_code=0 return_msg="" - if (req_json_dict['ei_job_identity'] == job_id): - print("Delete callback received for producer: "+str(producer_id)+" and job: "+str(job_id)) - return_code=job_dict['delete_response'] - if (job_dict['delete_response'] == 204): - job_dict['json']=None - job_dict['delete_response']=404 - if (job_dict['create_response'] == 200): - job_dict['create_response'] = 201 # reset create response if delete was ok - else: - return_msg=RETURNING_CONFIGURED_RESP - - job_dict['delete_counter']=job_dict['delete_counter']+1 + print("Delete callback received for producer: "+str(producer_id)+" and job: "+str(job_id)) + return_code=job_dict['delete_response'] + if (job_dict['delete_response'] == 204): + job_dict['json']=None + job_dict['delete_response']=404 + job_dict['delivering']="stopped" + if (job_dict['create_response'] == 200): + job_dict['create_response'] = 201 # reset create response if delete was ok else: - return JOBID_NO_MATCH, 400 + return_msg=RETURNING_CONFIGURED_RESP + + job_dict['delete_counter']=job_dict['delete_counter']+1 return return_msg, return_code @@ -390,7 +418,7 @@ def callback_supervision(producer_id): return return_msg,producer_dict['supervision_response'] -# Callback for supervision of producer +# Get the job definition for a job # URI and parameters (GET): "/jobdata//" # response: 200 or 204 @app.route(JOB_DATA, @@ -399,12 +427,68 @@ def get_jobdata(producer_id, job_id): print("Get job data received for producer: "+str(producer_id)+" and job: "+str(job_id)) - job_dict=setup_callback_dict(producer_id, job_id) + job_dict=get_callback_dict(producer_id, job_id) + + if (job_dict is None): + return PRODUCER_OR_JOB_NOT_FOUND,400 + if (job_dict['json'] is None): return "",204 else: return json.dumps(job_dict['json']), 200 +# Delete the job definition for a job +# URI and parameters (DELETE): "/jobdata//" +# response: 204 +@app.route(JOB_DATA, + methods=['DELETE']) +def del_jobdata(producer_id, job_id): + + print("Delete job data received for producer: "+str(producer_id)+" and job: "+str(job_id)) + + job_dict=get_callback_dict(producer_id, job_id) + + if (job_dict is None): + return PRODUCER_OR_JOB_NOT_FOUND,400 + + job_dict['json']=None + + return "",204 + + +# Start data delivery for a job, action : START or STOP +# URI and parameters (POST): "/jobdata//?action=action" +# response: 200 or 204 +@app.route(JOB_DATA, + methods=['POST']) +def start_jobdata(producer_id, job_id): + + action=request.args.get('action') + + if (action is None): + return UNKNOWN_QUERY_PARAMETERS,400 + else: + if (len(request.args) != 1): + return UNKNOWN_QUERY_PARAMETERS,400 + else: + if ((action != "START") and (action != "STOP")): + return UNKNOWN_QUERY_PARAMETERS,400 + + print("Job data action received for producer: "+str(producer_id)+" and job: "+str(job_id) + " action: " + action) + + job_dict=get_callback_dict(producer_id, job_id) + if (job_dict is None): + return JOB_NOT_FOUND,404 + + if (job_dict['json'] is None): + return JOB_DATA_NOT_FOUND, 400 + else: + if (action == "START"): + job_dict['delivering']="delivering" + else: + job_dict['delivering']="stopped" + return "",200 + # Counter for create calls for a job # URI and parameters (GET): "/counter/create//" @@ -414,7 +498,7 @@ def get_jobdata(producer_id, job_id): def counter_create(producer_id, job_id): job_dict=get_callback_dict(producer_id, job_id) if (job_dict is None): - return -1,200 + return "-1",200 return str(job_dict['create_counter']),200 # Counter for delete calls for a job @@ -425,7 +509,7 @@ def counter_create(producer_id, job_id): def counter_delete(producer_id, job_id): job_dict=get_callback_dict(producer_id, job_id) if (job_dict is None): - return -1,200 + return "-1",200 return str(job_dict['delete_counter']),200 # Counter for supervision calls for a producer @@ -436,7 +520,7 @@ def counter_delete(producer_id, job_id): def counter_supervision(producer_id): producer_dict=get_callback_dict(producer_id, None) if (producer_dict is None): - return -1,200 + return "-1",200 return str(producer_dict['supervision_counter']),200 # Get status info @@ -457,7 +541,36 @@ def reset(): db={} return "",200 + +def datadelivery() : + while True: + try: + job_dicts=get_all_jobs() + for key in job_dicts: + job=job_dicts[key] + if (job['delivering'] == "delivering" and job['json'] != None): + url=job['json']['target_uri'] + if (str(url).find("localhost:") == -1): #Dont deliver to localhost... + data={} + data["date"]=str(datetime.datetime.now()) + data["job"]=""+key + data["sequence_no"]=""+str(job['delivery_attempts']) + data["value"]=str(100) + print("Sending to "+url+" payload:"+json.dumps(data)) + + requests.post(url, json=data, verify=False, timeout=2) #NOSONAR + job['delivery_attempts'] += 1 + except Exception as err: + print("Error during data delivery: "+ str(err)) + time.sleep(1) + + ### Main function ### +print("Starting data delivery thread") +thread = threading.Thread(target=datadelivery, args=()) +thread.daemon = True +thread.start() + if __name__ == "__main__": app.run(port=HOST_PORT, host=HOST_IP)