# ============LICENSE_START===============================================
# Copyright (C) 2021 Nordix Foundation. All rights reserved.
# ========================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=================================================
#

import os
import copy
import datetime
import json
import logging
import collections
import time

import requests

from connexion import NoContent
from flask import Flask, request, Response
from jsonschema import validate
from var_declaration import policy_instances, policy_types, policy_status, callbacks, forced_settings, policy_fingerprint, hosts_set
from utils import calcFingerprint
from maincommon import check_apipath, apipath, get_supported_interfaces_response, extract_host_name, is_duplicate_check
from models.enforceStatus import EnforceStatus

# Constants
APPL_JSON='application/json'
APPL_PROB_JSON='application/problem+json'

# Base URLs of the optional callout targets; a callout is skipped when its
# variable is unset (None).
EXT_SRV_URL=os.getenv('EXT_SRV_URL')
KAFKA_DISPATCHER_URL=os.getenv('KAFKA_DISPATCHER_URL')

# Titles and details for the generic problem json objects, keyed by HTTP status code
_ERROR_RESPONSES = {
    400: ("Bad request", "Object in payload not properly formulated or not related to the method"),
    404: ("Not found", "No resource found at the URI"),
    405: ("Method not allowed", "Method not allowed for the URI"),
    409: ("Conflict", "Request could not be processed in the current state of the resource"),
    419: ("Callout failed", "Callout hooks could not be processed on the external server"),
    429: ("Too many requests", "Too many requests have been sent in a given amount of time"),
    507: ("Insufficient storage", "The method could not be performed on the resource because the provider is unable to store the representation needed to successfully complete the request"),
    503: ("Service unavailable", "The provider is currently unable to handle the request due to a temporary overload"),
}


# API Function: Get all policy type ids
def get_all_policy_types():
    """Return a 200 response whose body is the JSON list of policy type ids.

    A forced response (see check_modified_response) takes precedence.
    """
    extract_host_name(hosts_set, request)

    if (r := check_modified_response()) is not None:
        return r

    res = list(policy_types.keys())
    return Response(json.dumps(res), 200, mimetype=APPL_JSON)


# API Function: Get a policy type
def get_policy_type(policyTypeId):
    """Return the stored policy type as JSON, or a 404 problem json.

    policyTypeId is converted with str() before it is used as a key.
    """
    extract_host_name(hosts_set, request)

    if (r := check_modified_response()) is not None:
        return r

    policy_type_id = str(policyTypeId)

    if policy_type_id not in policy_types:
        return _problem_response("The policy type does not exist.", 404, policy_type_id)

    return Response(json.dumps(policy_types[policy_type_id]), 200, mimetype=APPL_JSON)


# API Function: Get all policy ids
def get_all_policy_identities(policyTypeId):
    """Return the JSON list of policy ids stored for a policy type, or a 404 problem json."""
    extract_host_name(hosts_set, request)

    if (r := check_modified_response()) is not None:
        return r

    policy_type_id = str(policyTypeId)

    if policy_type_id not in policy_types:
        return _problem_response("The policy type does not exist.", 404, policy_type_id)

    res = list(policy_instances[policy_type_id].keys())
    return Response(json.dumps(res), 200, mimetype=APPL_JSON)


