1 # ============LICENSE_START===============================================
2 # Copyright (C) 2021 Nordix Foundation. All rights reserved.
3 # ========================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=================================================
18 # This test case test the OSC_2.1.0 version of the simulator
23 INTERFACE_VERSION="OSC_2.1.0"
25 from unittest_setup import SERVER_URL, setup_env, get_testdata_dir, client
27 #Setup env and import paths
28 setup_env(INTERFACE_VERSION)
30 from compare_json import compare
32 def test_apis(client):
34 testdata=get_testdata_dir()
36 # Simulator hello world
37 response=client.get(SERVER_URL)
38 assert response.status_code == 200
40 # Check used and implemented interfaces
41 response=client.get(SERVER_URL+'container_interfaces')
42 assert response.status_code == 200
43 assert response.data == b"Current interface: OSC_2.1.0 All supported A1 interface yamls in this container: ['OSC_2.1.0', 'STD_1.1.3', 'STD_2.0.0']"
45 # Reset simulator instances
46 response=client.post(SERVER_URL+'deleteinstances')
47 assert response.status_code == 200
49 # Reset simulator, all
50 response=client.post(SERVER_URL+'deleteall')
51 assert response.status_code == 200
54 response=client.get(SERVER_URL+'a1-p/healthcheck')
55 assert response.status_code == 200
57 # API: Get policy types, shall be empty
58 data_policytypes_get = [ ]
59 response=client.get(SERVER_URL+'a1-p/policytypes')
60 assert response.status_code == 200
61 result=json.loads(response.data)
62 res=compare(data_policytypes_get, result)
65 # API: Delete a policy type, shall fail
66 response=client.delete(SERVER_URL+'a1-p/policytypes/1')
67 assert response.status_code == 404
69 # API: Get policy instances for type 1, shall fail
70 response=client.get(SERVER_URL+'a1-p/policytypes/1/policies')
71 assert response.status_code == 404
73 # Header for json payload
75 "Content-Type" : "application/json"
78 # API: Put a policy type: 1
79 with open(testdata+'pt1.json') as json_file:
80 policytype_1 = json.load(json_file)
81 response=client.put(SERVER_URL+'a1-p/policytypes/1', headers=header, data=json.dumps(policytype_1))
82 assert response.status_code == 201
84 # API: Put a policy type: 1 again
85 with open(testdata+'pt1.json') as json_file:
86 policytype_1 = json.load(json_file)
87 response=client.put(SERVER_URL+'a1-p/policytypes/1', headers=header, data=json.dumps(policytype_1))
88 assert response.status_code == 201
90 # API: Delete a policy type
91 response=client.delete(SERVER_URL+'a1-p/policytypes/1')
92 assert response.status_code == 204
94 # API: Get policy type ids, shall be empty
95 data_policytypes_get = [ ]
96 response=client.get(SERVER_URL+'a1-p/policytypes')
97 assert response.status_code == 200
98 result=json.loads(response.data)
99 res=compare(data_policytypes_get, result)
102 # API: Put a policy type: 1
103 with open(testdata+'pt1.json') as json_file:
104 policytype_1 = json.load(json_file)
105 response=client.put(SERVER_URL+'a1-p/policytypes/1', headers=header, data=json.dumps(policytype_1))
106 assert response.status_code == 201
108 # API: Get policy type ids, shall contain '1'
109 data_policytypes_get = [ 1 ]
110 response=client.get(SERVER_URL+'a1-p/policytypes')
111 assert response.status_code == 200
112 result=json.loads(response.data)
113 res=compare(data_policytypes_get, result)
116 # API: Get instances for type 1, shall be empty
117 data_policies_get = [ ]
118 response=client.get(SERVER_URL+'a1-p/policytypes/1/policies')
119 assert response.status_code == 200
120 result=json.loads(response.data)
121 res=compare(data_policies_get, result)
124 # API: Create policy instance pi1 of type: 1
125 with open(testdata+'pi1.json') as json_file:
126 policy_1 = json.load(json_file)
127 response=client.put(SERVER_URL+'a1-p/policytypes/1/policies/pi1', headers=header, data=json.dumps(policy_1))
128 assert response.status_code == 202
130 # API: Get policy instance pi1 of type: 1
131 with open(testdata+'pi1.json') as json_file:
132 policy_1 = json.load(json_file)
133 response=client.get(SERVER_URL+'a1-p/policytypes/1/policies/pi1')
134 assert response.status_code == 200
135 result=json.loads(response.data)
136 res=compare(policy_1, result)
139 # API: Update policy instance pi1 of type: 1
140 with open(testdata+'pi1.json') as json_file:
141 policy_1 = json.load(json_file)
142 response=client.put(SERVER_URL+'a1-p/policytypes/1/policies/pi1', headers=header, data=json.dumps(policy_1))
143 assert response.status_code == 202
145 # API: Update policy type: 1, shall fail
146 with open(testdata+'pt1.json') as json_file:
147 policytype_1 = json.load(json_file)
148 response=client.put(SERVER_URL+'a1-p/policytypes/1', headers=header, data=json.dumps(policytype_1))
149 assert response.status_code == 400
151 # API: Get instances for type 1, shall contain 'pi1'
152 data_policies_get = [ "pi1" ]
153 response=client.get(SERVER_URL+'a1-p/policytypes/1/policies')
154 assert response.status_code == 200
155 result=json.loads(response.data)
156 res=compare(data_policies_get, result)
159 # API: Create policy instance pi2 (copy of pi1) of type: 1.
160 with open(testdata+'pi1.json') as json_file:
161 policy_2 = json.load(json_file)
162 response=client.put(SERVER_URL+'a1-p/policytypes/1/policies/pi2', headers=header, data=json.dumps(policy_2))
163 assert response.status_code == 202
165 # API: DELETE policy instance pi1
166 response=client.delete(SERVER_URL+'a1-p/policytypes/1/policies/pi2')
167 assert response.status_code == 202
169 # Set force response code 401
170 response=client.post(SERVER_URL+'forceresponse?code=401')
171 assert response.status_code == 200
173 # API: Get policy type 1. Shall fail with forced code
174 response=client.get(SERVER_URL+'a1-p/policytypes/1')
175 assert response.status_code == 401
177 # API: Get policy status
179 "instance_status" : "NOT IN EFFECT",
180 "has_been_deleted" : "false",
181 "created_at" : "????"
183 response=client.get(SERVER_URL+'a1-p/policytypes/1/policies/pi1/status')
184 assert response.status_code == 200
185 result=json.loads(response.data)
186 res=compare(policy_status, result)
189 # Load a policy type: 2
190 with open(testdata+'pt2.json') as json_file:
191 policytype_2 = json.load(json_file)
192 response=client.put(SERVER_URL+'policytype?id=2', headers=header, data=json.dumps(policytype_2))
193 assert response.status_code == 201
194 assert response.data == b"Policy type 2 is OK."
196 # Load a policy type: 2, again
197 with open(testdata+'pt2.json') as json_file:
198 policytype_2 = json.load(json_file)
199 response=client.put(SERVER_URL+'policytype?id=2', headers=header, data=json.dumps(policytype_2))
200 assert response.status_code == 200
201 assert response.data == b"Policy type 2 is OK."
203 # API: Get policy type ids, shall contain '1' and '2'
204 data_policytypes_get = [ 1,2 ]
205 response=client.get(SERVER_URL+'a1-p/policytypes')
206 assert response.status_code == 200
207 result=json.loads(response.data)
208 res=compare(data_policytypes_get, result)
211 # Get policy type ids, shall contain type 1 and 2 =="
212 data_policytypes_get = [ "1","2" ]
213 response=client.get(SERVER_URL+'policytypes')
214 assert response.status_code == 200
215 result=json.loads(response.data)
216 res=compare(data_policytypes_get, result)
219 # API: Get policy type 2
220 with open(testdata+'pt2.json') as json_file:
221 policytype_2 = json.load(json_file)
222 response=client.get(SERVER_URL+'a1-p/policytypes/2')
223 assert response.status_code == 200
224 result=json.loads(response.data)
225 res=compare(policytype_2, result)
228 # Delete a policy type
229 response=client.delete(SERVER_URL+'policytype?id=2')
230 assert response.status_code == 204
232 # API: Get policy type ids, shall contain '1'
233 data_policytypes_get = [ 1]
234 response=client.get(SERVER_URL+'a1-p/policytypes')
235 assert response.status_code == 200
236 result=json.loads(response.data)
237 res=compare(data_policytypes_get, result)
240 # Load a policy type: 2
241 with open(testdata+'pt2.json') as json_file:
242 policytype_2 = json.load(json_file)
243 response=client.put(SERVER_URL+'policytype?id=2', headers=header, data=json.dumps(policytype_2))
244 assert response.status_code == 201
245 assert response.data == b"Policy type 2 is OK."
247 # API: Get policy type 2
248 with open(testdata+'pt2.json') as json_file:
249 policytype_2 = json.load(json_file)
250 response=client.get(SERVER_URL+'a1-p/policytypes/2')
251 assert response.status_code == 200
252 result=json.loads(response.data)
253 res=compare(policytype_2, result)
256 # API: Get instances for type 2, shall be empty
257 data_policies_get = [ ]
258 response=client.get(SERVER_URL+'a1-p/policytypes/2/policies')
259 assert response.status_code == 200
260 result=json.loads(response.data)
261 res=compare(data_policies_get, result)
264 # API: Create policy instance pi1 of type: 2, shall fail
265 with open(testdata+'pi1.json') as json_file:
266 policy_1 = json.load(json_file)
267 response=client.put(SERVER_URL+'a1-p/policytypes/2/policies/pi1', headers=header, data=json.dumps(policy_1))
268 assert response.status_code == 400
270 # API: Create policy instance pi2 of type: 2. Missing param, shall fail
271 with open(testdata+'pi2_missing_param.json') as json_file:
272 policy_2 = json.load(json_file)
273 response=client.put(SERVER_URL+'a1-p/policytypes/2/policies/pi2', headers=header, data=json.dumps(policy_2))
274 assert response.status_code == 400
276 # API: Create policy instance pi2 of type: 2
277 with open(testdata+'pi2.json') as json_file:
278 policy_2 = json.load(json_file)
279 response=client.put(SERVER_URL+'a1-p/policytypes/2/policies/pi2', headers=header, data=json.dumps(policy_2))
280 assert response.status_code == 202
282 with open(testdata+'pi2.json') as json_file:
283 policy_2 = json.load(json_file)
284 response=client.put(SERVER_URL+'a1-p/policytypes/2/policies/pi2', headers=header, data=json.dumps(policy_2))
285 assert response.status_code == 202
287 # API: Get instances for type 1, shall contain pi1
288 data_policies_get = [ "pi1" ]
289 response=client.get(SERVER_URL+'a1-p/policytypes/1/policies')
290 assert response.status_code == 200
291 result=json.loads(response.data)
292 res=compare(data_policies_get, result)
295 # API: Get instances for type 2, shall contain pi2
296 data_policies_get = ["pi2" ]
297 response=client.get(SERVER_URL+'a1-p/policytypes/2/policies')
298 assert response.status_code == 200
299 result=json.loads(response.data)
300 res=compare(data_policies_get, result)
303 # Set force response code 409. ==="
304 response=client.post(SERVER_URL+'forceresponse?code=401')
305 assert response.status_code == 200
307 # API: Get policy status for pi1, shall fail
308 response=client.get(SERVER_URL+'a1-p/policytypes/1/policies/pi1/status')
309 assert response.status_code == 401
312 response=client.post(SERVER_URL+'forcedelay?delay=10')
313 assert response.status_code == 200
314 assert response.data == b"Force delay: 10 sec set for all A1 responses"
316 # API: Get policy status for pi1. Shall delay 10 sec
318 "instance_status" : "NOT IN EFFECT",
319 "has_been_deleted" : "false",
320 "created_at" : "????"
322 response=client.get(SERVER_URL+'a1-p/policytypes/1/policies/pi1/status')
323 assert response.status_code == 200
324 result=json.loads(response.data)
325 res=compare(policy_status, result)
329 response=client.post(SERVER_URL+'forcedelay')
330 assert response.status_code == 200
331 assert response.data == b"Force delay: None sec set for all A1 responses"
334 response=client.put(SERVER_URL+'status?policyid=pi1&status=IN%20EFFECT')
335 assert response.status_code == 200
337 # API: Get policy status for pi1
339 "instance_status" : "IN EFFECT",
340 "has_been_deleted" : "false",
341 "created_at" : "????"
343 response=client.get(SERVER_URL+'a1-p/policytypes/1/policies/pi1/status')
344 assert response.status_code == 200
345 result=json.loads(response.data)
346 res=compare(policy_status, result)
350 response=client.put(SERVER_URL+'status?policyid=pi1&status=IN%20EFFECT&deleted=true&created_at=2020-03-30%2012:00:00')
351 assert response.status_code == 200
353 # API: Get policy status for pi1
355 "instance_status" : "IN EFFECT",
356 "has_been_deleted" : "true",
357 "created_at" : "????"
359 response=client.get(SERVER_URL+'a1-p/policytypes/1/policies/pi1/status')
360 assert response.status_code == 200
361 result=json.loads(response.data)
362 res=compare(policy_status, result)
365 # Get counter: intstance
366 response=client.get(SERVER_URL+'counter/num_instances')
367 assert response.status_code == 200
368 assert response.data == b"2"
370 # Get counter: types (shall be 2)
371 response=client.get(SERVER_URL+'counter/num_types')
372 assert response.status_code == 200
373 assert response.data == b"2"
375 # Get counter: interface
376 response=client.get(SERVER_URL+'counter/interface')
377 assert response.status_code == 200
378 assert response.data == b"OSC_2.1.0"
380 # Get counter: remote hosts
381 response=client.get(SERVER_URL+'counter/remote_hosts')
382 assert response.status_code == 200
384 # Get counter: test, shall fail
385 response=client.get(SERVER_URL+'counter/test')
386 assert response.status_code == 404
388 # API: DELETE policy instance pi1
389 response=client.delete(SERVER_URL+'a1-p/policytypes/1/policies/pi1')
390 assert response.status_code == 202
392 # API: Get instances for type 1, shall be empty
393 data_policies_get = [ ]
394 response=client.get(SERVER_URL+'a1-p/policytypes/1/policies')
395 assert response.status_code == 200
396 result=json.loads(response.data)
397 res=compare(data_policies_get, result)
400 # API: Get instances for type 2, shall contain pi2
401 data_policies_get = ["pi2" ]
402 response=client.get(SERVER_URL+'a1-p/policytypes/2/policies')
403 assert response.status_code == 200
404 result=json.loads(response.data)
405 res=compare(data_policies_get, result)
408 # Get counter: instances
409 response=client.get(SERVER_URL+'counter/num_instances')
410 assert response.status_code == 200
411 assert response.data == b"1"
414 ### Tests to increase code coverage
416 # Set force response code 500
417 response=client.post(SERVER_URL+'forceresponse?code=500')
418 assert response.status_code == 200
421 response=client.get(SERVER_URL+'a1-p/healthcheck')
422 assert response.status_code == 500
424 # Set force response code 501
425 response=client.post(SERVER_URL+'forceresponse?code=501')
426 assert response.status_code == 200
428 # API: Get policy types
429 data_policytypes_get = [ ]
430 response=client.get(SERVER_URL+'a1-p/policytypes')
431 assert response.status_code == 501
433 # Set force response code 502
434 response=client.post(SERVER_URL+'forceresponse?code=502')
435 assert response.status_code == 200
437 # API: Delete a policy type, shall fail
438 response=client.delete(SERVER_URL+'a1-p/policytypes/55')
439 assert response.status_code == 502
441 # Set force response code 503. ==="
442 response=client.post(SERVER_URL+'forceresponse?code=503')
443 assert response.status_code == 200
445 with open(testdata+'pi1.json') as json_file:
446 policy_1 = json.load(json_file)
447 response=client.put(SERVER_URL+'a1-p/policytypes/1/policies/pi11', headers=header, data=json.dumps(policy_1))
448 assert response.status_code == 503
450 # Set force response code 504
451 response=client.post(SERVER_URL+'forceresponse?code=504')
452 assert response.status_code == 200
454 # API: Get instances for type 1, shall fail
455 data_policies_get = [ ]
456 response=client.get(SERVER_URL+'a1-p/policytypes/1/policies')
457 assert response.status_code == 504
459 # Set force response code 505. ==="
460 response=client.post(SERVER_URL+'forceresponse?code=505')
461 assert response.status_code == 200
463 # API: delete instance pi1, shall fail
464 response=client.delete(SERVER_URL+'a1-p/policytypes/1/policies/pi1')
465 assert response.status_code == 505
467 # API: Delete a policy type having instances, shall fail
468 response=client.delete(SERVER_URL+'a1-p/policytypes/2')
469 assert response.status_code == 400
471 # API: delete instance pi1 in type 5, shall fail
472 response=client.delete(SERVER_URL+'a1-p/policytypes/5/policies/pi1')
473 assert response.status_code == 404
475 # API: delete instance pi99 in type 1, shall fail
476 response=client.delete(SERVER_URL+'a1-p/policytypes/1/policies/pi99')
477 assert response.status_code == 404
479 # API: Create policy instance pi80 of type: 5
480 with open(testdata+'pi1.json') as json_file:
481 policy_80 = json.load(json_file)
482 response=client.put(SERVER_URL+'a1-p/policytypes/5/policies/pi80', headers=header, data=json.dumps(policy_80))
483 assert response.status_code == 404
485 # API: Get policy type
486 data_policytypes_get = [ ]
487 response=client.get(SERVER_URL+'a1-p/policytypes/55')
488 assert response.status_code == 404
490 # API: Get status, bad type - shall fail
491 response=client.get(SERVER_URL+'a1-p/policytypes/99/policies/pi1/status')
492 assert response.status_code == 404
494 # API: Get status, bad instance - shall fail
495 response=client.get(SERVER_URL+'a1-p/policytypes/1/policies/pi111/status')
496 assert response.status_code == 404
498 # Load policy type, no type in url - shall faill
499 with open(testdata+'pt2.json') as json_file:
500 policytype_2 = json.load(json_file)
501 response=client.put(SERVER_URL+'policytype', headers=header, data=json.dumps(policytype_2))
502 assert response.status_code == 400
504 # Load policy type - duplicatee - shall faill
505 with open(testdata+'pt1.json') as json_file:
506 policytype_1 = json.load(json_file)
507 response=client.put(SERVER_URL+'policytype?id=2', headers=header, data=json.dumps(policytype_1))
508 assert response.status_code == 400