1 # ============LICENSE_START===============================================
2 # Copyright (C) 2020 Nordix Foundation. All rights reserved.
3 # ========================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=================================================
22 from datetime import datetime
23 from connexion import NoContent
24 from flask import Flask, request, Response
25 from jsonschema import validate
26 from var_declaration import policy_instances, policy_types, policy_status, policy_fingerprint, forced_settings, hosts_set
27 from utils import calcFingerprint
28 from maincommon import extract_host_name
31 APPL_JSON='application/json'
33 # API Function: Health check
34 def get_healthcheck():
36 extract_host_name(hosts_set, request)
38 if ((r := check_modified_response()) is not None):
43 # API Function: Get all policy type ids
44 def get_all_policy_types():
46 extract_host_name(hosts_set, request)
48 if ((r := check_modified_response()) is not None):
51 res = list(policy_instances.keys())
52 res = list(map(int, res))
55 # API Function: Get a policy type
56 def get_policy_type(policy_type_id):
58 extract_host_name(hosts_set, request)
60 if ((r := check_modified_response()) is not None):
63 policy_type_id=str(policy_type_id)
65 if (policy_type_id not in policy_types.keys()):
68 return Response(json.dumps(policy_types[policy_type_id]), 200, mimetype=APPL_JSON)
70 # API Function: Delete a policy type
71 def delete_policy_type(policy_type_id):
73 extract_host_name(hosts_set, request)
75 if ((r := check_modified_response()) is not None):
78 policy_type_id=str(policy_type_id)
80 if (policy_type_id not in policy_instances.keys()):
83 if (len(policy_instances[policy_type_id]) > 0):
86 del policy_instances[policy_type_id]
87 del policy_types[policy_type_id]
92 # API Function: Create a policy type
93 def create_policy_type(policy_type_id):
95 extract_host_name(hosts_set, request)
97 if ((r := check_modified_response()) is not None):
103 return Response("The policy type id is not an int", 400, mimetype='text/plain')
105 policy_type_id=str(policy_type_id)
107 if (policy_type_id in policy_instances.keys()):
108 if (len(policy_instances[policy_type_id]) > 0):
113 data = json.loads(data)
117 if (('name' not in data.keys()) or ('description' not in data.keys()) or ('policy_type_id' not in data.keys()) or ('create_schema' not in data.keys())):
120 if (policy_type_id not in policy_instances.keys()):
121 policy_instances[policy_type_id]={}
123 policy_types[policy_type_id]=data
128 # API Function: Get all policy ids for a type
129 def get_all_policy_identities(policy_type_id):
131 extract_host_name(hosts_set, request)
133 if ((r := check_modified_response()) is not None):
136 policy_type_id=str(policy_type_id)
138 if (policy_type_id not in policy_instances.keys()):
140 return (list(policy_instances[policy_type_id].keys()), 200)
142 # API Function: Get a policy instance
143 def get_policy_instance(policy_type_id, policy_instance_id):
145 extract_host_name(hosts_set, request)
147 if ((r := check_modified_response()) is not None):
150 policy_type_id=str(policy_type_id)
152 if (policy_type_id not in policy_instances.keys()):
155 if (policy_instance_id not in policy_instances[policy_type_id].keys()):
158 return Response(json.dumps(policy_instances[policy_type_id][policy_instance_id]), 200, mimetype=APPL_JSON)
160 # API function: Delete a policy
161 def delete_policy_instance(policy_type_id, policy_instance_id):
163 extract_host_name(hosts_set, request)
165 if ((r := check_modified_response()) is not None):
168 policy_type_id=str(policy_type_id)
170 if (policy_type_id not in policy_instances.keys()):
173 if (policy_instance_id not in policy_instances[policy_type_id].keys()):
176 fp_previous=calcFingerprint(policy_instances[policy_type_id][policy_instance_id])
177 del policy_fingerprint[fp_previous]
178 del policy_instances[policy_type_id][policy_instance_id]
179 del policy_status[policy_instance_id]
183 # API function: Create/update a policy
184 def create_or_replace_policy_instance(policy_type_id, policy_instance_id):
186 extract_host_name(hosts_set, request)
188 if ((r := check_modified_response()) is not None):
191 policy_type_id=str(policy_type_id)
193 if (policy_type_id not in policy_instances.keys()):
198 data = json.loads(data)
203 validate(instance=data, schema=policy_types[policy_type_id]['create_schema'])
208 if policy_instance_id in policy_instances[policy_type_id].keys():
210 fp_previous=calcFingerprint(policy_instances[policy_type_id][policy_instance_id])
212 if (policy_instance_id in policy_fingerprint.values()):
215 fp=calcFingerprint(data)
216 if (fp in policy_fingerprint.keys()):
217 p_id=policy_fingerprint[fp]
218 if (p_id != policy_instance_id):
221 if (fp_previous is not None):
222 del policy_fingerprint[fp_previous]
224 policy_fingerprint[fp]=policy_instance_id
226 policy_instances[policy_type_id][policy_instance_id]=data
228 ps["instance_status"] = "NOT IN EFFECT"
229 ps["has_been_deleted"] = "false"
230 ps["created_at"] = str(datetime.now().strftime("%m/%d/%Y, %H:%M:%S"))
231 policy_status[policy_instance_id]=ps
235 # API function: Get policy status
236 def get_policy_instance_status(policy_type_id, policy_instance_id):
238 extract_host_name(hosts_set, request)
240 if ((r := check_modified_response()) is not None):
243 policy_type_id=str(policy_type_id)
244 if (policy_type_id not in policy_instances.keys()):
247 if (policy_instance_id not in policy_instances[policy_type_id].keys()):
250 return Response(json.dumps(policy_status[policy_instance_id]), 200, mimetype=APPL_JSON)
252 # Helper: Create a response object if forced http response code is set
253 def get_forced_response():
255 response_code=forced_settings['code']
256 if (response_code is not None):
257 forced_settings['code'] = None
258 return (None, response_code)
261 # Helper: Delay if delayed response code is set
264 if (forced_settings['delay'] is not None):
266 val=int(forced_settings['delay'])
272 # Helper: Check if response shall be delayed or a forced response shall be sent
273 def check_modified_response():
275 return get_forced_response()