Merge "New version of NearRT-RIC simulator"
[nonrtric.git] / near-rt-ric-simulator / ric-plt / a1 / a1.py
#!/usr/bin/env python3
2 import copy
3 import datetime
4 import json
5 import logging
6 import requests
7
8 from connexion import NoContent
9 from flask import Flask, escape, request
10 from jsonschema import validate
11 from random import random, choice
12 from var_declaration import policy_instances, policy_types, policy_status, notification_destination, notificationDestination
13
def get_all_policies():
  """Return every stored policy instance annotated with its enforce status.

  Each instance is deep-copied before ``enforceStatus`` is added, so the
  entries held in ``policy_instances`` are left untouched. The result keeps
  the iteration order of ``policy_instances``.

  Returns:
    A ``(list_of_policies, 200)`` tuple.
  """
  all_policies = []
  for policy_id, instance in policy_instances.items():
    policy = copy.deepcopy(instance)
    policy["enforceStatus"] = policy_status[policy_id]["enforceStatus"]
    # append keeps insertion order; insert(len - 1, ...) would push the
    # first element to the end of the list.
    all_policies.append(policy)
  return (all_policies, 200)
21
def put_policy(policyId):
  """Create or replace the policy instance addressed by ``policyId``.

  The body of the current Flask ``request`` is parsed as JSON and checked in
  this order: the policy type must be known, ``policyClause`` must validate
  against that type's schema, an already stored instance must keep its scope,
  the body's ``policyId`` must equal the one in the address, and no other
  stored instance may have the same clause, type and notification
  destination.

  On success the instance is stored, its status is reset to ``UNDEFINED`` and
  its notification destination is recorded.

  Returns:
    ``(policy, 201)`` when a new instance is created, ``(policy, 200)`` when
    an existing one is replaced, otherwise the ``(error, status)`` tuple
    produced by ``set_error``.
  """
  data = request.data.decode("utf-8")
  # Single quotes are rewritten to double quotes before parsing (presumably to
  # accept Python-style dict literals). NOTE(review): this also rewrites
  # apostrophes inside string values.
  data = data.replace("'", "\"")
  data = json.loads(data)

  pt = data["policyTypeId"]
  if pt not in policy_types:
    return set_error(None, "The policy type provided does not exist.", 404, "The policy type " + pt + " is not defined as a policy type.", None, "policyTypeId", None)

  try:
    validate(instance=data["policyClause"], schema=policy_types[pt])
  except Exception:
    # Any validation problem (including a missing policyClause) is reported
    # as a bad request; system-exiting exceptions are no longer swallowed.
    return set_error(None, "The json does not validate against the schema.", 400, None, None, None, None)

  existing = policy_instances.get(data["policyId"])
  if existing is not None and data["policyClause"]["scope"] != existing["policyClause"]["scope"]:
    return set_error(None, "The policy already exists with a different scope.", 404, "The policy put involves a modification of the existing scope, which is not allowed.", None, "scope", None)

  if data["policyId"] != policyId:
    return set_error(None, "Wrong policy identity.", 400, "The policy instance's identity does not match with the one specified in the address.", None, "policyId", "The policy identity " + data["policyId"] + " is different from the address: " + policyId)

  # Reject a second instance that duplicates another one under a different id.
  for other_id, other in policy_instances.items():
    if data["policyId"] != other_id and \
       data["policyClause"] == other["policyClause"] and \
       data["policyTypeId"] == other["policyTypeId"] and \
       data["notificationDestination"] == other["notificationDestination"]:
      return set_error(None, "The policy already exists with a different id.", 404, "No action has been taken. The id of the existing policy instance is: " + other_id + ".", None, None, None)

  # 200 for replacing an existing instance, 201 for creating a new one
  # (same convention as put_policytypes_subscription).
  code = 200 if policyId in policy_instances else 201
  policy_instances[policyId] = data
  policy_status[policyId] = set_status("UNDEFINED")
  notification_destination[policyId] = data["notificationDestination"]
  return (policy_instances[policyId], code)
