Fix two bugs in simulator & code cleaning
[sim/a1-interface.git] / near-rt-ric-simulator / src / OSC_2.1.0 / a1.py
1 #  ============LICENSE_START===============================================
2 #  Copyright (C) 2020 Nordix Foundation. All rights reserved.
3 #  ========================================================================
4 #  Licensed under the Apache License, Version 2.0 (the "License");
5 #  you may not use this file except in compliance with the License.
6 #  You may obtain a copy of the License at
7 #
8 #       http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under the License is distributed on an "AS IS" BASIS,
12 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 #  See the License for the specific language governing permissions and
14 #  limitations under the License.
15 #  ============LICENSE_END=================================================
16 #
17
18 import json
19 import datetime
20 import time
21
22 from datetime import datetime
23 from connexion import NoContent
24 from flask import Flask, request, Response
25 from jsonschema import validate
26 from var_declaration import policy_instances, policy_types, policy_status, policy_fingerprint, forced_settings, hosts_set
27 from utils import calcFingerprint
28 from maincommon import extract_host_name
29
30
31 # API Function: Health check
32 def get_healthcheck():
33
34   extract_host_name(hosts_set, request)
35
36   if ((r := check_modified_response()) is not None):
37     return r
38
39   return (None, 200)
40
41 # API Function: Get all policy type ids
42 def get_all_policy_types():
43
44   extract_host_name(hosts_set, request)
45
46   if ((r := check_modified_response()) is not None):
47     return r
48
49   res = list(policy_instances.keys())
50   res = list(map(int, res))
51   return (res, 200)
52
53 # API Function: Get a policy type
54 def get_policy_type(policy_type_id):
55
56   extract_host_name(hosts_set, request)
57
58   if ((r := check_modified_response()) is not None):
59     return r
60
61   policy_type_id=str(policy_type_id)
62
63   if (policy_type_id not in policy_types.keys()):
64     return (None, 404)
65
66   return Response(json.dumps(policy_types[policy_type_id]), 200, mimetype='application/json')
67
68 # API Function: Delete a policy type
69 def delete_policy_type(policy_type_id):
70
71   extract_host_name(hosts_set, request)
72
73   if ((r := check_modified_response()) is not None):
74     return r
75
76   policy_type_id=str(policy_type_id)
77
78   if (policy_type_id not in policy_instances.keys()):
79     return (None, 404)
80
81   if (len(policy_instances[policy_type_id]) > 0):
82     return (None, 400)
83
84   del policy_instances[policy_type_id]
85   del policy_types[policy_type_id]
86
87   return (None, 204)
88
89
90 # API Function: Create a policy type
91 def create_policy_type(policy_type_id):
92
93   extract_host_name(hosts_set, request)
94
95   if ((r := check_modified_response()) is not None):
96     return r
97
98   try:
99     val=int(policy_type_id)
100   except:
101     return Response("The policy type id is not an int", 400, mimetype='text/plain')
102
103   policy_type_id=str(policy_type_id)
104
105   if (policy_type_id in policy_instances.keys()):
106     if (len(policy_instances[policy_type_id]) > 0):
107       return (None, 400)
108
109   try:
110     data = request.data
111     data = json.loads(data)
112   except:
113     return (None, 400)
114
115   if (('name' not in data.keys()) or ('description' not in data.keys()) or ('policy_type_id' not in data.keys()) or ('create_schema' not in data.keys())):
116     return (None, 400)
117
118   if (policy_type_id not in policy_instances.keys()):
119     policy_instances[policy_type_id]={}
120
121   policy_types[policy_type_id]=data
122
123   return (None, 201)
124
125
126 # API Function: Get all policy ids for a type
127 def get_all_policy_identities(policy_type_id):
128
129   extract_host_name(hosts_set, request)
130
131   if ((r := check_modified_response()) is not None):
132     return r
133
134   policy_type_id=str(policy_type_id)
135
136   if (policy_type_id not in policy_instances.keys()):
137     return (None, 404)
138   return (list(policy_instances[policy_type_id].keys()), 200)
139
140 # API Function: Get a policy instance
141 def get_policy_instance(policy_type_id, policy_instance_id):
142
143   extract_host_name(hosts_set, request)
144
145   if ((r := check_modified_response()) is not None):
146     return r
147
148   policy_type_id=str(policy_type_id)
149
150   if (policy_type_id not in policy_instances.keys()):
151     return (None, 404)
152
153   if (policy_instance_id not in policy_instances[policy_type_id].keys()):
154     return (None, 404)
155
156   return Response(json.dumps(policy_instances[policy_type_id][policy_instance_id]), 200, mimetype='application/json')
157
158 # API function: Delete a policy
159 def delete_policy_instance(policy_type_id, policy_instance_id):
160
161   extract_host_name(hosts_set, request)
162
163   if ((r := check_modified_response()) is not None):
164     return r
165
166   policy_type_id=str(policy_type_id)
167
168   if (policy_type_id not in policy_instances.keys()):
169     return (None, 404)
170
171   if (policy_instance_id not in policy_instances[policy_type_id].keys()):
172     return (None, 404)
173
174   fpPrevious=calcFingerprint(policy_instances[policy_type_id][policy_instance_id])
175   del policy_fingerprint[fpPrevious]
176   del policy_instances[policy_type_id][policy_instance_id]
177   del policy_status[policy_instance_id]
178
179   return (None, 202)
180
181 # API function: Create/update a policy
182 def create_or_replace_policy_instance(policy_type_id, policy_instance_id):
183
184   extract_host_name(hosts_set, request)
185
186   if ((r := check_modified_response()) is not None):
187     return r
188
189   policy_type_id=str(policy_type_id)
190
191   if (policy_type_id not in policy_instances.keys()):
192     return (None, 404)
193
194   try:
195     data = request.data
196     data = json.loads(data)
197   except:
198     return (None, 400)
199
200   try:
201     validate(instance=data, schema=policy_types[policy_type_id]['create_schema'])
202   except:
203     return (None, 400)
204
205   fpPrevious=None
206   retcode=201
207   if policy_instance_id in policy_instances[policy_type_id].keys():
208     retcode=200
209     fpPrevious=calcFingerprint(policy_instances[policy_type_id][policy_instance_id])
210   else:
211     if (policy_instance_id in policy_fingerprint.values()):
212       return (None, 400)
213
214   fp=calcFingerprint(data)
215   if (fp in policy_fingerprint.keys()):
216     id=policy_fingerprint[fp]
217     if (id != policy_instance_id):
218       return (None, 400)
219
220   if (fpPrevious is not None):
221     del policy_fingerprint[fpPrevious]
222
223   policy_fingerprint[fp]=policy_instance_id
224
225   policy_instances[policy_type_id][policy_instance_id]=data
226   ps={}
227   ps["instance_status"] = "NOT IN EFFECT"
228   ps["has_been_deleted"] = "false"
229   ps["created_at"] = str(datetime.now().strftime("%m/%d/%Y, %H:%M:%S"))
230   policy_status[policy_instance_id]=ps
231
232   return (None, 202)
233
234 # API function: Get policy status
235 def get_policy_instance_status(policy_type_id, policy_instance_id):
236
237   extract_host_name(hosts_set, request)
238
239   if ((r := check_modified_response()) is not None):
240     return r
241
242   policy_type_id=str(policy_type_id)
243   if (policy_type_id not in policy_instances.keys()):
244     return (None, 404)
245
246   if (policy_instance_id not in policy_instances[policy_type_id].keys()):
247     return (None, 404)
248
249   return Response(json.dumps(policy_status[policy_instance_id]), 200, mimetype='application/json')
250
251 # Helper: Create a response object if forced http response code is set
252 def get_forced_response():
253
254   response_code=forced_settings['code']
255   if (response_code is not None):
256     forced_settings['code'] = None
257     return (None, response_code)
258   return None
259
260 # Helper: Delay if delayed response code is set
261 def do_delay():
262
263   if (forced_settings['delay'] is not None):
264     try:
265       val=int(forced_settings['delay'])
266       time.sleep(val)
267     except:
268       return
269   return
270
271 # Helper: Check if response shall be delayed or a forced response shall be sent
272 def check_modified_response():
273   do_delay()
274   return get_forced_response()