#!/usr/bin/env python3
import copy
import json
from random import random, choice

from connexion import NoContent
from flask import Flask, escape, request
from jsonschema import validate

from var_declaration import policy_instances, policy_types, policy_status, notification_destination, notificationDestination
def get_all_policies():
    """Return every stored policy instance together with its enforce status.

    Each entry is a deep copy of the stored instance with an added
    ``enforceStatus`` key read from ``policy_status``, so the stored
    instances themselves are not modified.

    Returns:
        A ``(policies, 200)`` tuple; ``policies`` is a list that follows the
        iteration order of ``policy_instances``.

    Raises:
        KeyError: if a policy instance has no matching ``policy_status``
            entry or that entry has no ``enforceStatus``.
    """
    all_policies = []
    for policy_id, instance in policy_instances.items():
        entry = copy.deepcopy(instance)
        entry["enforceStatus"] = policy_status[policy_id]["enforceStatus"]
        # append() keeps the entries in order; the former
        # insert(len - 1, ...) put each new entry before the last one.
        all_policies.append(entry)
    return all_policies, 200
def put_policy(policyId):
    """Create or replace the policy instance addressed by ``policyId``.

    The body of the current Flask ``request`` is decoded as UTF-8 JSON and
    checked in this order:

    1. it is a JSON object carrying ``policyId``, ``policyTypeId``,
       ``policyClause`` and ``notificationDestination`` (400 otherwise);
    2. ``policyTypeId`` is a key of ``policy_types`` (404 otherwise);
    3. ``policyClause`` validates against that type's schema (400);
    4. an existing instance with the body's ``policyId`` has the same
       ``policyClause["scope"]`` (404 otherwise);
    5. the body's ``policyId`` equals the ``policyId`` argument (400);
    6. no other instance has the same clause, type and notification
       destination (404 otherwise).

    On success the instance is stored, its status is set through
    ``set_status("UNDEFINED")`` and its notification destination recorded.

    Args:
        policyId: Policy identity taken from the request address.

    Returns:
        ``(stored_instance, status_code)`` on success, or the
        ``(error, status)`` tuple built by ``set_error``.
    """
    # Local import: only ``validate`` is imported at module level.
    from jsonschema.exceptions import SchemaError, ValidationError

    try:
        raw = request.data.decode("utf-8")
        # Single quotes are rewritten so that Python-style payloads parse.
        # NOTE(review): this also rewrites apostrophes inside string values.
        raw = raw.replace("'", "\"")
        data = json.loads(raw)
    except ValueError:
        # Covers both UnicodeDecodeError and json.JSONDecodeError.
        return set_error(None, "The request body is not valid json.", 400,
                         None, None, None, None)

    if not isinstance(data, dict):
        return set_error(None, "The json is not an object.", 400,
                         None, None, None, None)
    # Reject incomplete bodies up front; otherwise a missing key raises
    # KeyError further down, possibly after the stores were half updated.
    for field in ("policyId", "policyTypeId", "policyClause",
                  "notificationDestination"):
        if field not in data:
            return set_error(None, "The json is missing a mandatory field.",
                             400, None, None, field, None)

    body_id = data["policyId"]
    type_id = data["policyTypeId"]

    if type_id not in policy_types:
        return set_error(
            None, "The policy type provided does not exist.", 404,
            f"The policy type {type_id} is not defined as a policy type.",
            None, "policyTypeId", None)

    try:
        validate(instance=data["policyClause"], schema=policy_types[type_id])
    except (ValidationError, SchemaError):
        return set_error(None,
                         "The json does not validate against the schema.",
                         400, None, None, None, None)

    if body_id in policy_instances:
        stored_scope = policy_instances[body_id]["policyClause"]["scope"]
        if data["policyClause"]["scope"] != stored_scope:
            return set_error(
                None, "The policy already exists with a different scope.",
                404,
                "The policy put involves a modification of the existing "
                "scope, which is not allowed.",
                None, "scope", None)

    if body_id != policyId:
        return set_error(
            None, "Wrong policy identity.", 400,
            "The policy instance's identity does not match with the one "
            "specified in the address.",
            None, "policyId",
            f"The policy identity {body_id} is different from the address: "
            f"{policyId}")

    for other_id, other in policy_instances.items():
        if (other_id != body_id
                and data["policyClause"] == other["policyClause"]
                and type_id == other["policyTypeId"]
                and data["notificationDestination"]
                == other["notificationDestination"]):
            return set_error(
                None, "The policy already exists with a different id.", 404,
                "No action has been taken. The id of the existing policy "
                f"instance is: {other_id}.",
                None, None, None)

    # NOTE(review): the original assignment of the status code was not
    # visible; 200 for a replacement and 201 for a creation follows usual
    # HTTP PUT semantics - confirm against the API specification.
    code = 200 if policyId in policy_instances else 201

    policy_instances[policyId] = data
    policy_status[policyId] = set_status("UNDEFINED")
    notification_destination[policyId] = data["notificationDestination"]
    return policy_instances[policyId], code
