1 # ============LICENSE_START===============================================
2 # Copyright (C) 2020 Nordix Foundation. All rights reserved.
3 # ========================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=================================================
22 from datetime import datetime
23 from connexion import NoContent
24 from flask import Flask, request, Response
25 from jsonschema import validate
26 from var_declaration import policy_instances, policy_types, policy_status, policy_fingerprint, forced_settings, hosts_set
27 from utils import calcFingerprint
28 from maincommon import extract_host_name
31 # API Function: Health check
32 def get_healthcheck():
# Handler for the health-check endpoint; takes no arguments.
# Passes the Flask `request` and the shared `hosts_set` to extract_host_name()
# (imported from maincommon; what it records is not visible here).
34 extract_host_name(hosts_set, request)
# Ask check_modified_response() whether an overriding response applies; the
# walrus keeps the result in `r` for use inside the branch.
# NOTE(review): the branch body and the function's normal return value are not
# visible in this listing - presumably `r` is returned when it is not None;
# confirm against the full file.
36 if ((r := check_modified_response()) is not None):
41 # API Function: Get all policy type ids
42 def get_all_policy_types():
# Handler that lists the ids of all known policy types.
# Returns the tuple (list_of_type_ids, 200) on the path shown below.
# Passes the Flask `request` and the shared `hosts_set` to extract_host_name()
# (imported from maincommon; its effect is not visible here).
44 extract_host_name(hosts_set, request)
# NOTE(review): the body of this branch is not visible in this listing -
# presumably the overriding response `r` is returned; confirm.
46 if ((r := check_modified_response()) is not None):
# The ids are read from the keys of `policy_instances`, not `policy_types`.
# create_policy_type and delete_policy_type in this file add/remove the same
# id in both dicts, so the two key sets are expected to match.
49 return (list(policy_instances.keys()), 200)
51 # API Function: Get a policy type
52 def get_policy_type(policy_type_id):
# Handler that returns the stored definition of one policy type.
# policy_type_id: id of the type; converted to str before any lookup.
# On the path shown, returns a flask Response whose body is the JSON dump of
# policy_types[policy_type_id], with status 200 and mimetype application/json.
54 extract_host_name(hosts_set, request)
# NOTE(review): the body of this branch is not visible in this listing -
# presumably the overriding response `r` is returned; confirm.
56 if ((r := check_modified_response()) is not None):
# Dict keys are strings, so normalise the id before membership tests.
59 policy_type_id=str(policy_type_id)
# Unknown type. NOTE(review): the response produced by this branch is not
# visible here (presumably a "not found" status); confirm.
61 if (policy_type_id not in policy_types.keys()):
# NOTE(review): `json` is used here but no `import json` appears among the
# import lines shown in this listing - verify it is imported in the full file.
64 return Response(json.dumps(policy_types[policy_type_id]), 200, mimetype='application/json')
66 # API Function: Delete a policy type
67 def delete_policy_type(policy_type_id):
# Handler that removes a policy type.
# policy_type_id: id of the type; converted to str before any lookup.
# On the path shown, the id is deleted from both `policy_instances` and
# `policy_types`.
# NOTE(review): the final return value is not visible in this listing.
69 extract_host_name(hosts_set, request)
# NOTE(review): the body of this branch is not visible in this listing -
# presumably the overriding response `r` is returned; confirm.
71 if ((r := check_modified_response()) is not None):
74 policy_type_id=str(policy_type_id)
# Unknown type. NOTE(review): branch body not visible (presumably an error
# response is returned); confirm.
76 if (policy_type_id not in policy_instances.keys()):
# The type still has at least one instance stored under it.
# NOTE(review): branch body not visible - presumably the delete is refused;
# confirm the status code against the full file.
79 if (len(policy_instances[policy_type_id]) > 0):
# Remove the (empty) instance container and the type definition together so
# the two dicts keep the same key set.
82 del policy_instances[policy_type_id]
83 del policy_types[policy_type_id]
88 # API Function: Create a policy type
89 def create_policy_type(policy_type_id):
# Handler that stores a policy type definition under the given id.
# policy_type_id: id of the type; converted to str before any lookup.
# On the path shown, an empty instance dict is created for a new type and the
# parsed definition is stored in `policy_types` (overwriting any previous
# definition for that id).
# NOTE(review): the final return value is not visible in this listing.
91 extract_host_name(hosts_set, request)
# NOTE(review): the body of this branch is not visible in this listing -
# presumably the overriding response `r` is returned; confirm.
93 if ((r := check_modified_response()) is not None):
96 policy_type_id=str(policy_type_id)
# An existing type that already has instances is treated specially.
# NOTE(review): the inner branch body is not visible - presumably the request
# is rejected so a type in use cannot be replaced; confirm.
98 if (policy_type_id in policy_instances.keys()):
99 if (len(policy_instances[policy_type_id]) > 0):
# Parse the JSON payload into a dict.
# NOTE(review): where `data` is first assigned is not visible here
# (presumably the request body), nor is any error handling around the parse.
104 data = json.loads(data)
# All four top-level keys must be present in the definition.
# NOTE(review): the branch body (response for a missing key) is not visible.
108 if (('name' not in data.keys()) or ('description' not in data.keys()) or ('policy_type_id' not in data.keys()) or ('create_schema' not in data.keys())):
# First time this type is seen: start with no instances.
111 if (policy_type_id not in policy_instances.keys()):
112 policy_instances[policy_type_id]={}
114 policy_types[policy_type_id]=data
119 # API Function: Get all policy ids for a type
120 def get_all_policy_identities(policy_type_id):
# Handler that lists the ids of all policy instances stored under one type.
# policy_type_id: id of the type; converted to str before any lookup.
# On the path shown, returns the tuple (list_of_instance_ids, 200).
122 extract_host_name(hosts_set, request)
# NOTE(review): the body of this branch is not visible in this listing -
# presumably the overriding response `r` is returned; confirm.
124 if ((r := check_modified_response()) is not None):
127 policy_type_id=str(policy_type_id)
# Unknown type. NOTE(review): branch body not visible (presumably an error
# response is returned); confirm.
129 if (policy_type_id not in policy_instances.keys()):
131 return (list(policy_instances[policy_type_id].keys()), 200)
133 # API Function: Get a policy instance
134 def get_policy_instance(policy_type_id, policy_instance_id):
# Handler that returns the stored content of one policy instance.
# policy_type_id: id of the type; converted to str before any lookup.
# policy_instance_id: id of the instance; used as given (not converted).
# On the path shown, returns a flask Response whose body is the JSON dump of
# the stored instance, with status 200 and mimetype application/json.
136 extract_host_name(hosts_set, request)
# NOTE(review): the body of this branch is not visible in this listing -
# presumably the overriding response `r` is returned; confirm.
138 if ((r := check_modified_response()) is not None):
141 policy_type_id=str(policy_type_id)
# Unknown type. NOTE(review): branch body not visible (presumably an error
# response is returned); confirm.
143 if (policy_type_id not in policy_instances.keys()):
# Unknown instance under a known type. NOTE(review): branch body not visible.
146 if (policy_instance_id not in policy_instances[policy_type_id].keys()):
149 return Response(json.dumps(policy_instances[policy_type_id][policy_instance_id]), 200, mimetype='application/json')
151 # API Function: Delete a policy instance
152 def delete_policy_instance(policy_type_id, policy_instance_id):
# Handler that removes one policy instance and its bookkeeping entries.
# policy_type_id: id of the type; converted to str before any lookup.
# policy_instance_id: id of the instance; used as given (not converted).
# On the path shown, the instance is removed from `policy_fingerprint`,
# `policy_instances` and `policy_status`.
# NOTE(review): the final return value is not visible in this listing.
154 extract_host_name(hosts_set, request)
# NOTE(review): the body of this branch is not visible in this listing -
# presumably the overriding response `r` is returned; confirm.
156 if ((r := check_modified_response()) is not None):
159 policy_type_id=str(policy_type_id)
# Unknown type. NOTE(review): branch body not visible (presumably an error
# response is returned); confirm.
161 if (policy_type_id not in policy_instances.keys()):
# Unknown instance under a known type. NOTE(review): branch body not visible.
164 if (policy_instance_id not in policy_instances[policy_type_id].keys()):
# Recompute the fingerprint of the stored content to find its index entry.
# NOTE(review): `del` raises KeyError if no entry exists for that fingerprint;
# this relies on every stored instance having been registered on creation.
167 fpPrevious=calcFingerprint(policy_instances[policy_type_id][policy_instance_id])
168 del policy_fingerprint[fpPrevious]
169 del policy_instances[policy_type_id][policy_instance_id]
# `policy_status` is keyed by instance id only, not by (type, instance).
170 del policy_status[policy_instance_id]
174 # API Function: Create or replace a policy instance
175 def create_or_replace_policy_instance(policy_type_id, policy_instance_id):
# Handler that stores (or overwrites) a policy instance under a policy type.
# policy_type_id: id of the type; converted to str before any lookup.
# policy_instance_id: id of the instance; used as given (not converted).
# On the path shown: the payload is parsed and validated against the type's
# 'create_schema', the fingerprint index is updated, the instance is stored
# and a fresh status record is written to `policy_status`.
# NOTE(review): several lines of this function are not visible in this
# listing (branch bodies, initialisation of `data`, `fpPrevious` and `ps`, and
# the final return value); the comments below describe only what is shown.
177 extract_host_name(hosts_set, request)
# NOTE(review): branch body not visible - presumably `r` is returned; confirm.
179 if ((r := check_modified_response()) is not None):
182 policy_type_id=str(policy_type_id)
# Unknown type. NOTE(review): branch body not visible (presumably an error
# response is returned); confirm.
184 if (policy_type_id not in policy_instances.keys()):
# Parse the JSON payload. NOTE(review): the first assignment of `data` is not
# visible here (presumably the request body); confirm.
189 data = json.loads(data)
# jsonschema validation of the payload against the schema stored with the
# type. NOTE(review): any handling of a validation failure is not visible.
194 validate(instance=data, schema=policy_types[policy_type_id]['create_schema'])
# Replacing an existing instance: remember the fingerprint of the old content
# so its index entry can be dropped further down.
200 if policy_instance_id in policy_instances[policy_type_id].keys():
202 fpPrevious=calcFingerprint(policy_instances[policy_type_id][policy_instance_id])
# The instance id is already registered in the fingerprint index.
# NOTE(review): the enclosing control flow (likely an `else:`) and the branch
# body are not visible; presumably the request is rejected - confirm.
204 if (policy_instance_id in policy_fingerprint.values()):
# Fingerprint of the new content; `policy_fingerprint` maps it to the id of
# the instance holding that content.
207 fp=calcFingerprint(data)
208 if (fp in policy_fingerprint.keys()):
# NOTE(review): the local name `id` shadows the builtin of the same name.
209 id=policy_fingerprint[fp]
# Same content is already stored under a different instance id.
# NOTE(review): branch body not visible (presumably rejected); confirm.
210 if (id != policy_instance_id):
# Drop the index entry of the content being replaced.
# NOTE(review): this test implies `fpPrevious` is initialised to None on a
# line not shown in this listing.
213 if (fpPrevious is not None):
214 del policy_fingerprint[fpPrevious]
216 policy_fingerprint[fp]=policy_instance_id
218 policy_instances[policy_type_id][policy_instance_id]=data
# Build the status record for this instance.
# NOTE(review): the creation of `ps` is not visible (presumably an empty dict).
220 ps["instance_status"] = "NOT IN EFFECT"
# Stored as the string "false", not the boolean False.
221 ps["has_been_deleted"] = "false"
# datetime.now() without a tz argument yields naive local time; strftime()
# already returns a str, so the surrounding str() does not change the value.
222 ps["created_at"] = str(datetime.now().strftime("%m/%d/%Y, %H:%M:%S"))
# `policy_status` is keyed by instance id only, not by (type, instance).
223 policy_status[policy_instance_id]=ps
227 # API Function: Get the status of a policy instance
228 def get_policy_instance_status(policy_type_id, policy_instance_id):
# Handler that returns the status record of one policy instance.
# policy_type_id: id of the type; converted to str before any lookup.
# policy_instance_id: id of the instance; used as given (not converted).
# On the path shown, returns a flask Response whose body is the JSON dump of
# policy_status[policy_instance_id], with status 200 and mimetype
# application/json.
230 extract_host_name(hosts_set, request)
# NOTE(review): the body of this branch is not visible in this listing -
# presumably the overriding response `r` is returned; confirm.
232 if ((r := check_modified_response()) is not None):
235 policy_type_id=str(policy_type_id)
# Unknown type. NOTE(review): branch body not visible (presumably an error
# response is returned); confirm.
236 if (policy_type_id not in policy_instances.keys()):
# Unknown instance under a known type. NOTE(review): branch body not visible.
239 if (policy_instance_id not in policy_instances[policy_type_id].keys()):
# Existence was checked in `policy_instances`; the record itself is read from
# `policy_status`, which is keyed by instance id only.
242 return Response(json.dumps(policy_status[policy_instance_id]), 200, mimetype='application/json')
244 # Helper: Create a response object if forced http response code is set
def get_forced_response():
    """Return a one-shot forced response if a forced code is configured.

    Reads ``forced_settings['code']``. When it holds a value, the setting is
    reset to ``None`` and the tuple ``(None, code)`` is returned, so the
    forced code applies to a single call only. When no code is set, the
    function returns ``None``.
    """
    forced_code = forced_settings['code']
    if forced_code is None:
        return None

    # Consume the setting before answering so the next call is unaffected.
    forced_settings['code'] = None
    return (None, forced_code)
253 # Helper: Delay if delayed response code is set
# NOTE(review): the `def` line of this helper and the statements that follow
# the conversion below are not visible in this listing; only these two
# statements are shown.
# Act only when a delay has been configured in forced_settings['delay'].
256 if (forced_settings['delay'] is not None):
# Convert the configured delay to an int; int() raises ValueError/TypeError
# for values it cannot convert.
# NOTE(review): presumably `val` is a number of seconds to wait - the code
# that uses it is not shown; confirm against the full file.
258 val=int(forced_settings['delay'])
264 # Helper: Check if response shall be delayed or a forced response shall be sent
265 def check_modified_response():
# Helper called at the top of every API handler in this file.
# Returns whatever get_forced_response() returns: the tuple (None, code) when
# a forced response code is set, otherwise None.
# NOTE(review): the header comment mentions delaying the response, but no
# delay call is visible in this listing (the embedded numbering skips one line
# after the `def`); confirm against the full file.
267 return get_forced_response()