# API Function: Create or update a policy
def put_policy(policyTypeId, policyId):
    """Create (201) or update (200) a policy instance from the request body.

    Returns:
        - a forced response, when one is configured;
        - 404 problem json when the policy type is unknown;
        - 400 problem json when the body is not parsable JSON, when the id is
          already used by a policy of another type, or when duplicate checking
          is on and the same policy json exists under another id;
        - (None, 400) when the body does not validate against the type's
          'policySchema';
        - 500 problem json when a configured callout does not succeed;
        - otherwise the stored policy as JSON, with a Location header on create.
    """
    extract_host_name(hosts_set, request)

    if (r := check_modified_response()) is not None:
        return r

    policy_type_id = str(policyTypeId)
    policy_id = str(policyId)

    if policy_type_id not in policy_types:
        return _problem_response("The policy type does not exist.", 404, policy_id)

    try:
        data = request.data
        data = json.loads(data)
    except Exception:
        return _problem_response("The policy is corrupt or missing.", 400, policy_id)

    try:
        validate(instance=data, schema=policy_types[policy_type_id]['policySchema'])
    except Exception:
        # Deliberately a bare (body, status) tuple rather than a problem json
        return (None, 400)

    # fp_previous is the fingerprint key to retire when an existing policy is updated
    fp_previous = None
    retcode = 201
    if policy_id in policy_instances[policy_type_id]:
        retcode = 200
        if is_duplicate_check():
            fp_previous = calcFingerprint(policy_instances[policy_type_id][policy_id], policy_type_id)
        else:
            fp_previous = policy_id
    else:
        # The id is new for this type; reject it if another type already uses it
        if policy_id in policy_fingerprint.values():
            return _problem_response("The policy id already exist for other policy type.", 400, policy_id)

    # With duplicate checking off the policy id itself serves as the fingerprint key
    if is_duplicate_check():
        fp = calcFingerprint(data, policy_type_id)
    else:
        fp = policy_id

    if (fp in policy_fingerprint) and is_duplicate_check():
        p_id = policy_fingerprint[fp]
        if p_id != policy_id:
            return _problem_response("Duplicate, the policy json already exists.", 400, policy_id)

    # Callout hooks for kafka dispatcher
    if KAFKA_DISPATCHER_URL is not None:
        resp = callout_kafka_dispatcher(policy_type_id, policy_id, data, retcode)
        if resp != 200:
            return _callout_failure_response(resp)

    # Callout hooks for external server
    # When it fails, break and return HTTP status code 500
    if EXT_SRV_URL is not None:
        resp = callout_external_server(policy_id, data, 'PUT')
        if resp != retcode:
            return _callout_failure_response(resp)

    if fp_previous is not None:
        # Tolerant removal: the key may be missing if the duplicate-check
        # setting changed since the policy was stored.
        policy_fingerprint.pop(fp_previous, None)

    policy_fingerprint[fp] = policy_id

    noti = request.args.get('notificationDestination')
    callbacks[policy_id] = noti

    policy_instances[policy_type_id][policy_id] = data

    # .get(): a policy type without a 'statusSchema' entry simply has no status,
    # instead of failing with KeyError after the policy was already stored.
    if policy_types[policy_type_id].get('statusSchema') is not None:
        enforce_status = EnforceStatus("NOT_ENFORCED", "OTHER_REASON")
        policy_status[policy_id] = enforce_status.to_dict()

    if retcode == 200:
        return Response(json.dumps(data), 200, mimetype=APPL_JSON)

    headers = {'Location': '/A1-P/v2/policytypes/' + policy_type_id + '/policies/' + policy_id}
    return Response(json.dumps(data), 201, headers=headers, mimetype=APPL_JSON)


# API Function: Get a policy
def get_policy(policyTypeId, policyId):
    """Return the stored policy as JSON, or a 404 problem json if the type or policy is unknown."""
    extract_host_name(hosts_set, request)

    if (r := check_modified_response()) is not None:
        return r

    policy_type_id = str(policyTypeId)
    policy_id = str(policyId)

    if policy_type_id not in policy_types:
        return _problem_response("The policy type does not exist.", 404, policy_id)

    if policy_id not in policy_instances[policy_type_id]:
        return _problem_response("The requested policy does not exist.", 404, policy_id)

    return Response(json.dumps(policy_instances[policy_type_id][policy_id]), 200, mimetype=APPL_JSON)


