X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=near-rt-ric-simulator%2Fsrc%2FSTD_2.0.0%2Fa1.py;h=e70a8edebd8291082a77ee0e5a6831189402957d;hb=514637b303ec7dc6390007be1a046189f7a9d169;hp=c4aa4ca6260d3648320b1b20bc7c712546ecaa6a;hpb=967079bda24e7a0f5268728f1474ce0ddc9e52d0;p=sim%2Fa1-interface.git diff --git a/near-rt-ric-simulator/src/STD_2.0.0/a1.py b/near-rt-ric-simulator/src/STD_2.0.0/a1.py old mode 100644 new mode 100755 index c4aa4ca..e70a8ed --- a/near-rt-ric-simulator/src/STD_2.0.0/a1.py +++ b/near-rt-ric-simulator/src/STD_2.0.0/a1.py @@ -1,5 +1,5 @@ # ============LICENSE_START=============================================== -# Copyright (C) 2020 Nordix Foundation. All rights reserved. +# Copyright (C) 2021 Nordix Foundation. All rights reserved. # ======================================================================== # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,24 +15,29 @@ # ============LICENSE_END================================================= # +import os import copy import datetime import json import logging import collections import time +import requests from connexion import NoContent from flask import Flask, escape, request, Response, make_response from jsonschema import validate from var_declaration import policy_instances, policy_types, policy_status, callbacks, forced_settings, policy_fingerprint, hosts_set from utils import calcFingerprint -from maincommon import check_apipath, apipath, get_supported_interfaces_response, extract_host_name +from maincommon import check_apipath, apipath, get_supported_interfaces_response, extract_host_name, is_duplicate_check #Constsants APPL_JSON='application/json' APPL_PROB_JSON='application/problem+json' +EXT_SRV_URL=os.getenv('EXT_SRV_URL') +KAFKA_DISPATCHER_URL=os.getenv('KAFKA_DISPATCHER_URL') + # API Function: Get all policy type ids def get_all_policy_types(): @@ -107,18 +112,41 @@ def put_policy(policyTypeId, policyId): retcode=201 if policy_id in policy_instances[policy_type_id].keys(): retcode=200 - fp_previous=calcFingerprint(policy_instances[policy_type_id][policy_id]) + if (is_duplicate_check()): + fp_previous=calcFingerprint(policy_instances[policy_type_id][policy_id], policy_type_id) + else: + fp_previous=policy_id else: if (policy_id in policy_fingerprint.values()): - return (None, 400) + pjson=create_problem_json(None, "The policy id already exist for other policy type.", 400, None, policy_id) + return Response(json.dumps(pjson), 400, mimetype=APPL_PROB_JSON) - fp=calcFingerprint(data) - if (fp in policy_fingerprint.keys()): + if (is_duplicate_check()): + fp=calcFingerprint(data, policy_type_id) + else: + fp=policy_id + + if ((fp in policy_fingerprint.keys()) and is_duplicate_check()): p_id=policy_fingerprint[fp] if (p_id != policy_id): pjson=create_problem_json(None, "Duplicate, the policy json already exists.", 400, None, policy_id) return Response(json.dumps(pjson), 400, mimetype=APPL_PROB_JSON) + #Callout hooks for kafka dispatcher + if (KAFKA_DISPATCHER_URL is not None): + resp = callout_kafka_dispatcher(policy_type_id, policy_id, data, retcode) + if (resp != 200): + pjson=create_error_response(resp) + return Response(json.dumps(pjson), 500, mimetype=APPL_PROB_JSON) + + #Callout hooks for external server + #When it fails, break and return 419 HTTP status code + if (EXT_SRV_URL is not None): + resp = callout_external_server(policy_id, data, 'PUT') + if (resp != retcode): + pjson=create_error_response(resp) + return Response(json.dumps(pjson), 500, mimetype=APPL_PROB_JSON) + if (fp_previous is not None): del policy_fingerprint[fp_previous] @@ -163,7 +191,6 @@ def get_policy(policyTypeId, policyId): return Response(json.dumps(policy_instances[policy_type_id][policy_id]), 200, mimetype=APPL_JSON) - # API Function: Delete a policy def delete_policy(policyTypeId, policyId): @@ -183,14 +210,32 @@ def delete_policy(policyTypeId, policyId): pjson=create_problem_json(None, "The requested policy does not exist.", 404, None, policy_id) return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON) - fp_previous=calcFingerprint(policy_instances[policy_type_id][policy_id]) + #Callout hooks for kafka dispatcher + if (KAFKA_DISPATCHER_URL is not None): + resp = callout_kafka_dispatcher(policy_type_id, policy_id, None, 204) + if (resp != 200): + pjson=create_error_response(resp) + return Response(json.dumps(pjson), 500, mimetype=APPL_PROB_JSON) + + #Callout hooks for external server + #When it fails, break and return 419 HTTP status code + if (EXT_SRV_URL is not None): + resp = callout_external_server(policy_id, None, 'DELETE') + if (resp != 204): + pjson=create_error_response(resp) + return Response(json.dumps(pjson), 500, mimetype=APPL_PROB_JSON) + + if (is_duplicate_check()): + fp_previous=calcFingerprint(policy_instances[policy_type_id][policy_id], policy_type_id) + else: + fp_previous=policy_id + policy_fingerprint.pop(fp_previous) policy_instances[policy_type_id].pop(policy_id) policy_status.pop(policy_id) callbacks.pop(policy_id) return Response('', 204, mimetype=APPL_JSON) - # API Function: Get status for a policy def get_policy_status(policyTypeId, policyId): @@ -210,18 +255,76 @@ def get_policy_status(policyTypeId, policyId): pjson=create_problem_json(None, "The requested policy does not exist.", 404, None, policy_id) return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON) + #Callout hooks for kafka dispatcher + if (KAFKA_DISPATCHER_URL is not None): + resp = callout_kafka_dispatcher(policy_type_id, policy_id, None, 202) + if (resp != 200): + pjson=create_error_response(resp) + return Response(json.dumps(pjson), 500, mimetype=APPL_PROB_JSON) + return Response(json.dumps(policy_status[policy_id]), status=200, mimetype=APPL_JSON) + +# Helper: Callout kafka dispatcher server to notify it for policy operations +def callout_kafka_dispatcher(policy_type_id, policy_id, payload, retcode): + + target_url = KAFKA_DISPATCHER_URL + "/policytypes/" + policy_type_id + "/kafkadispatcher/" + policy_id + try: + # create operation, publish with payload + if (retcode == 201): + resp=requests.put(target_url, json=payload, timeout=30, verify=False) + return resp.status_code + # update operation, publish with payload + elif (retcode == 200): + # add headers an update-flag + headers = {'updateoper' : 'yes'} + resp=requests.put(target_url, json=payload, headers=headers, timeout=30, verify=False) + return resp.status_code + # delete operation, publish without payload + elif (retcode == 204): + resp=requests.delete(target_url, timeout=30, verify=False) + return resp.status_code + # get policy status operation, publish without payload + elif (retcode == 202): + # update endpoint + target_url = target_url + "/status" + resp=requests.get(target_url, timeout=30, verify=False) + return resp.status_code + except Exception: + return 419 + + +# Helper: Callout external server to notify it for policy operations +# Returns 200, 201 and 204 for the success callout hooks, for the others returns 419 +def callout_external_server(policy_id, payload, operation): + + target_url=EXT_SRV_URL + policy_id + try: + if (operation == 'PUT'): + #Suppress error when self-signed certificate is being used with verify flag + resp=requests.put(target_url, json=payload, timeout=10, verify=False) + return resp.status_code + elif (operation == 'DELETE'): + resp=requests.delete(target_url, timeout=10, verify=False) + return resp.status_code + except Exception: + #Return a generic unassigned HTTP status code as per iana, for all exceptions (419:Callout failed) + #https://www.iana.org/assignments/http-status-codes/http-status-codes.xhtml + return 419 + # Helper: Create a response object if forced http response code is set def get_forced_response(): + if (forced_settings['code'] is not None): - pjson=create_error_response(forced_settings['code']) + value=forced_settings['code'] + pjson=create_error_response(int(value)) forced_settings['code']=None return Response(json.dumps(pjson), pjson['status'], mimetype=APPL_PROB_JSON) return None # Helper: Delay if delayed response code is set def do_delay(): + if (forced_settings['delay'] is not None): try: val=int(forced_settings['delay']) @@ -231,6 +334,7 @@ def do_delay(): # Helper: Check if response shall be delayed or a forced response shall be sent def check_modified_response(): + do_delay() return get_forced_response() @@ -253,19 +357,21 @@ def create_problem_json(type_of, title, status, detail, instance): # Helper: Create a problem json based on a generic http response code def create_error_response(code): - if code == '400': + if (code == 400): return(create_problem_json(None, "Bad request", 400, "Object in payload not properly formulated or not related to the method", None)) - elif code == '404': + elif (code == 404): return(create_problem_json(None, "Not found", 404, "No resource found at the URI", None)) - elif code == '405': + elif (code == 405): return(create_problem_json(None, "Method not allowed", 405, "Method not allowed for the URI", None)) - elif code == '409': + elif (code == 409): return(create_problem_json(None, "Conflict", 409, "Request could not be processed in the current state of the resource", None)) - elif code == '429': + elif (code == 419): + return(create_problem_json(None, "Callout failed", 419, "Callout hooks could not be processed on the external server", None)) + elif (code == 429): return(create_problem_json(None, "Too many requests", 429, "Too many requests have been sent in a given amount of time", None)) - elif code == '507': + elif (code == 507): return(create_problem_json(None, "Insufficient storage", 507, "The method could not be performed on the resource because the provider is unable to store the representation needed to successfully complete the request", None)) - elif code == '503': + elif (code == 503): return(create_problem_json(None, "Service unavailable", 503, "The provider is currently unable to handle the request due to a temporary overload", None)) else: return(create_problem_json(None, "Unknown", code, "Not implemented response code", None))