X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=near-rt-ric-simulator%2Fsrc%2FOSC_2.1.0%2Fa1.py;fp=near-rt-ric-simulator%2Fsrc%2FOSC_2.1.0%2Fa1.py;h=6db2599b9f3c175b474ebfed494459bdce140a89;hb=21be2e0c890fb9604d0add2942fcff19b6fe2fb2;hp=0000000000000000000000000000000000000000;hpb=f27a4460b38b62e665f3e9ee936f439270eded32;p=sim%2Fa1-interface.git diff --git a/near-rt-ric-simulator/src/OSC_2.1.0/a1.py b/near-rt-ric-simulator/src/OSC_2.1.0/a1.py new file mode 100644 index 0000000..6db2599 --- /dev/null +++ b/near-rt-ric-simulator/src/OSC_2.1.0/a1.py @@ -0,0 +1,247 @@ +# ============LICENSE_START=============================================== +# Copyright (C) 2020 Nordix Foundation. All rights reserved. +# ======================================================================== +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END================================================= +# + +import json +import datetime +import time + +from datetime import datetime +from connexion import NoContent +from flask import Flask, request, Response +from jsonschema import validate +from var_declaration import policy_instances, policy_types, policy_status, policy_fingerprint, forced_settings +from utils import calcFingerprint +from maincommon import * + + +# API Function: Health check +def get_healthcheck(): + + if ((r := check_modified_response()) is not None): + return r + + return (None, 200) + +# API Function: Get all policy type ids +def get_all_policy_types(): + + if ((r := check_modified_response()) is not None): + return r + + return (list(policy_instances.keys()), 200) + +# API Function: Get a policy type +def get_policy_type(policy_type_id): + + if ((r := check_modified_response()) is not None): + return r + + policy_type_id=str(policy_type_id) + + if (policy_type_id not in policy_types.keys()): + return (None, 404) + + return Response(json.dumps(policy_types[policy_type_id]), 200, mimetype='application/json') + +# API Function: Delete a policy type +def delete_policy_type(policy_type_id): + + if ((r := check_modified_response()) is not None): + return r + + policy_type_id=str(policy_type_id) + + if (policy_type_id not in policy_instances.keys()): + return (None, 404) + + if (len(policy_instances[policy_type_id]) > 0): + return (None, 400) + + del policy_instances[policy_type_id] + del policy_types[policy_type_id] + + return (None, 204) + + +# API Function: Create a policy type +def create_policy_type(policy_type_id): + + if ((r := check_modified_response()) is not None): + return r + + policy_type_id=str(policy_type_id) + + if (policy_type_id in policy_instances.keys()): + if (len(policy_instances[policy_type_id]) > 0): + return (None, 400) + + try: + data = request.data + data = json.loads(data) + except: + return (None, 400) + + if (('name' not in data.keys()) or ('description' not in data.keys()) or ('policy_type_id' not in data.keys()) or ('create_schema' not in data.keys())): + return (None, 400) + + if (policy_type_id not in policy_instances.keys()): + policy_instances[policy_type_id]={} + + policy_types[policy_type_id]=data + + return (None, 201) + + +# API Function: Get all policy ids for a type +def get_all_policy_identities(policy_type_id): + + if ((r := check_modified_response()) is not None): + return r + + policy_type_id=str(policy_type_id) + + if (policy_type_id not in policy_instances.keys()): + return (None, 404) + return (list(policy_instances[policy_type_id].keys()), 200) + +# API Function: Get a policy instance +def get_policy_instance(policy_type_id, policy_instance_id): + + if ((r := check_modified_response()) is not None): + return r + + policy_type_id=str(policy_type_id) + + if (policy_type_id not in policy_instances.keys()): + return (None, 404) + + if (policy_instance_id not in policy_instances[policy_type_id].keys()): + return (None, 404) + + return Response(json.dumps(policy_instances[policy_type_id][policy_instance_id]), 200, mimetype='application/json') + +# API function: Delete a policy +def delete_policy_instance(policy_type_id, policy_instance_id): + + if ((r := check_modified_response()) is not None): + return r + + policy_type_id=str(policy_type_id) + + if (policy_type_id not in policy_instances.keys()): + return (None, 404) + + if (policy_instance_id not in policy_instances[policy_type_id].keys()): + return (None, 404) + + fpPrevious=calcFingerprint(policy_instances[policy_type_id][policy_instance_id]) + del policy_fingerprint[fpPrevious] + del policy_instances[policy_type_id][policy_instance_id] + del policy_status[policy_instance_id] + + return (None, 202) + +# API function: Create/update a policy +def create_or_replace_policy_instance(policy_type_id, policy_instance_id): + + if ((r := check_modified_response()) is not None): + return r + + policy_type_id=str(policy_type_id) + + if (policy_type_id not in policy_instances.keys()): + return (None, 404) + + try: + data = request.data + data = json.loads(data) + except: + return (None, 400) + + try: + validate(instance=data, schema=policy_types[policy_type_id]['create_schema']) + except: + return (None, 400) + + fpPrevious=None + retcode=201 + if policy_instance_id in policy_instances[policy_type_id].keys(): + retcode=200 + fpPrevious=calcFingerprint(policy_instances[policy_type_id][policy_instance_id]) + else: + if (policy_instance_id in policy_fingerprint.values()): + return (None, 400) + + fp=calcFingerprint(data) + if (fp in policy_fingerprint.keys()): + id=policy_fingerprint[fp] + if (id != policy_instance_id): + return (None, 400) + + if (fpPrevious is not None): + del policy_fingerprint[fpPrevious] + + policy_fingerprint[fp]=policy_instance_id + + policy_instances[policy_type_id][policy_instance_id]=data + ps={} + ps["instance_status"] = "NOT IN EFFECT" + ps["has_been_deleted"] = "false" + ps["created_at"] = str(datetime.now().strftime("%m/%d/%Y, %H:%M:%S")) + policy_status[policy_instance_id]=ps + + return (None, 202) + +# API function: Get policy status +def get_policy_instance_status(policy_type_id, policy_instance_id): + + if ((r := check_modified_response()) is not None): + return r + + policy_type_id=str(policy_type_id) + if (policy_type_id not in policy_instances.keys()): + return (None, 404) + + if (policy_instance_id not in policy_instances[policy_type_id].keys()): + return (None, 404) + + return Response(json.dumps(policy_status[policy_instance_id]), 200, mimetype='application/json') + +# Helper: Create a response object if forced http response code is set +def get_forced_response(): + + response_code=forced_settings['code'] + if (response_code is not None): + forced_settings['code'] = None + return (None, response_code) + return None + +# Helper: Delay if delayed response code is set +def do_delay(): + + if (forced_settings['delay'] is not None): + try: + val=int(forced_settings['delay']) + time.sleep(val) + except: + return + return + +# Helper: Check if response shall be delayed or a forced response shall be sent +def check_modified_response(): + do_delay() + return get_forced_response() \ No newline at end of file