31e70fdb5f976bd76ae9673090bc1728d94da884
[sim/a1-interface.git] / near-rt-ric-simulator / tests / test_osc_2_1_0.py
1 #  ============LICENSE_START===============================================
2 #  Copyright (C) 2020 Nordix Foundation. All rights reserved.
3 #  ========================================================================
4 #  Licensed under the Apache License, Version 2.0 (the "License");
5 #  you may not use this file except in compliance with the License.
6 #  You may obtain a copy of the License at
7 #
8 #       http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under the License is distributed on an "AS IS" BASIS,
12 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 #  See the License for the specific language governing permissions and
14 #  limitations under the License.
15 #  ============LICENSE_END=================================================
16 #
17
18 # This test case test the OSC_2.1.0 version of the simulator
19
20 import json
21
22 #Version of simulator
23 INTERFACE_VERSION="OSC_2.1.0"
24
25 from unittest_setup import SERVER_URL, setup_env, get_testdata_dir, client
26
27 #Setup env and import paths
28 setup_env(INTERFACE_VERSION)
29
30 from compare_json import compare
31
32 def test_apis(client):
33
34     testdata=get_testdata_dir()
35
36     # Simulator hello world
37     response=client.get(SERVER_URL)
38     assert response.status_code == 200
39
40     # Check used and implemented interfaces
41     response=client.get(SERVER_URL+'container_interfaces')
42     assert response.status_code == 200
43     assert response.data ==  b"Current interface: OSC_2.1.0  All supported A1 interface yamls in this container: ['OSC_2.1.0', 'STD_1.1.3']"
44
45     # Reset simulator instances
46     response=client.post(SERVER_URL+'deleteinstances')
47     assert response.status_code == 200
48
49     # Reset simulator, all
50     response=client.post(SERVER_URL+'deleteall')
51     assert response.status_code == 200
52
53     # API: Healthcheck
54     response=client.get(SERVER_URL+'a1-p/healthcheck')
55     assert response.status_code == 200
56
57     # API: Get policy types, shall be empty
58     data_policytypes_get = [ ]
59     response=client.get(SERVER_URL+'a1-p/policytypes')
60     assert response.status_code == 200
61     result=json.loads(response.data)
62     res=compare(data_policytypes_get, result)
63     assert res == True
64
65     # API: Delete a policy type, shall fail
66     response=client.delete(SERVER_URL+'a1-p/policytypes/1')
67     assert response.status_code == 404
68
69     # API: Get policy instances for type 1, shall fail
70     response=client.get(SERVER_URL+'a1-p/policytypes/1/policies')
71     assert response.status_code == 404
72
73     # Header for json payload
74     header = {
75         "Content-Type" : "application/json"
76     }
77
78     # API: Put a policy type: 1
79     with open(testdata+'pt1.json') as json_file:
80         policytype_1 = json.load(json_file)
81         response=client.put(SERVER_URL+'a1-p/policytypes/1', headers=header, data=json.dumps(policytype_1))
82         assert response.status_code == 201
83
84     # API: Put a policy type: 1 again
85     with open(testdata+'pt1.json') as json_file:
86         policytype_1 = json.load(json_file)
87         response=client.put(SERVER_URL+'a1-p/policytypes/1', headers=header, data=json.dumps(policytype_1))
88         assert response.status_code == 201
89
90     # API: Delete a policy type
91     response=client.delete(SERVER_URL+'a1-p/policytypes/1')
92     assert response.status_code == 204
93
94     # API: Get policy type ids, shall be empty
95     data_policytypes_get = [ ]
96     response=client.get(SERVER_URL+'a1-p/policytypes')
97     assert response.status_code == 200
98     result=json.loads(response.data)
99     res=compare(data_policytypes_get, result)
100     assert res == True
101
102     # API: Put a policy type: 1
103     with open(testdata+'pt1.json') as json_file:
104         policytype_1 = json.load(json_file)
105         response=client.put(SERVER_URL+'a1-p/policytypes/1', headers=header, data=json.dumps(policytype_1))
106         assert response.status_code == 201
107
108     # API: Get policy type ids, shall contain '1'
109     data_policytypes_get = [ 1 ]
110     response=client.get(SERVER_URL+'a1-p/policytypes')
111     assert response.status_code == 200
112     result=json.loads(response.data)
113     res=compare(data_policytypes_get, result)
114     assert res == True
115
116     # API: Get instances for type 1, shall be empty
117     data_policies_get = [ ]
118     response=client.get(SERVER_URL+'a1-p/policytypes/1/policies')
119     assert response.status_code == 200
120     result=json.loads(response.data)
121     res=compare(data_policies_get, result)
122     assert res == True
123
124     # API: Create policy instance pi1 of type: 1
125     with open(testdata+'pi1.json') as json_file:
126         policy_1 = json.load(json_file)
127         response=client.put(SERVER_URL+'a1-p/policytypes/1/policies/pi1', headers=header, data=json.dumps(policy_1))
128         assert response.status_code == 202
129
130     # API: Get policy instance pi1 of type: 1
131     with open(testdata+'pi1.json') as json_file:
132         policy_1 = json.load(json_file)
133         response=client.get(SERVER_URL+'a1-p/policytypes/1/policies/pi1')
134         assert response.status_code == 200
135         result=json.loads(response.data)
136         res=compare(policy_1, result)
137         assert res == True
138
139     # API: Update policy instance pi1 of type: 1
140     with open(testdata+'pi1.json') as json_file:
141         policy_1 = json.load(json_file)
142         response=client.put(SERVER_URL+'a1-p/policytypes/1/policies/pi1', headers=header, data=json.dumps(policy_1))
143         assert response.status_code == 202
144
145     # API: Update policy type: 1, shall fail
146     with open(testdata+'pt1.json') as json_file:
147         policytype_1 = json.load(json_file)
148         response=client.put(SERVER_URL+'a1-p/policytypes/1', headers=header, data=json.dumps(policytype_1))
149         assert response.status_code == 400
150
151     # API: Get instances for type 1, shall contain 'pi1'
152     data_policies_get = [ "pi1" ]
153     response=client.get(SERVER_URL+'a1-p/policytypes/1/policies')
154     assert response.status_code == 200
155     result=json.loads(response.data)
156     res=compare(data_policies_get, result)
157     assert res == True
158
159     # API: Create policy instance pi2 (copy of pi1) of type: 1. Shall fail
160     with open(testdata+'pi1.json') as json_file:
161         policy_2 = json.load(json_file)
162         response=client.put(SERVER_URL+'a1-p/policytypes/1/policies/pi2', headers=header, data=json.dumps(policy_2))
163         assert response.status_code == 400
164
165     # Set force response code 401
166     response=client.post(SERVER_URL+'forceresponse?code=401')
167     assert response.status_code == 200
168
169     # API: Get policy type 1. Shall fail with forced code
170     response=client.get(SERVER_URL+'a1-p/policytypes/1')
171     assert response.status_code == 401
172
173     # API: Get policy status
174     policy_status = {
175         "instance_status" : "NOT IN EFFECT",
176         "has_been_deleted" : "false",
177         "created_at" : "????"
178     }
179     response=client.get(SERVER_URL+'a1-p/policytypes/1/policies/pi1/status')
180     assert response.status_code == 200
181     result=json.loads(response.data)
182     res=compare(policy_status, result)
183     assert res == True
184
185     # Load a policy type: 2
186     with open(testdata+'pt2.json') as json_file:
187         policytype_2 = json.load(json_file)
188         response=client.put(SERVER_URL+'policytype?id=2', headers=header, data=json.dumps(policytype_2))
189         assert response.status_code == 201
190         assert response.data == b"Policy type 2 is OK."
191
192     # Load a policy type: 2, again
193     with open(testdata+'pt2.json') as json_file:
194         policytype_2 = json.load(json_file)
195         response=client.put(SERVER_URL+'policytype?id=2', headers=header, data=json.dumps(policytype_2))
196         assert response.status_code == 200
197         assert response.data == b"Policy type 2 is OK."
198
199     # API: Get policy type ids, shall contain '1' and '2'
200     data_policytypes_get = [ 1,2 ]
201     response=client.get(SERVER_URL+'a1-p/policytypes')
202     assert response.status_code == 200
203     result=json.loads(response.data)
204     res=compare(data_policytypes_get, result)
205     assert res == True
206
207     # Get policy type ids, shall contain type 1 and 2 =="
208     data_policytypes_get = [ "1","2" ]
209     response=client.get(SERVER_URL+'policytypes')
210     assert response.status_code == 200
211     result=json.loads(response.data)
212     res=compare(data_policytypes_get, result)
213     assert res == True
214
215     # API: Get policy type 2
216     with open(testdata+'pt2.json') as json_file:
217         policytype_2 = json.load(json_file)
218         response=client.get(SERVER_URL+'a1-p/policytypes/2')
219         assert response.status_code == 200
220         result=json.loads(response.data)
221         res=compare(policytype_2, result)
222         assert res == True
223
224     # Delete a policy type
225     response=client.delete(SERVER_URL+'policytype?id=2')
226     assert response.status_code == 204
227
228     # API: Get policy type ids, shall contain '1'
229     data_policytypes_get = [ 1]
230     response=client.get(SERVER_URL+'a1-p/policytypes')
231     assert response.status_code == 200
232     result=json.loads(response.data)
233     res=compare(data_policytypes_get, result)
234     assert res == True
235
236     # Load a policy type: 2
237     with open(testdata+'pt2.json') as json_file:
238         policytype_2 = json.load(json_file)
239         response=client.put(SERVER_URL+'policytype?id=2', headers=header, data=json.dumps(policytype_2))
240         assert response.status_code == 201
241         assert response.data == b"Policy type 2 is OK."
242
243     # API: Get policy type 2
244     with open(testdata+'pt2.json') as json_file:
245         policytype_2 = json.load(json_file)
246         response=client.get(SERVER_URL+'a1-p/policytypes/2')
247         assert response.status_code == 200
248         result=json.loads(response.data)
249         res=compare(policytype_2, result)
250         assert res == True
251
252     # API: Get instances for type 2, shall be empty
253     data_policies_get = [ ]
254     response=client.get(SERVER_URL+'a1-p/policytypes/2/policies')
255     assert response.status_code == 200
256     result=json.loads(response.data)
257     res=compare(data_policies_get, result)
258     assert res == True
259
260     # API: Create policy instance pi1 of type: 2, shall fail
261     with open(testdata+'pi1.json') as json_file:
262         policy_1 = json.load(json_file)
263         response=client.put(SERVER_URL+'a1-p/policytypes/2/policies/pi1', headers=header, data=json.dumps(policy_1))
264         assert response.status_code == 400
265
266     # API: Create policy instance pi2 of type: 2. Missing param, shall fail
267     with open(testdata+'pi2_missing_param.json') as json_file:
268         policy_2 = json.load(json_file)
269         response=client.put(SERVER_URL+'a1-p/policytypes/2/policies/pi1', headers=header, data=json.dumps(policy_2))
270         assert response.status_code == 400
271
272     # API: Create policy instance pi2 of type: 2
273     with open(testdata+'pi2.json') as json_file:
274         policy_2 = json.load(json_file)
275         response=client.put(SERVER_URL+'a1-p/policytypes/2/policies/pi2', headers=header, data=json.dumps(policy_2))
276         assert response.status_code == 202
277
278     with open(testdata+'pi2.json') as json_file:
279         policy_2 = json.load(json_file)
280         response=client.put(SERVER_URL+'a1-p/policytypes/2/policies/pi2', headers=header, data=json.dumps(policy_2))
281         assert response.status_code == 202
282
283     # API: Get instances for type 1, shall contain pi1
284     data_policies_get = [ "pi1" ]
285     response=client.get(SERVER_URL+'a1-p/policytypes/1/policies')
286     assert response.status_code == 200
287     result=json.loads(response.data)
288     res=compare(data_policies_get, result)
289     assert res == True
290
291     # API: Get instances for type 2, shall contain pi2
292     data_policies_get = ["pi2" ]
293     response=client.get(SERVER_URL+'a1-p/policytypes/2/policies')
294     assert response.status_code == 200
295     result=json.loads(response.data)
296     res=compare(data_policies_get, result)
297     assert res == True
298
299     # API: Create policy instance pi11 (copy of pi1) of type: 1. Shall fail
300     with open(testdata+'pi1.json') as json_file:
301         policy_1 = json.load(json_file)
302         response=client.put(SERVER_URL+'a1-p/policytypes/1/policies/pi11', headers=header, data=json.dumps(policy_1))
303         assert response.status_code == 400
304
305     # Set force response code 409. ==="
306     response=client.post(SERVER_URL+'forceresponse?code=401')
307     assert response.status_code == 200
308
309     # API: Get policy status for pi1, shall fail
310     response=client.get(SERVER_URL+'a1-p/policytypes/1/policies/pi1/status')
311     assert response.status_code == 401
312
313     # Set force delay 10
314     response=client.post(SERVER_URL+'forcedelay?delay=10')
315     assert response.status_code == 200
316     assert response.data ==  b"Force delay: 10 sec set for all A1 responses"
317
318     # API: Get policy status for pi1. Shall delay 10 sec
319     policy_status = {
320         "instance_status" : "NOT IN EFFECT",
321         "has_been_deleted" : "false",
322         "created_at" : "????"
323     }
324     response=client.get(SERVER_URL+'a1-p/policytypes/1/policies/pi1/status')
325     assert response.status_code == 200
326     result=json.loads(response.data)
327     res=compare(policy_status, result)
328     assert res == True
329
330     # Reset force delay
331     response=client.post(SERVER_URL+'forcedelay')
332     assert response.status_code == 200
333     assert response.data ==  b"Force delay: None sec set for all A1 responses"
334
335     #  Set status for pi1
336     response=client.put(SERVER_URL+'status?policyid=pi1&status=IN%20EFFECT')
337     assert response.status_code == 200
338
339     # API: Get policy status for pi1
340     policy_status = {
341         "instance_status" : "IN EFFECT",
342         "has_been_deleted" : "false",
343         "created_at" : "????"
344     }
345     response=client.get(SERVER_URL+'a1-p/policytypes/1/policies/pi1/status')
346     assert response.status_code == 200
347     result=json.loads(response.data)
348     res=compare(policy_status, result)
349     assert res == True
350
351     #  Set status for pi1
352     response=client.put(SERVER_URL+'status?policyid=pi1&status=IN%20EFFECT&deleted=true&created_at=2020-03-30%2012:00:00')
353     assert response.status_code == 200
354
355     # API: Get policy status for pi1
356     policy_status = {
357         "instance_status" : "IN EFFECT",
358         "has_been_deleted" : "true",
359         "created_at" : "????"
360     }
361     response=client.get(SERVER_URL+'a1-p/policytypes/1/policies/pi1/status')
362     assert response.status_code == 200
363     result=json.loads(response.data)
364     res=compare(policy_status, result)
365     assert res == True
366
367     # Get counter: intstance
368     response=client.get(SERVER_URL+'counter/num_instances')
369     assert response.status_code == 200
370     assert response.data ==  b"2"
371
372     # Get counter: types (shall be 2)
373     response=client.get(SERVER_URL+'counter/num_types')
374     assert response.status_code == 200
375     assert response.data ==  b"2"
376
377     # Get counter: interface
378     response=client.get(SERVER_URL+'counter/interface')
379     assert response.status_code == 200
380     assert response.data == b"OSC_2.1.0"
381
382     # Get counter: remote hosts
383     response=client.get(SERVER_URL+'counter/remote_hosts')
384     assert response.status_code == 200
385
386     # Get counter: test, shall fail
387     response=client.get(SERVER_URL+'counter/test')
388     assert response.status_code == 404
389
390     # API: DELETE policy instance pi1
391     response=client.delete(SERVER_URL+'a1-p/policytypes/1/policies/pi1')
392     assert response.status_code == 202
393
394     # API: Get instances for type 1, shall be empty
395     data_policies_get = [ ]
396     response=client.get(SERVER_URL+'a1-p/policytypes/1/policies')
397     assert response.status_code == 200
398     result=json.loads(response.data)
399     res=compare(data_policies_get, result)
400     assert res == True
401
402     # API: Get instances for type 2, shall contain pi2
403     data_policies_get = ["pi2" ]
404     response=client.get(SERVER_URL+'a1-p/policytypes/2/policies')
405     assert response.status_code == 200
406     result=json.loads(response.data)
407     res=compare(data_policies_get, result)
408     assert res == True
409
410     # Get counter: instances
411     response=client.get(SERVER_URL+'counter/num_instances')
412     assert response.status_code == 200
413     assert response.data ==  b"1"
414
415
416     ### Tests to increase code coverage
417
418     # Set force response code 500
419     response=client.post(SERVER_URL+'forceresponse?code=500')
420     assert response.status_code == 200
421
422     # API: Healthcheck
423     response=client.get(SERVER_URL+'a1-p/healthcheck')
424     assert response.status_code == 500
425
426     # Set force response code 501
427     response=client.post(SERVER_URL+'forceresponse?code=501')
428     assert response.status_code == 200
429
430     # API: Get policy types
431     data_policytypes_get = [ ]
432     response=client.get(SERVER_URL+'a1-p/policytypes')
433     assert response.status_code == 501
434
435     # Set force response code 502
436     response=client.post(SERVER_URL+'forceresponse?code=502')
437     assert response.status_code == 200
438
439     # API: Delete a policy type, shall fail
440     response=client.delete(SERVER_URL+'a1-p/policytypes/55')
441     assert response.status_code == 502
442
443     # Set force response code 503. ==="
444     response=client.post(SERVER_URL+'forceresponse?code=503')
445     assert response.status_code == 200
446
447     with open(testdata+'pi1.json') as json_file:
448         policy_1 = json.load(json_file)
449         response=client.put(SERVER_URL+'a1-p/policytypes/1/policies/pi11', headers=header, data=json.dumps(policy_1))
450         assert response.status_code == 503
451
452     # Set force response code 504
453     response=client.post(SERVER_URL+'forceresponse?code=504')
454     assert response.status_code == 200
455
456     # API: Get instances for type 1, shall fail
457     data_policies_get = [ ]
458     response=client.get(SERVER_URL+'a1-p/policytypes/1/policies')
459     assert response.status_code == 504
460
461     # Set force response code 505. ==="
462     response=client.post(SERVER_URL+'forceresponse?code=505')
463     assert response.status_code == 200
464
465     # API: delete instance pi1, shall fail
466     response=client.delete(SERVER_URL+'a1-p/policytypes/1/policies/pi1')
467     assert response.status_code == 505
468
469     # API: Delete a policy type having instances, shall fail
470     response=client.delete(SERVER_URL+'a1-p/policytypes/2')
471     assert response.status_code == 400
472
473     # API: delete instance pi1 in type 5, shall fail
474     response=client.delete(SERVER_URL+'a1-p/policytypes/5/policies/pi1')
475     assert response.status_code == 404
476
477     # API: delete instance pi99 in type 1, shall fail
478     response=client.delete(SERVER_URL+'a1-p/policytypes/1/policies/pi99')
479     assert response.status_code == 404
480
481     # API: Create policy instance pi80 of type: 5
482     with open(testdata+'pi1.json') as json_file:
483         policy_80 = json.load(json_file)
484         response=client.put(SERVER_URL+'a1-p/policytypes/5/policies/pi80', headers=header, data=json.dumps(policy_80))
485         assert response.status_code == 404
486
487     # API: Get policy type
488     data_policytypes_get = [ ]
489     response=client.get(SERVER_URL+'a1-p/policytypes/55')
490     assert response.status_code == 404
491
492     # API: Get status, bad type - shall fail
493     response=client.get(SERVER_URL+'a1-p/policytypes/99/policies/pi1/status')
494     assert response.status_code == 404
495
496     # API: Get status, bad instance - shall fail
497     response=client.get(SERVER_URL+'a1-p/policytypes/1/policies/pi111/status')
498     assert response.status_code == 404
499
500     # Load policy type, no type in url - shall faill
501     with open(testdata+'pt2.json') as json_file:
502         policytype_2 = json.load(json_file)
503         response=client.put(SERVER_URL+'policytype', headers=header, data=json.dumps(policytype_2))
504         assert response.status_code == 400
505
506     # Load policy type - duplicatee - shall faill
507     with open(testdata+'pt1.json') as json_file:
508         policytype_1 = json.load(json_file)
509         response=client.put(SERVER_URL+'policytype?id=2', headers=header, data=json.dumps(policytype_1))
510         assert response.status_code == 400