6ec59ff13a49e9141e91e15b0352eec3d394bd16
[sim/a1-interface.git] / near-rt-ric-simulator / src / OSC_2.1.0 / a1.py
1 #  ============LICENSE_START===============================================
2 #  Copyright (C) 2020 Nordix Foundation. All rights reserved.
3 #  ========================================================================
4 #  Licensed under the Apache License, Version 2.0 (the "License");
5 #  you may not use this file except in compliance with the License.
6 #  You may obtain a copy of the License at
7 #
8 #       http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under the License is distributed on an "AS IS" BASIS,
12 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 #  See the License for the specific language governing permissions and
14 #  limitations under the License.
15 #  ============LICENSE_END=================================================
16 #
17
18 import json
19 import datetime
20 import time
21
22 from datetime import datetime
23 from connexion import NoContent
24 from flask import Flask, request, Response
25 from jsonschema import validate
26 from var_declaration import policy_instances, policy_types, policy_status, policy_fingerprint, forced_settings, hosts_set
27 from utils import calcFingerprint
28 from maincommon import extract_host_name
29
30 #Constsants
31 APPL_JSON='application/json'
32
33 # API Function: Health check
34 def get_healthcheck():
35
36   extract_host_name(hosts_set, request)
37
38   if ((r := check_modified_response()) is not None):
39     return r
40
41   return (None, 200)
42
43 # API Function: Get all policy type ids
44 def get_all_policy_types():
45
46   extract_host_name(hosts_set, request)
47
48   if ((r := check_modified_response()) is not None):
49     return r
50
51   res = list(policy_instances.keys())
52   res = list(map(int, res))
53   return (res, 200)
54
55 # API Function: Get a policy type
56 def get_policy_type(policy_type_id):
57
58   extract_host_name(hosts_set, request)
59
60   if ((r := check_modified_response()) is not None):
61     return r
62
63   policy_type_id=str(policy_type_id)
64
65   if (policy_type_id not in policy_types.keys()):
66     return (None, 404)
67
68   return Response(json.dumps(policy_types[policy_type_id]), 200, mimetype=APPL_JSON)
69
70 # API Function: Delete a policy type
71 def delete_policy_type(policy_type_id):
72
73   extract_host_name(hosts_set, request)
74
75   if ((r := check_modified_response()) is not None):
76     return r
77
78   policy_type_id=str(policy_type_id)
79
80   if (policy_type_id not in policy_instances.keys()):
81     return (None, 404)
82
83   if (len(policy_instances[policy_type_id]) > 0):
84     return (None, 400)
85
86   del policy_instances[policy_type_id]
87   del policy_types[policy_type_id]
88
89   return (None, 204)
90
91
92 # API Function: Create a policy type
93 def create_policy_type(policy_type_id):
94
95   extract_host_name(hosts_set, request)
96
97   if ((r := check_modified_response()) is not None):
98     return r
99
100   try:
101     int(policy_type_id)
102   except Exception:
103     return Response("The policy type id is not an int", 400, mimetype='text/plain')
104
105   policy_type_id=str(policy_type_id)
106
107   if (policy_type_id in policy_instances.keys()):
108     if (len(policy_instances[policy_type_id]) > 0):
109       return (None, 400)
110
111   try:
112     data = request.data
113     data = json.loads(data)
114   except Exception:
115     return (None, 400)
116
117   if (('name' not in data.keys()) or ('description' not in data.keys()) or ('policy_type_id' not in data.keys()) or ('create_schema' not in data.keys())):
118     return (None, 400)
119
120   if (policy_type_id not in policy_instances.keys()):
121     policy_instances[policy_type_id]={}
122
123   policy_types[policy_type_id]=data
124
125   return (None, 201)
126
127
128 # API Function: Get all policy ids for a type
129 def get_all_policy_identities(policy_type_id):
130
131   extract_host_name(hosts_set, request)
132
133   if ((r := check_modified_response()) is not None):
134     return r
135
136   policy_type_id=str(policy_type_id)
137
138   if (policy_type_id not in policy_instances.keys()):
139     return (None, 404)
140   return (list(policy_instances[policy_type_id].keys()), 200)
141
142 # API Function: Get a policy instance
143 def get_policy_instance(policy_type_id, policy_instance_id):
144
145   extract_host_name(hosts_set, request)
146
147   if ((r := check_modified_response()) is not None):
148     return r
149
150   policy_type_id=str(policy_type_id)
151
152   if (policy_type_id not in policy_instances.keys()):
153     return (None, 404)
154
155   if (policy_instance_id not in policy_instances[policy_type_id].keys()):
156     return (None, 404)
157
158   return Response(json.dumps(policy_instances[policy_type_id][policy_instance_id]), 200, mimetype=APPL_JSON)
159
160 # API function: Delete a policy
161 def delete_policy_instance(policy_type_id, policy_instance_id):
162
163   extract_host_name(hosts_set, request)
164
165   if ((r := check_modified_response()) is not None):
166     return r
167
168   policy_type_id=str(policy_type_id)
169
170   if (policy_type_id not in policy_instances.keys()):
171     return (None, 404)
172
173   if (policy_instance_id not in policy_instances[policy_type_id].keys()):
174     return (None, 404)
175
176   fp_previous=calcFingerprint(policy_instances[policy_type_id][policy_instance_id])
177   del policy_fingerprint[fp_previous]
178   del policy_instances[policy_type_id][policy_instance_id]
179   del policy_status[policy_instance_id]
180
181   return (None, 202)
182
183 # API function: Create/update a policy
184 def create_or_replace_policy_instance(policy_type_id, policy_instance_id):
185
186   extract_host_name(hosts_set, request)
187
188   if ((r := check_modified_response()) is not None):
189     return r
190
191   policy_type_id=str(policy_type_id)
192
193   if (policy_type_id not in policy_instances.keys()):
194     return (None, 404)
195
196   try:
197     data = request.data
198     data = json.loads(data)
199   except Exception:
200     return (None, 400)
201
202   try:
203     validate(instance=data, schema=policy_types[policy_type_id]['create_schema'])
204   except Exception:
205     return (None, 400)
206
207   fp_previous=None
208   if policy_instance_id in policy_instances[policy_type_id].keys():
209     retcode=200
210     fp_previous=calcFingerprint(policy_instances[policy_type_id][policy_instance_id])
211   else:
212     if (policy_instance_id in policy_fingerprint.values()):
213       return (None, 400)
214
215   fp=calcFingerprint(data)
216   if (fp in policy_fingerprint.keys()):
217     p_id=policy_fingerprint[fp]
218     if (p_id != policy_instance_id):
219       return (None, 400)
220
221   if (fp_previous is not None):
222     del policy_fingerprint[fp_previous]
223
224   policy_fingerprint[fp]=policy_instance_id
225
226   policy_instances[policy_type_id][policy_instance_id]=data
227   ps={}
228   ps["instance_status"] = "NOT IN EFFECT"
229   ps["has_been_deleted"] = "false"
230   ps["created_at"] = str(datetime.now().strftime("%m/%d/%Y, %H:%M:%S"))
231   policy_status[policy_instance_id]=ps
232
233   return (None, 202)
234
235 # API function: Get policy status
236 def get_policy_instance_status(policy_type_id, policy_instance_id):
237
238   extract_host_name(hosts_set, request)
239
240   if ((r := check_modified_response()) is not None):
241     return r
242
243   policy_type_id=str(policy_type_id)
244   if (policy_type_id not in policy_instances.keys()):
245     return (None, 404)
246
247   if (policy_instance_id not in policy_instances[policy_type_id].keys()):
248     return (None, 404)
249
250   return Response(json.dumps(policy_status[policy_instance_id]), 200, mimetype=APPL_JSON)
251
252 # Helper: Create a response object if forced http response code is set
253 def get_forced_response():
254
255   response_code=forced_settings['code']
256   if (response_code is not None):
257     forced_settings['code'] = None
258     return (None, response_code)
259   return None
260
261 # Helper: Delay if delayed response code is set
262 def do_delay():
263
264   if (forced_settings['delay'] is not None):
265     try:
266       val=int(forced_settings['delay'])
267       time.sleep(val)
268     except Exception:
269       return
270   return
271
272 # Helper: Check if response shall be delayed or a forced response shall be sent
273 def check_modified_response():
274   do_delay()
275   return get_forced_response()