def set_status(*args):
    """Build the status record of a policy.

    Args:
        *args: Either nothing, ``(enforceStatus,)`` or
            ``(enforceStatus, enforceReason)``. With no arguments the status
            (and, for ``"NOT_ENFORCED"``, the reason) is drawn from
            ``randomise_status()`` / ``randomise_reason()``.

    Returns:
        On success a dict with ``enforceStatus`` and, when the status is
        ``"NOT_ENFORCED"``, ``enforceReason``. For an unknown status, or a
        missing/unknown reason with ``"NOT_ENFORCED"``, the
        ``(error, status)`` tuple built by ``set_error``.
    """
    ps = {}

    if not args:
        # The record carries no "policyId": no such name exists in this
        # scope (the previous code raised NameError here), and
        # get_all_policy_status() adds the id when it reports statuses.
        rand_status = randomise_status()
        ps["enforceStatus"] = rand_status
        if rand_status == "NOT_ENFORCED":
            ps["enforceReason"] = randomise_reason()
        return ps

    status = args[0]
    if status not in ("UNDEFINED", "ENFORCED", "NOT_ENFORCED"):
        return set_error(
            None, "Wrong enforceStatus.", 400, None, None, "enforceStatus",
            "enforceStatus should be one of \"UNDEFINED\", \"ENFORCED\" or "
            "\"NOT_ENFORCED\"")
    ps["enforceStatus"] = status

    if status == "NOT_ENFORCED":
        # A missing reason is reported like an invalid one instead of
        # raising IndexError.
        reason = args[1] if len(args) > 1 else None
        if reason not in ("100", "200", "300", "800"):
            return set_error(
                None, "Wrong enforceReason.", 400, None, None,
                "enforceReason",
                "enforceReason should be one of \"100\", \"200\", \"300\" "
                "or \"800\"")
        ps["enforceReason"] = reason

    return ps
def get_policy(policyId):
    """Return one policy instance together with its enforce status.

    Args:
        policyId: Identity of the requested policy instance.

    Returns:
        ``(policy, 200)`` where ``policy`` is a copy of the stored instance
        with an added ``enforceStatus`` key, or the 404 ``(error, status)``
        tuple built by ``set_error`` when the id is unknown.
    """
    if policyId not in policy_instances:
        return set_error(None, "The requested policy does not exist.", 404,
                         None, None, "policyId", None)

    # Build a new dict rather than writing "enforceStatus" into the stored
    # instance, so reading a policy never changes what is stored.
    res = {
        **policy_instances[policyId],
        "enforceStatus": policy_status[policyId]["enforceStatus"],
    }
    return res, 200
def delete_policy(policyId):
    """Delete a policy instance and the records kept alongside it.

    Args:
        policyId: Identity of the policy instance to delete.

    Returns:
        ``(NoContent, 204)`` after a deletion, or the 404
        ``(error, status)`` tuple built by ``set_error`` when the id is
        unknown.
    """
    if policyId not in policy_instances:
        return set_error(None, "The policy identity does not exist.", 404,
                         "No policy instance has been deleted.", None,
                         "policyId", None)

    policy_instances.pop(policyId)
    policy_status.pop(policyId, None)
    # put_policy() also records a notification destination per policy;
    # drop it too so no stale entry outlives the instance.
    # NOTE(review): assumes notification_destination is a dict - confirm.
    notification_destination.pop(policyId, None)
    # NOTE(review): the original success return was not visible; an empty
    # 204 response is assumed - confirm against the API specification.
    return NoContent, 204
def get_all_policy_identities():
    """Return the identities of all stored policy instances with status 200."""
    identities = [policy_id for policy_id in policy_instances]
    return identities, 200
100 def randomise_status():
def randomise_reason():
    """Pick one of the enforce-reason codes at random and return it."""
    reasons = ["100", "200", "300", "800"]
    picked = choice(reasons)
    return picked
def get_all_policy_status():
    """Return the status record of every policy, each tagged with its id.

    Each entry is a deep copy of the stored status with an added
    ``policyId`` key, so the stored records are not modified.

    Returns:
        A ``(statuses, 200)`` tuple; ``statuses`` is a list that follows the
        iteration order of ``policy_status``.
    """
    all_status = []
    for policy_id, status in policy_status.items():
        entry = copy.deepcopy(status)
        entry["policyId"] = policy_id
        # append() keeps the entries in order; the former
        # insert(len - 1, ...) put each new entry before the last one.
        all_status.append(entry)
    return all_status, 200
