6db2599b9f3c175b474ebfed494459bdce140a89
[sim/a1-interface.git] / near-rt-ric-simulator / src / OSC_2.1.0 / a1.py
1 #  ============LICENSE_START===============================================
2 #  Copyright (C) 2020 Nordix Foundation. All rights reserved.
3 #  ========================================================================
4 #  Licensed under the Apache License, Version 2.0 (the "License");
5 #  you may not use this file except in compliance with the License.
6 #  You may obtain a copy of the License at
7 #
8 #       http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under the License is distributed on an "AS IS" BASIS,
12 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 #  See the License for the specific language governing permissions and
14 #  limitations under the License.
15 #  ============LICENSE_END=================================================
16 #
17
18 import json
19 import datetime
20 import time
21
22 from datetime import datetime
23 from connexion import NoContent
24 from flask import Flask, request, Response
25 from jsonschema import validate
26 from var_declaration import policy_instances, policy_types, policy_status, policy_fingerprint, forced_settings
27 from utils import calcFingerprint
28 from maincommon import *
29
30
31 # API Function: Health check
32 def get_healthcheck():
33
34   if ((r := check_modified_response()) is not None):
35     return r
36
37   return (None, 200)
38
39 # API Function: Get all policy type ids
40 def get_all_policy_types():
41
42   if ((r := check_modified_response()) is not None):
43     return r
44
45   return (list(policy_instances.keys()), 200)
46
47 # API Function: Get a policy type
48 def get_policy_type(policy_type_id):
49
50   if ((r := check_modified_response()) is not None):
51     return r
52
53   policy_type_id=str(policy_type_id)
54
55   if (policy_type_id not in policy_types.keys()):
56     return (None, 404)
57
58   return Response(json.dumps(policy_types[policy_type_id]), 200, mimetype='application/json')
59
60 # API Function: Delete a policy type
61 def delete_policy_type(policy_type_id):
62
63   if ((r := check_modified_response()) is not None):
64     return r
65
66   policy_type_id=str(policy_type_id)
67
68   if (policy_type_id not in policy_instances.keys()):
69     return (None, 404)
70
71   if (len(policy_instances[policy_type_id]) > 0):
72     return (None, 400)
73
74   del policy_instances[policy_type_id]
75   del policy_types[policy_type_id]
76
77   return (None, 204)
78
79
80 # API Function: Create a policy type
81 def create_policy_type(policy_type_id):
82
83   if ((r := check_modified_response()) is not None):
84     return r
85
86   policy_type_id=str(policy_type_id)
87
88   if (policy_type_id in policy_instances.keys()):
89     if (len(policy_instances[policy_type_id]) > 0):
90       return (None, 400)
91
92   try:
93     data = request.data
94     data = json.loads(data)
95   except:
96     return (None, 400)
97
98   if (('name' not in data.keys()) or ('description' not in data.keys()) or ('policy_type_id' not in data.keys()) or ('create_schema' not in data.keys())):
99     return (None, 400)
100
101   if (policy_type_id not in policy_instances.keys()):
102     policy_instances[policy_type_id]={}
103
104   policy_types[policy_type_id]=data
105
106   return (None, 201)
107
108
109 # API Function: Get all policy ids for a type
110 def get_all_policy_identities(policy_type_id):
111
112   if ((r := check_modified_response()) is not None):
113     return r
114
115   policy_type_id=str(policy_type_id)
116
117   if (policy_type_id not in policy_instances.keys()):
118     return (None, 404)
119   return (list(policy_instances[policy_type_id].keys()), 200)
120
121 # API Function: Get a policy instance
122 def get_policy_instance(policy_type_id, policy_instance_id):
123
124   if ((r := check_modified_response()) is not None):
125     return r
126
127   policy_type_id=str(policy_type_id)
128
129   if (policy_type_id not in policy_instances.keys()):
130     return (None, 404)
131
132   if (policy_instance_id not in policy_instances[policy_type_id].keys()):
133     return (None, 404)
134
135   return Response(json.dumps(policy_instances[policy_type_id][policy_instance_id]), 200, mimetype='application/json')
136
137 # API function: Delete a policy
138 def delete_policy_instance(policy_type_id, policy_instance_id):
139
140   if ((r := check_modified_response()) is not None):
141     return r
142
143   policy_type_id=str(policy_type_id)
144
145   if (policy_type_id not in policy_instances.keys()):
146     return (None, 404)
147
148   if (policy_instance_id not in policy_instances[policy_type_id].keys()):
149     return (None, 404)
150
151   fpPrevious=calcFingerprint(policy_instances[policy_type_id][policy_instance_id])
152   del policy_fingerprint[fpPrevious]
153   del policy_instances[policy_type_id][policy_instance_id]
154   del policy_status[policy_instance_id]
155
156   return (None, 202)
157
158 # API function: Create/update a policy
159 def create_or_replace_policy_instance(policy_type_id, policy_instance_id):
160
161   if ((r := check_modified_response()) is not None):
162     return r
163
164   policy_type_id=str(policy_type_id)
165
166   if (policy_type_id not in policy_instances.keys()):
167     return (None, 404)
168
169   try:
170     data = request.data
171     data = json.loads(data)
172   except:
173     return (None, 400)
174
175   try:
176     validate(instance=data, schema=policy_types[policy_type_id]['create_schema'])
177   except:
178     return (None, 400)
179
180   fpPrevious=None
181   retcode=201
182   if policy_instance_id in policy_instances[policy_type_id].keys():
183     retcode=200
184     fpPrevious=calcFingerprint(policy_instances[policy_type_id][policy_instance_id])
185   else:
186     if (policy_instance_id in policy_fingerprint.values()):
187       return (None, 400)
188
189   fp=calcFingerprint(data)
190   if (fp in policy_fingerprint.keys()):
191     id=policy_fingerprint[fp]
192     if (id != policy_instance_id):
193       return (None, 400)
194
195   if (fpPrevious is not None):
196     del policy_fingerprint[fpPrevious]
197
198   policy_fingerprint[fp]=policy_instance_id
199
200   policy_instances[policy_type_id][policy_instance_id]=data
201   ps={}
202   ps["instance_status"] = "NOT IN EFFECT"
203   ps["has_been_deleted"] = "false"
204   ps["created_at"] = str(datetime.now().strftime("%m/%d/%Y, %H:%M:%S"))
205   policy_status[policy_instance_id]=ps
206
207   return (None, 202)
208
209 # API function: Get policy status
210 def get_policy_instance_status(policy_type_id, policy_instance_id):
211
212   if ((r := check_modified_response()) is not None):
213     return r
214
215   policy_type_id=str(policy_type_id)
216   if (policy_type_id not in policy_instances.keys()):
217     return (None, 404)
218
219   if (policy_instance_id not in policy_instances[policy_type_id].keys()):
220     return (None, 404)
221
222   return Response(json.dumps(policy_status[policy_instance_id]), 200, mimetype='application/json')
223
224 # Helper: Create a response object if forced http response code is set
225 def get_forced_response():
226
227   response_code=forced_settings['code']
228   if (response_code is not None):
229     forced_settings['code'] = None
230     return (None, response_code)
231   return None
232
233 # Helper: Delay if delayed response code is set
234 def do_delay():
235
236   if (forced_settings['delay'] is not None):
237     try:
238       val=int(forced_settings['delay'])
239       time.sleep(val)
240     except:
241       return
242   return
243
244 # Helper: Check if response shall be delayed or a forced response shall be sent
245 def check_modified_response():
246   do_delay()
247   return get_forced_response()