60
def set_status(*args):
  """Build a policy status dictionary.

  Called without arguments, the status (and, for ``NOT_ENFORCED``, the
  reason) is drawn from ``randomise_status`` / ``randomise_reason``. No policy
  id is attached in that case because none is passed to this function.

  Called with arguments, ``args[0]`` is the enforce status and, when that
  status is ``"NOT_ENFORCED"``, ``args[1]`` is the enforce reason.

  Returns:
    The status dictionary, or the ``(error, 400)`` tuple from ``set_error``
    when the status or the reason is not one of the accepted values (a
    missing reason counts as invalid).
  """
  ps = {}
  if not args:
    rand_status = randomise_status()
    ps["enforceStatus"] = rand_status
    if rand_status == "NOT_ENFORCED":
      ps["enforceReason"] = randomise_reason()
    # Return here: there is no args[0] to inspect further down.
    return ps

  status = args[0]
  if status not in ["UNDEFINED", "ENFORCED", "NOT_ENFORCED"]:
    return set_error(None, "Wrong enforceStatus.", 400, None, None, "enforceStatus", "enforceStatus should be one of \"UNDEFINED\", \"ENFORCED\" or \"NOT_ENFORCED\"")
  ps["enforceStatus"] = status

  if status == "NOT_ENFORCED":
    reason = args[1] if len(args) > 1 else None
    if reason not in ["100", "200", "300", "800"]:
      return set_error(None, "Wrong enforceReason.", 400, None, None, "enforceReason", "enforceReason should be one of \"100\", \"200\", \"300\" or \"800\"")
    ps["enforceReason"] = reason
  return ps
80
def get_policy(policyId):
  """Return one policy instance annotated with its current enforce status.

  The stored instance is deep-copied before ``enforceStatus`` is added, so
  reading a policy does not modify the entry kept in ``policy_instances``
  (the same approach ``get_all_policies`` uses).

  Returns:
    ``(policy, 200)`` when the id is known, otherwise the 404 error tuple
    produced by ``set_error``.
  """
  if policyId not in policy_instances:
    return set_error(None, "The requested policy does not exist.", 404, None, None, "policyId", None)
  res = copy.deepcopy(policy_instances[policyId])
  res["enforceStatus"] = policy_status[policyId]["enforceStatus"]
  return (res, 200)
88
def delete_policy(policyId):
  """Remove a policy instance and the bookkeeping stored alongside it.

  Besides the instance itself, the matching entries in ``policy_status`` and
  ``notification_destination`` (both written by ``put_policy``) are dropped
  so no stale data is left behind for a deleted policy.

  Returns:
    ``(None, 204)`` when the policy existed, otherwise the 404 error tuple
    produced by ``set_error``.
  """
  if policyId not in policy_instances:
    return set_error(None, "The policy identity does not exist.", 404, "No policy instance has been deleted.", None, "policyId", None)
  policy_instances.pop(policyId)
  # Tolerate missing side entries: the instance is the source of truth.
  policy_status.pop(policyId, None)
  notification_destination.pop(policyId, None)
  return (None, 204)
96
def get_all_policy_identities():
  """Return the ids of all stored policy instances with HTTP status 200."""
  identities = [policy_id for policy_id in policy_instances.keys()]
  return (identities, 200)
99
def randomise_status():
  """Pick an enforce status from a single ``random()`` draw.

  Draws below 0.4999 give ``"NOT_ENFORCED"``, draws above 0.5001 give
  ``"ENFORCED"``, and the narrow band in between gives ``"UNDEFINED"``.
  """
  draw = random()
  if draw < 0.4999:
    return "NOT_ENFORCED"
  if draw > 0.5001:
    return "ENFORCED"
  return "UNDEFINED"
109
def randomise_reason():
  """Return one of the enforce reason codes, chosen at random."""
  return choice(["100", "200", "300", "800"])