def get_policy_status(policyId):
    """Return the status record of one policy.

    Args:
        policyId: Identity of the policy whose status is requested.

    Returns:
        ``(status, 200)`` for a known id, or the 404 ``(error, status)``
        tuple built by ``set_error`` when no status is stored for the id
        (previously an unknown id raised ``KeyError``).
    """
    if policyId not in policy_status:
        return set_error(None, "The requested policy does not exist.", 404,
                         None, None, "policyId", None)
    return policy_status[policyId], 200
def get_all_policytypes():
    """Return every value stored in ``policy_types`` with status 200.

    Returns:
        A ``(policy_types_list, 200)`` tuple; the list follows the iteration
        order of ``policy_types`` (the former ``insert(len - 1, ...)`` put
        each new entry before the last one).
    """
    all_policytypes = list(policy_types.values())
    return all_policytypes, 200
def get_all_policytypes_identities():
    """Return the keys of ``policy_types`` as a list, with status 200."""
    type_ids = [type_id for type_id in policy_types]
    return type_ids, 200
def get_policytypes(policyTypeId):
    """Look up a single policy type.

    Returns ``(policy_type, 200)`` when ``policyTypeId`` is a key of
    ``policy_types``; otherwise the 404 ``(error, status)`` tuple built by
    ``set_error``.
    """
    if policyTypeId not in policy_types.keys():
        return set_error(None, "The requested policy type does not exist.",
                         404, None, None, "policyTypeId", None)
    found = policy_types[policyTypeId]
    return found, 200
def put_policytypes_subscription():
    """Store the notification destination sent in the request body.

    The body of the current Flask ``request`` is decoded as UTF-8 JSON and
    stored in ``notificationDestination`` under the key
    ``"notificationDestionation"`` (the spelling
    ``get_policytypes_subscription`` reads back).

    Returns:
        ``(NoContent, 201)`` when no destination was stored before,
        ``(NoContent, 200)`` when an existing one was replaced, or the 400
        ``(error, status)`` tuple built by ``set_error`` for a body that is
        not valid JSON.
    """
    try:
        raw = request.data.decode("utf-8")
        # Single quotes are rewritten so that Python-style payloads parse.
        # NOTE(review): this also rewrites apostrophes inside string values.
        raw = raw.replace("'", "\"")
        data = json.loads(raw)
    except ValueError:
        # Covers both UnicodeDecodeError and json.JSONDecodeError.
        return set_error(None, "The request body is not valid json.", 400,
                         None, None, "notificationDestination", None)

    # NOTE(review): the original returns were not visible; an empty body
    # with 201 on creation / 200 on replacement is assumed - confirm
    # against the API specification.
    code = 200 if notificationDestination else 201
    notificationDestination["notificationDestionation"] = data
    return NoContent, code
def get_policytypes_subscription():
    """Return the stored notification destination.

    Gives ``(destination, 200)`` when ``notificationDestination`` is
    non-empty; otherwise the 404 ``(error, status)`` tuple built by
    ``set_error``.
    """
    if notificationDestination:
        destination = notificationDestination["notificationDestionation"]
        return destination, 200
    return set_error(None,
                     "The notification destination has not been defined.",
                     404, None, None, "notificationDestination", None)
def set_error(type_of, title, status, detail, instance, param, reason):
    """Build an error body and the status code to send with it.

    Arguments that are ``None`` are left out of the body.

    Args:
        type_of: Value for the ``type`` member.
        title: Value for the ``title`` member.
        status: HTTP status code; also the second element of the result.
            ``None`` falls back to 500 (previously this raised ``KeyError``).
        detail: Value for the ``detail`` member.
        instance: Value for the ``instance`` member.
        param: Value for ``invalidParams["param"]``.
        reason: Value for ``invalidParams["reason"]``.

    Returns:
        An ``(error, status)`` tuple. ``error["invalidParams"]`` is present
        only when ``param`` or ``reason`` is given.
    """
    if status is None:
        # Without a status there is nothing to return as the HTTP code;
        # report a server error instead of failing on error["status"].
        status = 500

    members = (
        ("type", type_of),
        ("title", title),
        ("status", status),
        ("detail", detail),
        ("instance", instance),
    )
    error = {key: value for key, value in members if value is not None}

    params = {
        key: value
        for key, value in (("param", param), ("reason", reason))
        if value is not None
    }
    if params:
        error["invalidParams"] = params

    return error, status