Added IPV6 support and support for https
[sim/a1-interface.git] / near-rt-ric-simulator / src / OSC_2.1.0 / a1.py
1 #  ============LICENSE_START===============================================
2 #  Copyright (C) 2020 Nordix Foundation. All rights reserved.
3 #  ========================================================================
4 #  Licensed under the Apache License, Version 2.0 (the "License");
5 #  you may not use this file except in compliance with the License.
6 #  You may obtain a copy of the License at
7 #
8 #       http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under the License is distributed on an "AS IS" BASIS,
12 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 #  See the License for the specific language governing permissions and
14 #  limitations under the License.
15 #  ============LICENSE_END=================================================
16 #
17
18 import json
19 import datetime
20 import time
21
22 from datetime import datetime
23 from connexion import NoContent
24 from flask import Flask, request, Response
25 from jsonschema import validate
26 from var_declaration import policy_instances, policy_types, policy_status, policy_fingerprint, forced_settings, hosts_set
27 from utils import calcFingerprint
28 from maincommon import extract_host_name
29
30
31 # API Function: Health check
32 def get_healthcheck():
33
34   extract_host_name(hosts_set, request)
35
36   if ((r := check_modified_response()) is not None):
37     return r
38
39   return (None, 200)
40
41 # API Function: Get all policy type ids
42 def get_all_policy_types():
43
44   extract_host_name(hosts_set, request)
45
46   if ((r := check_modified_response()) is not None):
47     return r
48
49   return (list(policy_instances.keys()), 200)
50
51 # API Function: Get a policy type
52 def get_policy_type(policy_type_id):
53
54   extract_host_name(hosts_set, request)
55
56   if ((r := check_modified_response()) is not None):
57     return r
58
59   policy_type_id=str(policy_type_id)
60
61   if (policy_type_id not in policy_types.keys()):
62     return (None, 404)
63
64   return Response(json.dumps(policy_types[policy_type_id]), 200, mimetype='application/json')
65
66 # API Function: Delete a policy type
67 def delete_policy_type(policy_type_id):
68
69   extract_host_name(hosts_set, request)
70
71   if ((r := check_modified_response()) is not None):
72     return r
73
74   policy_type_id=str(policy_type_id)
75
76   if (policy_type_id not in policy_instances.keys()):
77     return (None, 404)
78
79   if (len(policy_instances[policy_type_id]) > 0):
80     return (None, 400)
81
82   del policy_instances[policy_type_id]
83   del policy_types[policy_type_id]
84
85   return (None, 204)
86
87
88 # API Function: Create a policy type
89 def create_policy_type(policy_type_id):
90
91   extract_host_name(hosts_set, request)
92
93   if ((r := check_modified_response()) is not None):
94     return r
95
96   policy_type_id=str(policy_type_id)
97
98   if (policy_type_id in policy_instances.keys()):
99     if (len(policy_instances[policy_type_id]) > 0):
100       return (None, 400)
101
102   try:
103     data = request.data
104     data = json.loads(data)
105   except:
106     return (None, 400)
107
108   if (('name' not in data.keys()) or ('description' not in data.keys()) or ('policy_type_id' not in data.keys()) or ('create_schema' not in data.keys())):
109     return (None, 400)
110
111   if (policy_type_id not in policy_instances.keys()):
112     policy_instances[policy_type_id]={}
113
114   policy_types[policy_type_id]=data
115
116   return (None, 201)
117
118
119 # API Function: Get all policy ids for a type
120 def get_all_policy_identities(policy_type_id):
121
122   extract_host_name(hosts_set, request)
123
124   if ((r := check_modified_response()) is not None):
125     return r
126
127   policy_type_id=str(policy_type_id)
128
129   if (policy_type_id not in policy_instances.keys()):
130     return (None, 404)
131   return (list(policy_instances[policy_type_id].keys()), 200)
132
133 # API Function: Get a policy instance
134 def get_policy_instance(policy_type_id, policy_instance_id):
135
136   extract_host_name(hosts_set, request)
137
138   if ((r := check_modified_response()) is not None):
139     return r
140
141   policy_type_id=str(policy_type_id)
142
143   if (policy_type_id not in policy_instances.keys()):
144     return (None, 404)
145
146   if (policy_instance_id not in policy_instances[policy_type_id].keys()):
147     return (None, 404)
148
149   return Response(json.dumps(policy_instances[policy_type_id][policy_instance_id]), 200, mimetype='application/json')
150
151 # API function: Delete a policy
152 def delete_policy_instance(policy_type_id, policy_instance_id):
153
154   extract_host_name(hosts_set, request)
155
156   if ((r := check_modified_response()) is not None):
157     return r
158
159   policy_type_id=str(policy_type_id)
160
161   if (policy_type_id not in policy_instances.keys()):
162     return (None, 404)
163
164   if (policy_instance_id not in policy_instances[policy_type_id].keys()):
165     return (None, 404)
166
167   fpPrevious=calcFingerprint(policy_instances[policy_type_id][policy_instance_id])
168   del policy_fingerprint[fpPrevious]
169   del policy_instances[policy_type_id][policy_instance_id]
170   del policy_status[policy_instance_id]
171
172   return (None, 202)
173
174 # API function: Create/update a policy
175 def create_or_replace_policy_instance(policy_type_id, policy_instance_id):
176
177   extract_host_name(hosts_set, request)
178
179   if ((r := check_modified_response()) is not None):
180     return r
181
182   policy_type_id=str(policy_type_id)
183
184   if (policy_type_id not in policy_instances.keys()):
185     return (None, 404)
186
187   try:
188     data = request.data
189     data = json.loads(data)
190   except:
191     return (None, 400)
192
193   try:
194     validate(instance=data, schema=policy_types[policy_type_id]['create_schema'])
195   except:
196     return (None, 400)
197
198   fpPrevious=None
199   retcode=201
200   if policy_instance_id in policy_instances[policy_type_id].keys():
201     retcode=200
202     fpPrevious=calcFingerprint(policy_instances[policy_type_id][policy_instance_id])
203   else:
204     if (policy_instance_id in policy_fingerprint.values()):
205       return (None, 400)
206
207   fp=calcFingerprint(data)
208   if (fp in policy_fingerprint.keys()):
209     id=policy_fingerprint[fp]
210     if (id != policy_instance_id):
211       return (None, 400)
212
213   if (fpPrevious is not None):
214     del policy_fingerprint[fpPrevious]
215
216   policy_fingerprint[fp]=policy_instance_id
217
218   policy_instances[policy_type_id][policy_instance_id]=data
219   ps={}
220   ps["instance_status"] = "NOT IN EFFECT"
221   ps["has_been_deleted"] = "false"
222   ps["created_at"] = str(datetime.now().strftime("%m/%d/%Y, %H:%M:%S"))
223   policy_status[policy_instance_id]=ps
224
225   return (None, 202)
226
227 # API function: Get policy status
228 def get_policy_instance_status(policy_type_id, policy_instance_id):
229
230   extract_host_name(hosts_set, request)
231
232   if ((r := check_modified_response()) is not None):
233     return r
234
235   policy_type_id=str(policy_type_id)
236   if (policy_type_id not in policy_instances.keys()):
237     return (None, 404)
238
239   if (policy_instance_id not in policy_instances[policy_type_id].keys()):
240     return (None, 404)
241
242   return Response(json.dumps(policy_status[policy_instance_id]), 200, mimetype='application/json')
243
244 # Helper: Create a response object if forced http response code is set
245 def get_forced_response():
246
247   response_code=forced_settings['code']
248   if (response_code is not None):
249     forced_settings['code'] = None
250     return (None, response_code)
251   return None
252
253 # Helper: Delay if delayed response code is set
254 def do_delay():
255
256   if (forced_settings['delay'] is not None):
257     try:
258       val=int(forced_settings['delay'])
259       time.sleep(val)
260     except:
261       return
262   return
263
264 # Helper: Check if response shall be delayed or a forced response shall be sent
265 def check_modified_response():
266   do_delay()
267   return get_forced_response()