# API Function: Delete a policy
def delete_policy(policyTypeId, policyId):
    """Delete a policy instance and its fingerprint, status and callback entries.

    Returns 204 with an empty body on success, a 404 problem json if the type
    or policy is unknown, or a 500 problem json when a configured callout does
    not succeed (in which case nothing is removed).
    """
    extract_host_name(hosts_set, request)

    if (r := check_modified_response()) is not None:
        return r

    policy_type_id = str(policyTypeId)
    policy_id = str(policyId)

    if policy_type_id not in policy_types:
        return _problem_response("The policy type does not exist.", 404, policy_id)

    if policy_id not in policy_instances[policy_type_id]:
        return _problem_response("The requested policy does not exist.", 404, policy_id)

    # Callout hooks for kafka dispatcher
    if KAFKA_DISPATCHER_URL is not None:
        resp = callout_kafka_dispatcher(policy_type_id, policy_id, None, 204)
        if resp != 200:
            return _callout_failure_response(resp)

    # Callout hooks for external server
    # When it fails, break and return HTTP status code 500
    if EXT_SRV_URL is not None:
        resp = callout_external_server(policy_id, None, 'DELETE')
        if resp != 204:
            return _callout_failure_response(resp)

    if is_duplicate_check():
        fp_previous = calcFingerprint(policy_instances[policy_type_id][policy_id], policy_type_id)
    else:
        fp_previous = policy_id

    # Tolerant removals: put_policy only records a status when the type has a
    # status schema, so a strict pop() could raise KeyError here after the
    # callouts above have already been performed.
    policy_fingerprint.pop(fp_previous, None)
    policy_instances[policy_type_id].pop(policy_id, None)
    policy_status.pop(policy_id, None)
    callbacks.pop(policy_id, None)
    return Response('', 204, mimetype=APPL_JSON)


# API Function: Get status for a policy
def get_policy_status(policyTypeId, policyId):
    """Return the stored status of a policy as JSON.

    Returns a 404 problem json if the type or policy is unknown, or a 500
    problem json when the kafka dispatcher callout does not answer 200.
    """
    extract_host_name(hosts_set, request)

    if (r := check_modified_response()) is not None:
        return r

    policy_type_id = str(policyTypeId)
    policy_id = str(policyId)

    if policy_type_id not in policy_types:
        return _problem_response("The policy type does not exist.", 404, policy_id)

    if policy_id not in policy_instances[policy_type_id]:
        return _problem_response("The requested policy does not exist.", 404, policy_id)

    # Callout hooks for kafka dispatcher
    if KAFKA_DISPATCHER_URL is not None:
        resp = callout_kafka_dispatcher(policy_type_id, policy_id, None, 202)
        if resp != 200:
            return _callout_failure_response(resp)

    # NOTE(review): raises KeyError when no status was recorded for the policy
    # (put_policy only records one when the type's statusSchema is not None).
    return Response(json.dumps(policy_status[policy_id]), status=200, mimetype=APPL_JSON)


# Helper: Callout kafka dispatcher server to notify it for policy operations
def callout_kafka_dispatcher(policy_type_id, policy_id, payload, retcode):
    """Notify the kafka dispatcher and return the HTTP status code it answered.

    The operation is selected by retcode: 201 create (PUT with payload),
    200 update (PUT with payload and an 'updateoper' header), 204 delete
    (DELETE), 202 get status (GET on the '/status' sub-resource).
    Returns 419 when the request raises or retcode is not one of the above.
    """
    target_url = KAFKA_DISPATCHER_URL + "/policytypes/" + policy_type_id + "/kafkadispatcher/" + policy_id
    try:
        # NOTE: verify=False disables TLS certificate verification on all calls below
        # create operation, publish with payload
        if retcode == 201:
            resp = requests.put(target_url, json=payload, timeout=30, verify=False)
            return resp.status_code
        # update operation, publish with payload
        if retcode == 200:
            # add headers an update-flag
            headers = {'updateoper' : 'yes'}
            resp = requests.put(target_url, json=payload, headers=headers, timeout=30, verify=False)
            return resp.status_code
        # delete operation, publish without payload
        if retcode == 204:
            resp = requests.delete(target_url, timeout=30, verify=False)
            return resp.status_code
        # get policy status operation, publish without payload
        if retcode == 202:
            # update endpoint
            target_url = target_url + "/status"
            resp = requests.get(target_url, timeout=30, verify=False)
            return resp.status_code
    except Exception:
        return 419
    # Unsupported retcode: report a failed callout instead of an implicit None
    return 419


