X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=near-rt-ric-simulator%2Fsrc%2FSTD_1.1.3%2Fa1.py;h=ece4f3929b5d48c09d7a37fe83ddfa9a730b90d1;hb=d9b25321d9bce8a3db88102c127e2aa4436d81c8;hp=dfc81e6f58a2734feba42f07d273ca66c1653bc2;hpb=21be2e0c890fb9604d0add2942fcff19b6fe2fb2;p=sim%2Fa1-interface.git diff --git a/near-rt-ric-simulator/src/STD_1.1.3/a1.py b/near-rt-ric-simulator/src/STD_1.1.3/a1.py index dfc81e6..ece4f39 100644 --- a/near-rt-ric-simulator/src/STD_1.1.3/a1.py +++ b/near-rt-ric-simulator/src/STD_1.1.3/a1.py @@ -1,5 +1,5 @@ # ============LICENSE_START=============================================== -# Copyright (C) 2020 Nordix Foundation. All rights reserved. +# Copyright (C) 2021 Nordix Foundation. All rights reserved. # ======================================================================== # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -23,46 +23,63 @@ import collections import time from connexion import NoContent -from flask import Flask, escape, request, Response, make_response -from var_declaration import policy_instances, policy_status, callbacks, forced_settings, policy_fingerprint +from flask import Flask, request, Response +from var_declaration import policy_instances, policy_status, callbacks, forced_settings, policy_fingerprint, hosts_set from utils import calcFingerprint +from maincommon import check_apipath, apipath, get_supported_interfaces_response, extract_host_name, is_duplicate_check + +#Constsants +APPL_JSON='application/json' +APPL_PROB_JSON='application/problem+json' # API Function: Get all policy ids def get_all_policy_identities(): + extract_host_name(hosts_set, request) + if ((r := check_modified_response()) is not None): return r - return (list(policy_instances.keys()), 200) - + res = list(policy_instances.keys()) + return Response(json.dumps(res), 200, mimetype=APPL_JSON) + # API Function: Create or update a policy def put_policy(policyId): + extract_host_name(hosts_set, request) + if ((r := check_modified_response()) is not None): return r try: data = request.data data = json.loads(data) - except: + except Exception: pjson=create_problem_json(None, "The policy is corrupt or missing.", 400, None, policyId) - return Response(json.dumps(pjson), 400, mimetype='application/problem+json') + return Response(json.dumps(pjson), 400, mimetype=APPL_PROB_JSON) - fpPrevious=None + fp_previous=None retcode=201 if policyId in policy_instances.keys(): retcode=200 - fpPrevious=calcFingerprint(policy_instances[policyId]) + if (is_duplicate_check()): + fp_previous=calcFingerprint(policy_instances[policyId]) + else: + fp_previous=policyId + + if (is_duplicate_check()): + fp=calcFingerprint(data) + else: + fp=policyId - fp=calcFingerprint(data) if (fp in policy_fingerprint.keys()): - id=policy_fingerprint[fp] - if (id != policyId): + p_id=policy_fingerprint[fp] + if (p_id != policyId): pjson=create_problem_json(None, "The policy json already exists.", 400, None, policyId) - return Response(json.dumps(pjson), 400, mimetype='application/problem+json') + return Response(json.dumps(pjson), 400, mimetype=APPL_PROB_JSON) - if (fpPrevious is not None): - del policy_fingerprint[fpPrevious] + if (fp_previous is not None): + del policy_fingerprint[fp_previous] policy_fingerprint[fp]=policyId @@ -75,59 +92,69 @@ def put_policy(policyId): policy_status[policyId]=ps if (retcode == 200): - return Response(json.dumps(data), 200, mimetype='application/json') + return Response(json.dumps(data), 200, mimetype=APPL_JSON) else: headers={} headers['Location']='/A1-P/v1/policies/' + policyId - return Response(json.dumps(data), 201, headers=headers, mimetype='application/json') + return Response(json.dumps(data), 201, headers=headers, mimetype=APPL_JSON) # API Function: Get a policy def get_policy(policyId): + extract_host_name(hosts_set, request) + if ((r := check_modified_response()) is not None): return r if policyId in policy_instances.keys(): - return Response(json.dumps(policy_instances[policyId]), 200, mimetype='application/json') + return Response(json.dumps(policy_instances[policyId]), 200, mimetype=APPL_JSON) pjson=create_problem_json(None, "The requested policy does not exist.", 404, None, policyId) - return Response(json.dumps(pjson), 404, mimetype='application/problem+json') + return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON) # API Function: Delete a policy def delete_policy(policyId): + extract_host_name(hosts_set, request) + if ((r := check_modified_response()) is not None): return r if policyId in policy_instances.keys(): - fpPrevious=calcFingerprint(policy_instances[policyId]) - policy_fingerprint.pop(fpPrevious) + if (is_duplicate_check()): + fp_previous=calcFingerprint(policy_instances[policyId]) + else: + fp_previous=policyId + + policy_fingerprint.pop(fp_previous) policy_instances.pop(policyId) policy_status.pop(policyId) callbacks.pop(policyId) - return Response('', 204, mimetype='application/json') + return Response('', 204, mimetype=APPL_JSON) pjson=create_problem_json(None, "The policy identity does not exist.", 404, "No policy instance has been deleted.", policyId) - return Response(json.dumps(pjson), 404, mimetype='application/problem+json') + return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON) # API Function: Get status for a policy def get_policy_status(policyId): + extract_host_name(hosts_set, request) + if ((r := check_modified_response()) is not None): return r if policyId in policy_instances.keys(): - return Response(json.dumps(policy_status[policyId]), status=200, mimetype='application/json') + return Response(json.dumps(policy_status[policyId]), status=200, mimetype=APPL_JSON) pjson=create_problem_json(None, "The policy identity does not exist.", 404, "There is no existing policy instance with the identity: " + policyId, policyId) - return Response(json.dumps(pjson), 404, mimetype='application/problem+json') + return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON) # Helper: Create a response object if forced http response code is set def get_forced_response(): if (forced_settings['code'] is not None): pjson=create_error_response(forced_settings['code']) forced_settings['code']=None - return Response(json.dumps(pjson), pjson['status'], mimetype='application/problem+json') + return Response(json.dumps(pjson), pjson['status'], mimetype=APPL_PROB_JSON) return None # Helper: Delay if delayed response code is set @@ -136,9 +163,8 @@ def do_delay(): try: val=int(forced_settings['delay']) time.sleep(val) - except: + except Exception: return - return # Helper: Check if response shall be delayed or a forced response shall be sent def check_modified_response():