X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=test%2Fprodstub%2Fapp%2Fprodstub.py;fp=test%2Fprodstub%2Fapp%2Fprodstub.py;h=df4898dd1977e784deb52a01918374d0d9b6b99b;hb=39ad50eaea46a2fadb4f85ffda02cc90acd0d8d5;hp=c21a7ab3df4d9d7a1964255e9aa6465a51382d65;hpb=f38e1e8cb3652e73ea1e76b5a1a77a4aeb108577;p=nonrtric.git diff --git a/test/prodstub/app/prodstub.py b/test/prodstub/app/prodstub.py index c21a7ab3..df4898dd 100644 --- a/test/prodstub/app/prodstub.py +++ b/test/prodstub/app/prodstub.py @@ -19,9 +19,15 @@ from flask import Flask from flask import request +import requests + import json from jsonschema import validate +import threading +import time +import datetime + app = Flask(__name__) # # list of callback messages @@ -44,7 +50,6 @@ ARM_CREATE_RESPONSE="/arm/create//" ARM_DELETE_RESPONSE="/arm/delete//" ARM_SUPERVISION_RESPONSE="/arm/supervision/" ARM_TYPE="/arm/type//" - COUNTER_SUPERVISION="/counter/supervision/" COUNTER_CREATE="/counter/create//" COUNTER_DELETE="/counter/delete//" @@ -62,6 +67,8 @@ PRODUCER_OR_JOB_NOT_FOUND="producer or job not found" PRODUCER_NOT_FOUND="producer not found" TYPE_NOT_FOUND="type not found" TYPE_IN_USE="type is in use in a job" +JOB_NOT_FOUND="job not found" +JOB_DATA_NOT_FOUND="job data not found" JSON_CORRUPT="json in request is corrupt or missing" #Producer and job db, including armed responses @@ -72,6 +79,7 @@ db={} # supervision counter # job # job json +# target_type # armed response for create # armed response for delete # create counter @@ -109,6 +117,8 @@ def setup_callback_dict(producer_id, job_id): job_dict['json']=None job_dict['create_counter']=0 job_dict['delete_counter']=0 + job_dict['delivering']=False + job_dict['delivery_attempts']=0 return job_dict @@ -121,6 +131,9 @@ def get_callback_dict(producer_id, job_id): if (producer_id in db.keys()): producer_dict=db[producer_id] + if (producer_dict is None): + return None + if (job_id is None): return producer_dict @@ -141,6 +154,17 @@ def recursive_search(s_dict, s_key, s_id): return False +# Helper function to find all job dicts +def get_all_jobs(): + job_dicts={} + for producer_key in db: + producer_dict = db[producer_key] + for job_key in producer_dict: + job_dict = producer_dict[job_key] + if (isinstance(job_dict, dict)): + job_dicts[job_key]=job_dict + return job_dicts + # I'm alive function # response: always 200 @app.route('/', @@ -150,7 +174,7 @@ def index(): # Arm the create callback with a response code # Omitting the query parameter switch to response back to the standard 200/201 response -# URI and parameters (PUT): /arm/create//[?response=] +# URI and parameters (PUT): /arm/create//[?response=] # Setting # response: 200 (400 if incorrect query params) @app.route(ARM_CREATE_RESPONSE, @@ -166,6 +190,7 @@ def arm_create(producer_id, job_id): if (len(request.args) != 1): return UNKNOWN_QUERY_PARAMETERS,400 + print("Arm create received for producer: "+str(producer_id)+" and job: "+str(job_id)+" and response: "+str(arm_response)) job_dict=setup_callback_dict(producer_id, job_id) @@ -176,7 +201,7 @@ def arm_create(producer_id, job_id): else: job_dict['create_response']=200 else: - job_dict['create_response']=arm_response + job_dict['create_response']=int(arm_response) return "",200 @@ -209,7 +234,7 @@ def arm_delete(producer_id, job_id): else: job_dict['delete_response']=204 else: - job_dict['delete_response']=arm_response + job_dict['delete_response']=int(arm_response) return "",200 @@ -236,7 +261,7 @@ def arm_supervision(producer_id): if (arm_response is None): producer_dict['supervision_response']=200 else: - producer_dict['supervision_response']=arm_response + producer_dict['supervision_response']=int(arm_response) return "",200 @@ -282,6 +307,7 @@ def disarm_type(producer_id, type_id): return "",200 + # Callback for create job # URI and parameters (POST): /callbacks/create/ # response 201 at create, 200 at update or other configured response code @@ -304,7 +330,7 @@ def callback_create(producer_id): type_list=producer_dict['types'] type_id=req_json_dict['ei_type_identity'] if (type_id not in type_list): - return TYPE_NOT_FOUND + return TYPE_NOT_FOUND, 400 job_id=req_json_dict['ei_job_identity'] job_dict=get_callback_dict(producer_id, job_id) @@ -317,6 +343,7 @@ def callback_create(producer_id): return_code=job_dict['create_response'] if ((job_dict['create_response'] == 200) or (job_dict['create_response'] == 201)): job_dict['json']=req_json_dict + job_dict['delivering']=True if (job_dict['create_response'] == 201): #Set up next response code if create was ok job_dict['create_response'] = 200 if (job_dict['delete_response'] == 404): @@ -328,7 +355,7 @@ def callback_create(producer_id): else: return JOBID_NO_MATCH, 400 - return return_msg,return_code + return return_msg, return_code # Callback for delete job # URI and parameters (POST): /callbacks/delete/ @@ -358,6 +385,7 @@ def callback_delete(producer_id): if (job_dict['delete_response'] == 204): job_dict['json']=None job_dict['delete_response']=404 + job_dict['delivering']=False if (job_dict['create_response'] == 200): job_dict['create_response'] = 201 # reset create response if delete was ok else: @@ -390,7 +418,7 @@ def callback_supervision(producer_id): return return_msg,producer_dict['supervision_response'] -# Callback for supervision of producer +# Get the job definition for a job # URI and parameters (GET): "/jobdata//" # response: 200 or 204 @app.route(JOB_DATA, @@ -399,12 +427,49 @@ def get_jobdata(producer_id, job_id): print("Get job data received for producer: "+str(producer_id)+" and job: "+str(job_id)) - job_dict=setup_callback_dict(producer_id, job_id) + job_dict=get_callback_dict(producer_id, job_id) + + if (job_dict is None): + return PRODUCER_OR_JOB_NOT_FOUND,400 + if (job_dict['json'] is None): return "",204 else: return json.dumps(job_dict['json']), 200 +# Start data delivery for a job, action : START or STOP +# URI and parameters (POST): "/jobdata//?action=action" +# response: 200 or 204 +@app.route(JOB_DATA, + methods=['POST']) +def start_jobdata(producer_id, job_id): + + action=request.args.get('action') + + if (action is None): + return UNKNOWN_QUERY_PARAMETERS,400 + else: + if (len(request.args) != 1): + return UNKNOWN_QUERY_PARAMETERS,400 + else: + if ((action != "START") and (action != "STOP")): + return UNKNOWN_QUERY_PARAMETERS,400 + + print("Job data action received for producer: "+str(producer_id)+" and job: "+str(job_id) + " action: " + action) + + job_dict=get_callback_dict(producer_id, job_id) + if (job_dict is None): + return JOB_NOT_FOUND,404 + + if (job_dict['json'] is None): + return JOB_DATA_NOT_FOUND, 400 + else: + if (action == "START"): + job_dict['delivering']=True + else: + job_dict['delivering']=False + return "",200 + # Counter for create calls for a job # URI and parameters (GET): "/counter/create//" @@ -414,7 +479,7 @@ def get_jobdata(producer_id, job_id): def counter_create(producer_id, job_id): job_dict=get_callback_dict(producer_id, job_id) if (job_dict is None): - return -1,200 + return "-1",200 return str(job_dict['create_counter']),200 # Counter for delete calls for a job @@ -425,7 +490,7 @@ def counter_create(producer_id, job_id): def counter_delete(producer_id, job_id): job_dict=get_callback_dict(producer_id, job_id) if (job_dict is None): - return -1,200 + return "-1",200 return str(job_dict['delete_counter']),200 # Counter for supervision calls for a producer @@ -436,7 +501,7 @@ def counter_delete(producer_id, job_id): def counter_supervision(producer_id): producer_dict=get_callback_dict(producer_id, None) if (producer_dict is None): - return -1,200 + return "-1",200 return str(producer_dict['supervision_counter']),200 # Get status info @@ -457,7 +522,36 @@ def reset(): db={} return "",200 + +def datadelivery() : + while True: + try: + job_dicts=get_all_jobs() + for key in job_dicts: + job=job_dicts[key] + if (job['delivering'] == True and job['json'] != None): + url=job['json']['target_uri'] + + data={} + data["date"]=str(datetime.datetime.now()) + data["job"]=""+key + data["sequence_no"]=""+str(job['delivery_attempts']) + data["value"]=str(100) + print("Sending "+json.dumps(data)) + + requests.post(url, json=data, verify=False, timeout=2) #NOSONAR + job['delivery_attempts'] += 1 + except Exception as err: + print("Error during data delivery: "+ str(err)) + time.sleep(1) + + ### Main function ### +print("Starting data delivery thread") +thread = threading.Thread(target=datadelivery, args=()) +thread.daemon = True +thread.start() + if __name__ == "__main__": app.run(port=HOST_PORT, host=HOST_IP)