1 # ============LICENSE_START===============================================
2 # Copyright (C) 2021 Nordix Foundation. All rights reserved.
3 # ========================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=================================================
25 from connexion import NoContent
26 from flask import Flask, escape, request, Response, make_response
27 from jsonschema import validate
28 from var_declaration import policy_instances, policy_types, policy_status, callbacks, forced_settings, policy_fingerprint, hosts_set
29 from utils import calcFingerprint
30 from maincommon import check_apipath, apipath, get_supported_interfaces_response, extract_host_name, is_duplicate_check
33 APPL_JSON='application/json'
34 APPL_PROB_JSON='application/problem+json'
36 # API Function: Get all policy type ids
37 def get_all_policy_types():
39 extract_host_name(hosts_set, request)
41 if ((r := check_modified_response()) is not None):
44 res = list(policy_types.keys())
47 # API Function: Get a policy type
48 def get_policy_type(policyTypeId):
50 extract_host_name(hosts_set, request)
52 if ((r := check_modified_response()) is not None):
55 policy_type_id=str(policyTypeId)
57 if (policy_type_id not in policy_types.keys()):
58 pjson=create_problem_json(None, "The policy type does not exist.", 404, None, policy_type_id)
59 return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON)
61 return Response(json.dumps(policy_types[policy_type_id]), 200, mimetype=APPL_JSON)
63 # API Function: Get all policy ids
64 def get_all_policy_identities(policyTypeId):
66 extract_host_name(hosts_set, request)
68 if ((r := check_modified_response()) is not None):
71 policy_type_id=str(policyTypeId)
73 if (policy_type_id not in policy_types.keys()):
74 pjson=create_problem_json(None, "The policy type does not exist.", 404, None, policy_type_id)
75 return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON)
77 return (list(policy_instances[policy_type_id].keys()), 200)
79 # API Function: Create or update a policy
80 def put_policy(policyTypeId, policyId):
82 extract_host_name(hosts_set, request)
84 if ((r := check_modified_response()) is not None):
87 policy_type_id=str(policyTypeId)
88 policy_id=str(policyId)
90 if (policy_type_id not in policy_types.keys()):
91 pjson=create_problem_json(None, "The policy type does not exist.", 404, None, policy_id)
92 return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON)
96 data = json.loads(data)
98 pjson=create_problem_json(None, "The policy is corrupt or missing.", 400, None, policy_id)
99 return Response(json.dumps(pjson), 400, mimetype=APPL_PROB_JSON)
102 validate(instance=data, schema=policy_types[policy_type_id]['policySchema'])
108 if policy_id in policy_instances[policy_type_id].keys():
110 if (is_duplicate_check()):
111 fp_previous=calcFingerprint(policy_instances[policy_type_id][policy_id], policy_type_id)
113 fp_previous=policy_id
115 if (policy_id in policy_fingerprint.values()):
116 pjson=create_problem_json(None, "The policy id already exist for other policy type.", 400, None, policy_id)
117 return Response(json.dumps(pjson), 400, mimetype=APPL_PROB_JSON)
119 if (is_duplicate_check()):
120 fp=calcFingerprint(data, policy_type_id)
124 if ((fp in policy_fingerprint.keys()) and is_duplicate_check()):
125 p_id=policy_fingerprint[fp]
126 if (p_id != policy_id):
127 pjson=create_problem_json(None, "Duplicate, the policy json already exists.", 400, None, policy_id)
128 return Response(json.dumps(pjson), 400, mimetype=APPL_PROB_JSON)
130 if (fp_previous is not None):
131 del policy_fingerprint[fp_previous]
133 policy_fingerprint[fp]=policy_id
135 noti=request.args.get('notificationDestination')
136 callbacks[policy_id]=noti
138 policy_instances[policy_type_id][policy_id]=data
140 if (policy_types[policy_type_id]['statusSchema'] is not None):
142 ps["enforceStatus"] = ""
143 ps["enforceReason"] = ""
144 policy_status[policy_id] = ps
147 return Response(json.dumps(data), 200, mimetype=APPL_JSON)
150 headers['Location']='/A1-P/v2/policytypes/' + policy_type_id + '/policies/' + policy_id
151 return Response(json.dumps(data), 201, headers=headers, mimetype=APPL_JSON)
153 # API Function: Get a policy
154 def get_policy(policyTypeId, policyId):
156 extract_host_name(hosts_set, request)
158 if ((r := check_modified_response()) is not None):
161 policy_type_id=str(policyTypeId)
162 policy_id=str(policyId)
164 if (policy_type_id not in policy_types.keys()):
165 pjson=create_problem_json(None, "The policy type does not exist.", 404, None, policy_id)
166 return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON)
168 if (policy_id not in policy_instances[policy_type_id].keys()):
169 pjson=create_problem_json(None, "The requested policy does not exist.", 404, None, policy_id)
170 return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON)
172 return Response(json.dumps(policy_instances[policy_type_id][policy_id]), 200, mimetype=APPL_JSON)
175 # API Function: Delete a policy
176 def delete_policy(policyTypeId, policyId):
178 extract_host_name(hosts_set, request)
180 if ((r := check_modified_response()) is not None):
183 policy_type_id=str(policyTypeId)
184 policy_id=str(policyId)
186 if (policy_type_id not in policy_types.keys()):
187 pjson=create_problem_json(None, "The policy type does not exist.", 404, None, policy_id)
188 return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON)
190 if (policy_id not in policy_instances[policy_type_id].keys()):
191 pjson=create_problem_json(None, "The requested policy does not exist.", 404, None, policy_id)
192 return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON)
194 if (is_duplicate_check()):
195 fp_previous=calcFingerprint(policy_instances[policy_type_id][policy_id], policy_type_id)
197 fp_previous=policy_id
199 policy_fingerprint.pop(fp_previous)
200 policy_instances[policy_type_id].pop(policy_id)
201 policy_status.pop(policy_id)
202 callbacks.pop(policy_id)
203 return Response('', 204, mimetype=APPL_JSON)
206 # API Function: Get status for a policy
207 def get_policy_status(policyTypeId, policyId):
209 extract_host_name(hosts_set, request)
211 if ((r := check_modified_response()) is not None):
214 policy_type_id=str(policyTypeId)
215 policy_id=str(policyId)
217 if (policy_type_id not in policy_types.keys()):
218 pjson=create_problem_json(None, "The policy type does not exist.", 404, None, policy_id)
219 return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON)
221 if (policy_id not in policy_instances[policy_type_id].keys()):
222 pjson=create_problem_json(None, "The requested policy does not exist.", 404, None, policy_id)
223 return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON)
225 return Response(json.dumps(policy_status[policy_id]), status=200, mimetype=APPL_JSON)
227 # Helper: Create a response object if forced http response code is set
228 def get_forced_response():
229 if (forced_settings['code'] is not None):
230 pjson=create_error_response(forced_settings['code'])
231 forced_settings['code']=None
232 return Response(json.dumps(pjson), pjson['status'], mimetype=APPL_PROB_JSON)
235 # Helper: Delay if delayed response code is set
237 if (forced_settings['delay'] is not None):
239 val=int(forced_settings['delay'])
244 # Helper: Check if response shall be delayed or a forced response shall be sent
245 def check_modified_response():
247 return get_forced_response()
249 # Helper: Create a problem json object
250 def create_problem_json(type_of, title, status, detail, instance):
253 if type_of is not None:
254 error["type"] = type_of
255 if title is not None:
256 error["title"] = title
257 if status is not None:
258 error["status"] = status
259 if detail is not None:
260 error["detail"] = detail
261 if instance is not None:
262 error["instance"] = instance
265 # Helper: Create a problem json based on a generic http response code
266 def create_error_response(code):
269 return(create_problem_json(None, "Bad request", 400, "Object in payload not properly formulated or not related to the method", None))
271 return(create_problem_json(None, "Not found", 404, "No resource found at the URI", None))
273 return(create_problem_json(None, "Method not allowed", 405, "Method not allowed for the URI", None))
275 return(create_problem_json(None, "Conflict", 409, "Request could not be processed in the current state of the resource", None))
277 return(create_problem_json(None, "Too many requests", 429, "Too many requests have been sent in a given amount of time", None))
279 return(create_problem_json(None, "Insufficient storage", 507, "The method could not be performed on the resource because the provider is unable to store the representation needed to successfully complete the request", None))
281 return(create_problem_json(None, "Service unavailable", 503, "The provider is currently unable to handle the request due to a temporary overload", None))
283 return(create_problem_json(None, "Unknown", code, "Not implemented response code", None))