# Helper: Callout external server to notify it for policy operations
# Returns 200, 201 and 204 for the success callout hooks, for the others returns 419
def callout_external_server(policy_id, payload, operation):
    """Notify the external server and return the HTTP status code it answered.

    operation is 'PUT' (sent with payload as JSON) or 'DELETE'. Returns 419
    when the request raises or the operation is not supported.
    """
    target_url = EXT_SRV_URL + policy_id
    try:
        if operation == 'PUT':
            # Suppress error when self-signed certificate is being used with verify flag
            resp = requests.put(target_url, json=payload, timeout=10, verify=False)
            return resp.status_code
        if operation == 'DELETE':
            resp = requests.delete(target_url, timeout=10, verify=False)
            return resp.status_code
    except Exception:
        # Return a generic unassigned HTTP status code as per iana, for all exceptions (419:Callout failed)
        # https://www.iana.org/assignments/http-status-codes/http-status-codes.xhtml
        return 419
    # Unsupported operation: report a failed callout instead of an implicit None
    return 419


# Helper: Create a response object if forced http response code is set
def get_forced_response():
    """Return a one-shot problem json response for forced_settings['code'], else None.

    The forced code is cleared once a response has been built for it.
    """
    if forced_settings['code'] is not None:
        value = forced_settings['code']
        pjson = create_error_response(int(value))
        forced_settings['code'] = None
        return Response(json.dumps(pjson), pjson['status'], mimetype=APPL_PROB_JSON)
    return None


# Helper: Delay if delayed response code is set
def do_delay():
    """Sleep for forced_settings['delay'] seconds when a delay is set."""
    if forced_settings['delay'] is not None:
        try:
            val = int(forced_settings['delay'])
            time.sleep(val)
        except Exception:
            # Best effort: an unusable delay value means no delay
            return


# Helper: Check if response shall be delayed or a forced response shall be sent
def check_modified_response():
    """Apply the configured delay, then return the forced response (or None)."""
    do_delay()
    return get_forced_response()


# Helper: Create a problem json object
def create_problem_json(type_of, title, status, detail, instance):
    """Build a problem json dict from the arguments, omitting those that are None.

    Keys, in order: "type", "title", "status", "detail", "instance".
    """
    fields = (
        ("type", type_of),
        ("title", title),
        ("status", status),
        ("detail", detail),
        ("instance", instance),
    )
    return {key: value for key, value in fields if value is not None}


# Helper: Create a problem json based on a generic http response code
def create_error_response(code):
    """Return the problem json for an HTTP status code.

    Codes without an entry in _ERROR_RESPONSES yield title "Unknown" with the
    given code as status.
    """
    title, detail = _ERROR_RESPONSES.get(code, ("Unknown", "Not implemented response code"))
    return create_problem_json(None, title, code, detail, None)


# Helper: Create a problem json response with the given title, status and instance
def _problem_response(title, status, instance):
    """Return a Response carrying a problem json body, using status as the HTTP status."""
    pjson = create_problem_json(None, title, status, None, instance)
    return Response(json.dumps(pjson), status, mimetype=APPL_PROB_JSON)


# Helper: Create the HTTP 500 response sent when a callout hook did not succeed
def _callout_failure_response(resp):
    """Return a 500 Response whose problem json body describes the callout status resp."""
    pjson = create_error_response(resp)
    return Response(json.dumps(pjson), 500, mimetype=APPL_PROB_JSON)