1 # ============LICENSE_START===============================================
2 # Copyright (C) 2021 Nordix Foundation. All rights reserved.
3 # ========================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=================================================
22 from datetime import datetime
23 from connexion import NoContent
24 from flask import Flask, request, Response
25 from jsonschema import validate
26 from var_declaration import policy_instances, policy_types, policy_status, policy_fingerprint, forced_settings, hosts_set
27 from utils import calcFingerprint
28 from maincommon import extract_host_name, is_duplicate_check
29 from payload_logging import is_payload_logging
32 APPL_JSON='application/json'
35 # Helper function to log http response
36 def log_resp_text(msg):
38 if (is_payload_logging()):
39 print("-----Error description-----")
42 # API Function: Health check
43 def get_healthcheck():
45 extract_host_name(hosts_set, request)
47 if ((r := check_modified_response()) is not None):
52 # API Function: Get all policy type ids
53 def get_all_policy_types():
55 extract_host_name(hosts_set, request)
57 if ((r := check_modified_response()) is not None):
60 res = list(policy_instances.keys())
61 res = list(map(int, res))
64 # API Function: Get a policy type
65 def get_policy_type(policy_type_id):
67 extract_host_name(hosts_set, request)
69 if ((r := check_modified_response()) is not None):
72 policy_type_id=str(policy_type_id)
74 if (policy_type_id not in policy_types.keys()):
75 log_resp_text("Policy type id not found")
78 return Response(json.dumps(policy_types[policy_type_id]), 200, mimetype=APPL_JSON)
80 # API Function: Delete a policy type
81 def delete_policy_type(policy_type_id):
83 extract_host_name(hosts_set, request)
85 if ((r := check_modified_response()) is not None):
88 policy_type_id=str(policy_type_id)
90 if (policy_type_id not in policy_instances.keys()):
91 log_resp_text("Policy type not found")
94 if (len(policy_instances[policy_type_id]) > 0):
95 log_resp_text("Policy type cannot be removed, instances exists")
98 del policy_instances[policy_type_id]
99 del policy_types[policy_type_id]
104 # API Function: Create a policy type
105 def create_policy_type(policy_type_id):
107 extract_host_name(hosts_set, request)
109 if ((r := check_modified_response()) is not None):
115 return Response("The policy type id is not an int", 400, mimetype='text/plain')
117 policy_type_id=str(policy_type_id)
119 if (policy_type_id in policy_instances.keys()):
120 if (len(policy_instances[policy_type_id]) > 0):
121 log_resp_text("Policy type id already exists")
126 data = json.loads(data)
128 log_resp_text("Policy type validation failure")
131 if (('name' not in data.keys()) or ('description' not in data.keys()) or ('policy_type_id' not in data.keys()) or ('create_schema' not in data.keys())):
132 log_resp_text("Parameters missing in policy type")
135 if (policy_type_id not in policy_instances.keys()):
136 policy_instances[policy_type_id]={}
138 policy_types[policy_type_id]=data
143 # API Function: Get all policy ids for a type
144 def get_all_policy_identities(policy_type_id):
146 extract_host_name(hosts_set, request)
148 if ((r := check_modified_response()) is not None):
151 policy_type_id=str(policy_type_id)
153 if (policy_type_id not in policy_instances.keys()):
154 log_resp_text("Policy type id not found")
156 return (list(policy_instances[policy_type_id].keys()), 200)
158 # API Function: Get a policy instance
159 def get_policy_instance(policy_type_id, policy_instance_id):
161 extract_host_name(hosts_set, request)
163 if ((r := check_modified_response()) is not None):
166 policy_type_id=str(policy_type_id)
168 if (policy_type_id not in policy_instances.keys()):
169 log_resp_text("Policy type id not found")
172 if (policy_instance_id not in policy_instances[policy_type_id].keys()):
173 log_resp_text("Policy instance id not found")
176 return Response(json.dumps(policy_instances[policy_type_id][policy_instance_id]), 200, mimetype=APPL_JSON)
178 # API function: Delete a policy
179 def delete_policy_instance(policy_type_id, policy_instance_id):
181 extract_host_name(hosts_set, request)
183 if ((r := check_modified_response()) is not None):
186 policy_type_id=str(policy_type_id)
188 if (policy_type_id not in policy_instances.keys()):
189 log_resp_text("Policy type id not found")
192 if (policy_instance_id not in policy_instances[policy_type_id].keys()):
193 log_resp_text("Policy instance id not found")
196 if (is_duplicate_check()):
197 fp_previous=calcFingerprint(policy_instances[policy_type_id][policy_instance_id], policy_type_id)
199 fp_previous=policy_instance_id
201 del policy_fingerprint[fp_previous]
202 del policy_instances[policy_type_id][policy_instance_id]
203 del policy_status[policy_instance_id]
207 # API function: Create/update a policy
208 def create_or_replace_policy_instance(policy_type_id, policy_instance_id):
210 extract_host_name(hosts_set, request)
212 if ((r := check_modified_response()) is not None):
215 policy_type_id=str(policy_type_id)
217 if (policy_type_id not in policy_instances.keys()):
218 log_resp_text("Policy type id not found")
223 data = json.loads(data)
225 log_resp_text("Policy json error")
229 validate(instance=data, schema=policy_types[policy_type_id]['create_schema'])
231 log_resp_text("Policy validation error")
235 if policy_instance_id in policy_instances[policy_type_id].keys():
236 if (is_duplicate_check()):
237 fp_previous=calcFingerprint(policy_instances[policy_type_id][policy_instance_id], policy_type_id)
239 fp_previous=policy_instance_id
242 if (policy_instance_id in policy_fingerprint.values()):
243 log_resp_text("Policy id already exist for other type")
246 if (is_duplicate_check()):
247 fp=calcFingerprint(data, policy_type_id)
249 fp=policy_instance_id
251 if ((fp in policy_fingerprint.keys()) and is_duplicate_check()):
252 p_id=policy_fingerprint[fp]
253 if (p_id != policy_instance_id):
254 log_resp_text("Policy json duplicate of other instance")
257 if (fp_previous is not None):
258 del policy_fingerprint[fp_previous]
260 policy_fingerprint[fp]=policy_instance_id
262 policy_instances[policy_type_id][policy_instance_id]=data
264 ps["instance_status"] = "NOT IN EFFECT"
265 ps["has_been_deleted"] = "false"
266 ps["created_at"] = str(datetime.now().strftime("%m/%d/%Y, %H:%M:%S"))
267 policy_status[policy_instance_id]=ps
271 # API function: Get policy status
272 def get_policy_instance_status(policy_type_id, policy_instance_id):
274 extract_host_name(hosts_set, request)
276 if ((r := check_modified_response()) is not None):
279 policy_type_id=str(policy_type_id)
280 if (policy_type_id not in policy_instances.keys()):
281 log_resp_text("Policy type id not found")
284 if (policy_instance_id not in policy_instances[policy_type_id].keys()):
285 log_resp_text("Policy instance id not found")
288 return Response(json.dumps(policy_status[policy_instance_id]), 200, mimetype=APPL_JSON)
290 # Helper: Create a response object if forced http response code is set
291 def get_forced_response():
293 response_code=forced_settings['code']
294 if (response_code is not None):
295 forced_settings['code'] = None
296 return (None, response_code)
299 # Helper: Delay the response if a forced delay is set
302 if (forced_settings['delay'] is not None):
304 val=int(forced_settings['delay'])
309 # Helper: Check if response shall be delayed or a forced response shall be sent
310 def check_modified_response():
312 return get_forced_response()