Docs: Update release notes
[sim/a1-interface.git] / near-rt-ric-simulator / src / OSC_2.1.0 / a1.py
1 #  ============LICENSE_START===============================================
2 #  Copyright (C) 2021 Nordix Foundation. All rights reserved.
3 #  ========================================================================
4 #  Licensed under the Apache License, Version 2.0 (the "License");
5 #  you may not use this file except in compliance with the License.
6 #  You may obtain a copy of the License at
7 #
8 #       http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under the License is distributed on an "AS IS" BASIS,
12 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 #  See the License for the specific language governing permissions and
14 #  limitations under the License.
15 #  ============LICENSE_END=================================================
16 #
17
18 import json
19 import datetime
20 import time
21
22 from datetime import datetime
23 from connexion import NoContent
24 from flask import Flask, request, Response
25 from jsonschema import validate
26 from var_declaration import policy_instances, policy_types, policy_status, policy_fingerprint, forced_settings, hosts_set
27 from utils import calcFingerprint
28 from maincommon import extract_host_name, is_duplicate_check
29 from payload_logging import is_payload_logging
30
31 #Constsants
32 APPL_JSON='application/json'
33
34
35 #Helper funtion to log http reponse
36 def log_resp_text(msg):
37   global payload_log
38   if (is_payload_logging()):
39     print("-----Error description-----")
40     print(str(msg))
41
42 # API Function: Health check
43 def get_healthcheck():
44
45   extract_host_name(hosts_set, request)
46
47   if ((r := check_modified_response()) is not None):
48     return r
49
50   return (None, 200)
51
52 # API Function: Get all policy type ids
53 def get_all_policy_types():
54
55   extract_host_name(hosts_set, request)
56
57   if ((r := check_modified_response()) is not None):
58     return r
59
60   res = list(policy_instances.keys())
61   res = list(map(int, res))
62   return (res, 200)
63
64 # API Function: Get a policy type
65 def get_policy_type(policy_type_id):
66
67   extract_host_name(hosts_set, request)
68
69   if ((r := check_modified_response()) is not None):
70     return r
71
72   policy_type_id=str(policy_type_id)
73
74   if (policy_type_id not in policy_types.keys()):
75     log_resp_text("Policy type id not found")
76     return (None, 404)
77
78   return Response(json.dumps(policy_types[policy_type_id]), 200, mimetype=APPL_JSON)
79
80 # API Function: Delete a policy type
81 def delete_policy_type(policy_type_id):
82
83   extract_host_name(hosts_set, request)
84
85   if ((r := check_modified_response()) is not None):
86     return r
87
88   policy_type_id=str(policy_type_id)
89
90   if (policy_type_id not in policy_instances.keys()):
91     log_resp_text("Policy type not found")
92     return (None, 404)
93
94   if (len(policy_instances[policy_type_id]) > 0):
95     log_resp_text("Policy type cannot be removed, instances exists")
96     return (None, 400)
97
98   del policy_instances[policy_type_id]
99   del policy_types[policy_type_id]
100
101   return (None, 204)
102
103
104 # API Function: Create a policy type
105 def create_policy_type(policy_type_id):
106
107   extract_host_name(hosts_set, request)
108
109   if ((r := check_modified_response()) is not None):
110     return r
111
112   try:
113     int(policy_type_id)
114   except Exception:
115     return Response("The policy type id is not an int", 400, mimetype='text/plain')
116
117   policy_type_id=str(policy_type_id)
118
119   if (policy_type_id in policy_instances.keys()):
120     if (len(policy_instances[policy_type_id]) > 0):
121       log_resp_text("Policy type id already exists")
122       return (None, 400)
123
124   try:
125     data = request.data
126     data = json.loads(data)
127   except Exception:
128     log_resp_text("Policy type validation failure")
129     return (None, 400)
130
131   if (('name' not in data.keys()) or ('description' not in data.keys()) or ('policy_type_id' not in data.keys()) or ('create_schema' not in data.keys())):
132     log_resp_text("Parameters missing in policy type")
133     return (None, 400)
134
135   if (policy_type_id not in policy_instances.keys()):
136     policy_instances[policy_type_id]={}
137
138   policy_types[policy_type_id]=data
139
140   return (None, 201)
141
142
143 # API Function: Get all policy ids for a type
144 def get_all_policy_identities(policy_type_id):
145
146   extract_host_name(hosts_set, request)
147
148   if ((r := check_modified_response()) is not None):
149     return r
150
151   policy_type_id=str(policy_type_id)
152
153   if (policy_type_id not in policy_instances.keys()):
154     log_resp_text("Policy type id not found")
155     return (None, 404)
156   return (list(policy_instances[policy_type_id].keys()), 200)
157
158 # API Function: Get a policy instance
159 def get_policy_instance(policy_type_id, policy_instance_id):
160
161   extract_host_name(hosts_set, request)
162
163   if ((r := check_modified_response()) is not None):
164     return r
165
166   policy_type_id=str(policy_type_id)
167
168   if (policy_type_id not in policy_instances.keys()):
169     log_resp_text("Policy type id not found")
170     return (None, 404)
171
172   if (policy_instance_id not in policy_instances[policy_type_id].keys()):
173     log_resp_text("Policy instance id not found")
174     return (None, 404)
175
176   return Response(json.dumps(policy_instances[policy_type_id][policy_instance_id]), 200, mimetype=APPL_JSON)
177
178 # API function: Delete a policy
179 def delete_policy_instance(policy_type_id, policy_instance_id):
180
181   extract_host_name(hosts_set, request)
182
183   if ((r := check_modified_response()) is not None):
184     return r
185
186   policy_type_id=str(policy_type_id)
187
188   if (policy_type_id not in policy_instances.keys()):
189     log_resp_text("Policy type id not found")
190     return (None, 404)
191
192   if (policy_instance_id not in policy_instances[policy_type_id].keys()):
193     log_resp_text("Policy instance id not found")
194     return (None, 404)
195
196   if (is_duplicate_check()):
197     fp_previous=calcFingerprint(policy_instances[policy_type_id][policy_instance_id], policy_type_id)
198   else:
199     fp_previous=policy_instance_id
200
201   del policy_fingerprint[fp_previous]
202   del policy_instances[policy_type_id][policy_instance_id]
203   del policy_status[policy_instance_id]
204
205   return (None, 202)
206
207 # API function: Create/update a policy
208 def create_or_replace_policy_instance(policy_type_id, policy_instance_id):
209
210   extract_host_name(hosts_set, request)
211
212   if ((r := check_modified_response()) is not None):
213     return r
214
215   policy_type_id=str(policy_type_id)
216
217   if (policy_type_id not in policy_instances.keys()):
218     log_resp_text("Policy type id not found")
219     return (None, 404)
220
221   try:
222     data = request.data
223     data = json.loads(data)
224   except Exception:
225     log_resp_text("Policy json error")
226     return (None, 400)
227
228   try:
229     validate(instance=data, schema=policy_types[policy_type_id]['create_schema'])
230   except Exception:
231     log_resp_text("Policy validation error")
232     return (None, 400)
233
234   fp_previous=None
235   if policy_instance_id in policy_instances[policy_type_id].keys():
236     if (is_duplicate_check()):
237       fp_previous=calcFingerprint(policy_instances[policy_type_id][policy_instance_id], policy_type_id)
238     else:
239       fp_previous=policy_instance_id
240
241   else:
242     if (policy_instance_id in policy_fingerprint.values()):
243       log_resp_text("Policy id already exist for other type")
244       return (None, 400)
245
246   if (is_duplicate_check()):
247     fp=calcFingerprint(data, policy_type_id)
248   else:
249     fp=policy_instance_id
250
251   if ((fp in policy_fingerprint.keys()) and is_duplicate_check()):
252     p_id=policy_fingerprint[fp]
253     if (p_id != policy_instance_id):
254       log_resp_text("Policy json duplicate of other instance")
255       return (None, 400)
256
257   if (fp_previous is not None):
258     del policy_fingerprint[fp_previous]
259
260   policy_fingerprint[fp]=policy_instance_id
261
262   policy_instances[policy_type_id][policy_instance_id]=data
263   ps={}
264   ps["instance_status"] = "NOT IN EFFECT"
265   ps["has_been_deleted"] = "false"
266   ps["created_at"] = str(datetime.now().strftime("%m/%d/%Y, %H:%M:%S"))
267   policy_status[policy_instance_id]=ps
268
269   return (None, 202)
270
271 # API function: Get policy status
272 def get_policy_instance_status(policy_type_id, policy_instance_id):
273
274   extract_host_name(hosts_set, request)
275
276   if ((r := check_modified_response()) is not None):
277     return r
278
279   policy_type_id=str(policy_type_id)
280   if (policy_type_id not in policy_instances.keys()):
281     log_resp_text("Policy type id not found")
282     return (None, 404)
283
284   if (policy_instance_id not in policy_instances[policy_type_id].keys()):
285     log_resp_text("Policy instance id not found")
286     return (None, 404)
287
288   return Response(json.dumps(policy_status[policy_instance_id]), 200, mimetype=APPL_JSON)
289
290 # Helper: Create a response object if forced http response code is set
291 def get_forced_response():
292
293   response_code=forced_settings['code']
294   if (response_code is not None):
295     forced_settings['code'] = None
296     return (None, response_code)
297   return None
298
299 # Helper: Delay if delayed response code is set
300 def do_delay():
301
302   if (forced_settings['delay'] is not None):
303     try:
304       val=int(forced_settings['delay'])
305       time.sleep(val)
306     except Exception:
307       return
308
309 # Helper: Check if response shall be delayed or a forced response shall be sent
310 def check_modified_response():
311   do_delay()
312   return get_forced_response()