113
def get_all_policy_status():
  """Return the status of every policy, each tagged with its policy id.

  Each status entry is deep-copied before ``policyId`` is added, so the
  entries held in ``policy_status`` are left untouched. The result keeps the
  iteration order of ``policy_status``.

  Returns:
    A ``(list_of_statuses, 200)`` tuple.
  """
  all_status = []
  for policy_id, status in policy_status.items():
    entry = copy.deepcopy(status)
    entry["policyId"] = policy_id
    # append keeps insertion order; insert(len - 1, ...) would push the
    # first element to the end of the list.
    all_status.append(entry)
  return (all_status, 200)
121
def get_policy_status(policyId):
  """Return the stored status of one policy.

  Returns:
    ``(status, 200)`` when the id has a status entry, otherwise the 404
    error tuple produced by ``set_error`` (matching the other single-item
    getters instead of failing with ``KeyError``).
  """
  if policyId not in policy_status:
    return set_error(None, "The policy identity does not exist.", 404, None, None, "policyId", None)
  return (policy_status[policyId], 200)
124
def get_all_policytypes():
  """Return all policy type definitions with HTTP status 200.

  The list follows the iteration order of ``policy_types``; the previous
  ``insert(len - 1, ...)`` construction moved the first type to the end.
  """
  return (list(policy_types.values()), 200)
130
def get_all_policytypes_identities():
  """Return the ids of all known policy types with HTTP status 200."""
  type_ids = [type_id for type_id in policy_types.keys()]
  return (type_ids, 200)
133
def get_policytypes(policyTypeId):
  """Return one policy type definition.

  Returns:
    ``(policy_type, 200)`` when the id is known, otherwise the 404 error
    tuple produced by ``set_error``.
  """
  if policyTypeId not in policy_types.keys():
    return set_error(None, "The requested policy type does not exist.", 404, None, None, "policyTypeId", None)
  return (policy_types[policyTypeId], 200)
139
def put_policytypes_subscription():
  """Store the notification destination sent in the request body.

  The body of the current Flask ``request`` is decoded, single quotes are
  replaced by double quotes, and the result is parsed as JSON. The parsed
  value is stored under the key ``"notificationDestionation"`` (spelled as
  read back by ``get_policytypes_subscription``).

  Returns:
    ``(None, 201)`` when nothing was stored before, ``(None, 200)`` when an
    existing destination is overwritten.
  """
  raw_body = request.data.decode("utf-8")
  payload = json.loads(raw_body.replace("'", "\""))
  # Decide the status code before writing, while the dict still reflects
  # whether a destination was already present.
  code = 200 if notificationDestination else 201
  notificationDestination["notificationDestionation"] = payload
  return (None, code)
151
def get_policytypes_subscription():
  """Return the stored notification destination.

  Returns:
    ``(destination, 200)`` when one has been stored, otherwise the 404 error
    tuple produced by ``set_error``.
  """
  if notificationDestination:
    return (notificationDestination["notificationDestionation"], 200)
  return set_error(None, "The notification destination has not been defined.", 404, None, None, "notificationDestination", None)
157
def set_error(type_of, title, status, detail, instance, param, reason):
  """Assemble an error body and pair it with its HTTP status.

  Only arguments that are not ``None`` end up in the body. ``param`` and
  ``reason`` are grouped under ``"invalidParams"``, which is added only when
  at least one of them is given.

  Returns:
    An ``(error_dict, status)`` tuple; the status is read back from the
    body, so ``status`` must not be ``None``.
  """
  top_level = (
    ("type", type_of),
    ("title", title),
    ("status", status),
    ("detail", detail),
    ("instance", instance),
  )
  error = {key: value for key, value in top_level if value is not None}

  nested = (("param", param), ("reason", reason))
  params = {key: value for key, value in nested if value is not None}
  if params:
    error["invalidParams"] = params
  return (error, error["status"])
176     error["invalidParams"] = params
177   return(error, error["status"])