X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=near-rt-ric-simulator%2Fsrc%2FSTD_1.1.3%2Fa1.py;h=d7169b19e81b63e076ea82243f01958005dd17aa;hb=2d07867317ef49df4cd003899dcc7fe4c01b0352;hp=d234327a562bc180e248cc215d04d287527e16b7;hpb=7d31c11783b82cb058005242e3d2d0c932c9a04d;p=sim%2Fa1-interface.git diff --git a/near-rt-ric-simulator/src/STD_1.1.3/a1.py b/near-rt-ric-simulator/src/STD_1.1.3/a1.py index d234327..d7169b1 100644 --- a/near-rt-ric-simulator/src/STD_1.1.3/a1.py +++ b/near-rt-ric-simulator/src/STD_1.1.3/a1.py @@ -26,11 +26,16 @@ from connexion import NoContent from flask import Flask, escape, request, Response, make_response from var_declaration import policy_instances, policy_status, callbacks, forced_settings, policy_fingerprint, hosts_set from utils import calcFingerprint -from maincommon import * +from maincommon import check_apipath, apipath, get_supported_interfaces_response, extract_host_name + +#Constsants +APPL_JSON='application/json' +APPL_PROB_JSON='application/problem+json' # API Function: Get all policy ids def get_all_policy_identities(): + extract_host_name(hosts_set, request) if ((r := check_modified_response()) is not None): return r @@ -48,25 +53,25 @@ def put_policy(policyId): try: data = request.data data = json.loads(data) - except: + except Exception: pjson=create_problem_json(None, "The policy is corrupt or missing.", 400, None, policyId) - return Response(json.dumps(pjson), 400, mimetype='application/problem+json') + return Response(json.dumps(pjson), 400, mimetype=APPL_PROB_JSON) - fpPrevious=None + fp_previous=None retcode=201 if policyId in policy_instances.keys(): retcode=200 - fpPrevious=calcFingerprint(policy_instances[policyId]) + fp_previous=calcFingerprint(policy_instances[policyId]) fp=calcFingerprint(data) if (fp in policy_fingerprint.keys()): - id=policy_fingerprint[fp] - if (id != policyId): + p_id=policy_fingerprint[fp] + if (p_id != policyId): pjson=create_problem_json(None, "The policy json already exists.", 400, None, policyId) - return Response(json.dumps(pjson), 400, mimetype='application/problem+json') + return Response(json.dumps(pjson), 400, mimetype=APPL_PROB_JSON) - if (fpPrevious is not None): - del policy_fingerprint[fpPrevious] + if (fp_previous is not None): + del policy_fingerprint[fp_previous] policy_fingerprint[fp]=policyId @@ -79,11 +84,11 @@ def put_policy(policyId): policy_status[policyId]=ps if (retcode == 200): - return Response(json.dumps(data), 200, mimetype='application/json') + return Response(json.dumps(data), 200, mimetype=APPL_JSON) else: headers={} headers['Location']='/A1-P/v1/policies/' + policyId - return Response(json.dumps(data), 201, headers=headers, mimetype='application/json') + return Response(json.dumps(data), 201, headers=headers, mimetype=APPL_JSON) # API Function: Get a policy def get_policy(policyId): @@ -94,10 +99,10 @@ def get_policy(policyId): return r if policyId in policy_instances.keys(): - return Response(json.dumps(policy_instances[policyId]), 200, mimetype='application/json') + return Response(json.dumps(policy_instances[policyId]), 200, mimetype=APPL_JSON) pjson=create_problem_json(None, "The requested policy does not exist.", 404, None, policyId) - return Response(json.dumps(pjson), 404, mimetype='application/problem+json') + return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON) # API Function: Delete a policy def delete_policy(policyId): @@ -108,15 +113,15 @@ def delete_policy(policyId): return r if policyId in policy_instances.keys(): - fpPrevious=calcFingerprint(policy_instances[policyId]) - policy_fingerprint.pop(fpPrevious) + fp_previous=calcFingerprint(policy_instances[policyId]) + policy_fingerprint.pop(fp_previous) policy_instances.pop(policyId) policy_status.pop(policyId) callbacks.pop(policyId) - return Response('', 204, mimetype='application/json') + return Response('', 204, mimetype=APPL_JSON) pjson=create_problem_json(None, "The policy identity does not exist.", 404, "No policy instance has been deleted.", policyId) - return Response(json.dumps(pjson), 404, mimetype='application/problem+json') + return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON) # API Function: Get status for a policy def get_policy_status(policyId): @@ -127,17 +132,17 @@ def get_policy_status(policyId): return r if policyId in policy_instances.keys(): - return Response(json.dumps(policy_status[policyId]), status=200, mimetype='application/json') + return Response(json.dumps(policy_status[policyId]), status=200, mimetype=APPL_JSON) pjson=create_problem_json(None, "The policy identity does not exist.", 404, "There is no existing policy instance with the identity: " + policyId, policyId) - return Response(json.dumps(pjson), 404, mimetype='application/problem+json') + return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON) # Helper: Create a response object if forced http response code is set def get_forced_response(): if (forced_settings['code'] is not None): pjson=create_error_response(forced_settings['code']) forced_settings['code']=None - return Response(json.dumps(pjson), pjson['status'], mimetype='application/problem+json') + return Response(json.dumps(pjson), pjson['status'], mimetype=APPL_PROB_JSON) return None # Helper: Delay if delayed response code is set @@ -146,9 +151,8 @@ def do_delay(): try: val=int(forced_settings['delay']) time.sleep(val) - except: + except Exception: return - return # Helper: Check if response shall be delayed or a forced response shall be sent def check_modified_response():