1 # ============LICENSE_START===============================================
2 # Copyright (C) 2020 Nordix Foundation. All rights reserved.
3 # ========================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=================================================
18 # This test case tests the OSC_2.1.0 version of the simulator
26 #Constants for the test case
27 INTERFACE_VERSION="OSC_2.1.0"
30 SERVER_URL="http://"+HOST_IP+":"+PORT_NUMBER+"/"
33 # Env TESTS_BASE_PATH is set when executed via tox.ini
34 # If basic test is executed from cmd line, that env var is not needed
35 if 'TESTS_BASE_PATH' in os.environ:
36 cwd=os.environ['TESTS_BASE_PATH']+"/"+INTERFACE_VERSION+"/"
37 TESTDATA=cwd+"/../../test/"+INTERFACE_VERSION+"/jsonfiles/"
39 # Env vars to set up the interface version and remote host logging
40 os.environ['APIPATH'] = cwd+"/../../api/"+INTERFACE_VERSION
41 os.environ['REMOTE_HOSTS_LOGGING'] = "ON"
43 # Paths needed to run the sim, including the required source file dirs
44 sys.path.append(os.path.abspath(cwd+'../../src/common'))
45 sys.path.append(os.path.abspath(cwd+'../../test/common'))
46 sys.path.append(os.path.abspath(cwd+'../../src/'+INTERFACE_VERSION))
47 os.chdir(cwd+"../../src/"+INTERFACE_VERSION)
51 from compare_json import compare
55 with app.app.test_client() as c:
58 def test_apis(client):
60 # Header for json payload
62 "Content-Type" : "application/json"
65 # Simulator hello world
66 response=client.get(SERVER_URL)
67 assert response.status_code == 200
70 # Check used and implemented interfaces
71 response=client.get(SERVER_URL+'container_interfaces')
72 assert response.status_code == 200
73 assert response.data == b"Current interface: OSC_2.1.0 All supported A1 interface yamls in this container: ['OSC_2.1.0', 'STD_1.1.3']"
75 # Reset simulator instances
76 response=client.post(SERVER_URL+'deleteinstances')
77 assert response.status_code == 200
79 # Reset simulator, all
80 response=client.post(SERVER_URL+'deleteall')
81 assert response.status_code == 200
84 response=client.get(SERVER_URL+'a1-p/healthcheck')
85 assert response.status_code == 200
87 # API: Get policy types, shall be empty
88 data_policytypes_get = [ ]
89 response=client.get(SERVER_URL+'a1-p/policytypes')
90 assert response.status_code == 200
91 result=json.loads(response.data)
92 res=compare(data_policytypes_get, result)
95 # API: Delete a policy type, shall fail
96 response=client.delete(SERVER_URL+'a1-p/policytypes/1')
97 assert response.status_code == 404
99 # API: Get policy instances for type 1, shall fail
100 response=client.get(SERVER_URL+'a1-p/policytypes/1/policies')
101 assert response.status_code == 404
103 # API: Put a policy type: 1
104 with open(TESTDATA+'pt1.json') as json_file:
105 policytype_1 = json.load(json_file)
106 response=client.put(SERVER_URL+'a1-p/policytypes/1', headers=header, data=json.dumps(policytype_1))
107 assert response.status_code == 201
109 # API: Put a policy type: 1 again
110 with open(TESTDATA+'pt1.json') as json_file:
111 policytype_1 = json.load(json_file)
112 response=client.put(SERVER_URL+'a1-p/policytypes/1', headers=header, data=json.dumps(policytype_1))
113 assert response.status_code == 201
115 # API: Delete a policy type
116 response=client.delete(SERVER_URL+'a1-p/policytypes/1')
117 assert response.status_code == 204
119 # API: Get policy type ids, shall be empty
120 data_policytypes_get = [ ]
121 response=client.get(SERVER_URL+'a1-p/policytypes')
122 assert response.status_code == 200
123 result=json.loads(response.data)
124 res=compare(data_policytypes_get, result)
127 # API: Put a policy type: 1
128 with open(TESTDATA+'pt1.json') as json_file:
129 policytype_1 = json.load(json_file)
130 response=client.put(SERVER_URL+'a1-p/policytypes/1', headers=header, data=json.dumps(policytype_1))
131 assert response.status_code == 201
133 # API: Get policy type ids, shall contain '1'
134 data_policytypes_get = [ 1 ]
135 response=client.get(SERVER_URL+'a1-p/policytypes')
136 assert response.status_code == 200
137 result=json.loads(response.data)
138 res=compare(data_policytypes_get, result)
141 # API: Get instances for type 1, shall be empty
142 data_policies_get = [ ]
143 response=client.get(SERVER_URL+'a1-p/policytypes/1/policies')
144 assert response.status_code == 200
145 result=json.loads(response.data)
146 res=compare(data_policies_get, result)
149 # API: Create policy instance pi1 of type: 1
150 with open(TESTDATA+'pi1.json') as json_file:
151 policy_1 = json.load(json_file)
152 response=client.put(SERVER_URL+'a1-p/policytypes/1/policies/pi1', headers=header, data=json.dumps(policy_1))
153 assert response.status_code == 202
155 # API: Get policy instance pi1 of type: 1
156 with open(TESTDATA+'pi1.json') as json_file:
157 policy_1 = json.load(json_file)
158 response=client.get(SERVER_URL+'a1-p/policytypes/1/policies/pi1')
159 assert response.status_code == 200
160 result=json.loads(response.data)
161 res=compare(policy_1, result)
164 # API: Update policy instance pi1 of type: 1
165 with open(TESTDATA+'pi1.json') as json_file:
166 policy_1 = json.load(json_file)
167 response=client.put(SERVER_URL+'a1-p/policytypes/1/policies/pi1', headers=header, data=json.dumps(policy_1))
168 assert response.status_code == 202
170 # API: Update policy type: 1, shall fail
171 with open(TESTDATA+'pt1.json') as json_file:
172 policytype_1 = json.load(json_file)
173 response=client.put(SERVER_URL+'a1-p/policytypes/1', headers=header, data=json.dumps(policytype_1))
174 assert response.status_code == 400
176 # API: Get instances for type 1, shall contain 'pi1'
177 data_policies_get = [ "pi1" ]
178 response=client.get(SERVER_URL+'a1-p/policytypes/1/policies')
179 assert response.status_code == 200
180 result=json.loads(response.data)
181 res=compare(data_policies_get, result)
184 # API: Create policy instance pi2 (copy of pi1) of type: 1. Shall fail
185 with open(TESTDATA+'pi1.json') as json_file:
186 policy_2 = json.load(json_file)
187 response=client.put(SERVER_URL+'a1-p/policytypes/1/policies/pi2', headers=header, data=json.dumps(policy_2))
188 assert response.status_code == 400
190 # Set force response code 401
191 response=client.post(SERVER_URL+'forceresponse?code=401')
192 assert response.status_code == 200
194 # API: Get policy type 1. Shall fail with forced code
195 response=client.get(SERVER_URL+'a1-p/policytypes/1')
196 assert response.status_code == 401
198 # API: Get policy status
200 "instance_status" : "NOT IN EFFECT",
201 "has_been_deleted" : "false",
202 "created_at" : "????"
204 response=client.get(SERVER_URL+'a1-p/policytypes/1/policies/pi1/status')
205 assert response.status_code == 200
206 result=json.loads(response.data)
207 res=compare(policy_status, result)
210 # Load a policy type: 2
211 with open(TESTDATA+'pt2.json') as json_file:
212 policytype_2 = json.load(json_file)
213 response=client.put(SERVER_URL+'policytype?id=2', headers=header, data=json.dumps(policytype_2))
214 assert response.status_code == 201
215 assert response.data == b"Policy type 2 is OK."
217 # Load a policy type: 2, again
218 with open(TESTDATA+'pt2.json') as json_file:
219 policytype_2 = json.load(json_file)
220 response=client.put(SERVER_URL+'policytype?id=2', headers=header, data=json.dumps(policytype_2))
221 assert response.status_code == 200
222 assert response.data == b"Policy type 2 is OK."
224 # API: Get policy type ids, shall contain '1' and '2'
225 data_policytypes_get = [ 1,2 ]
226 response=client.get(SERVER_URL+'a1-p/policytypes')
227 assert response.status_code == 200
228 result=json.loads(response.data)
229 res=compare(data_policytypes_get, result)
232 # Get policy type ids, shall contain type 1 and 2
233 data_policytypes_get = [ "1","2" ]
234 response=client.get(SERVER_URL+'policytypes')
235 assert response.status_code == 200
236 result=json.loads(response.data)
237 res=compare(data_policytypes_get, result)
240 # API: Get policy type 2
241 with open(TESTDATA+'pt2.json') as json_file:
242 policytype_2 = json.load(json_file)
243 response=client.get(SERVER_URL+'a1-p/policytypes/2')
244 assert response.status_code == 200
245 result=json.loads(response.data)
246 res=compare(policytype_2, result)
249 # Delete a policy type
250 response=client.delete(SERVER_URL+'policytype?id=2')
251 assert response.status_code == 204
253 # API: Get policy type ids, shall contain '1'
254 data_policytypes_get = [ 1]
255 response=client.get(SERVER_URL+'a1-p/policytypes')
256 assert response.status_code == 200
257 result=json.loads(response.data)
258 res=compare(data_policytypes_get, result)
261 # Load a policy type: 2
262 with open(TESTDATA+'pt2.json') as json_file:
263 policytype_2 = json.load(json_file)
264 response=client.put(SERVER_URL+'policytype?id=2', headers=header, data=json.dumps(policytype_2))
265 assert response.status_code == 201
266 assert response.data == b"Policy type 2 is OK."
268 # API: Get policy type 2
269 with open(TESTDATA+'pt2.json') as json_file:
270 policytype_2 = json.load(json_file)
271 response=client.get(SERVER_URL+'a1-p/policytypes/2')
272 assert response.status_code == 200
273 result=json.loads(response.data)
274 res=compare(policytype_2, result)
277 # API: Get instances for type 2, shall be empty
278 data_policies_get = [ ]
279 response=client.get(SERVER_URL+'a1-p/policytypes/2/policies')
280 assert response.status_code == 200
281 result=json.loads(response.data)
282 res=compare(data_policies_get, result)
285 # API: Create policy instance pi1 of type: 2, shall fail
286 with open(TESTDATA+'pi1.json') as json_file:
287 policy_1 = json.load(json_file)
288 response=client.put(SERVER_URL+'a1-p/policytypes/2/policies/pi1', headers=header, data=json.dumps(policy_1))
289 assert response.status_code == 400
291 # API: Create policy instance pi2 of type: 2. Missing param, shall fail
292 with open(TESTDATA+'pi2_missing_param.json') as json_file:
293 policy_2 = json.load(json_file)
294 response=client.put(SERVER_URL+'a1-p/policytypes/2/policies/pi1', headers=header, data=json.dumps(policy_2))
295 assert response.status_code == 400
297 # API: Create policy instance pi2 of type: 2
298 with open(TESTDATA+'pi2.json') as json_file:
299 policy_2 = json.load(json_file)
300 response=client.put(SERVER_URL+'a1-p/policytypes/2/policies/pi2', headers=header, data=json.dumps(policy_2))
301 assert response.status_code == 202
303 with open(TESTDATA+'pi2.json') as json_file:
304 policy_2 = json.load(json_file)
305 response=client.put(SERVER_URL+'a1-p/policytypes/2/policies/pi2', headers=header, data=json.dumps(policy_2))
306 assert response.status_code == 202
308 # API: Get instances for type 1, shall contain pi1
309 data_policies_get = [ "pi1" ]
310 response=client.get(SERVER_URL+'a1-p/policytypes/1/policies')
311 assert response.status_code == 200
312 result=json.loads(response.data)
313 res=compare(data_policies_get, result)
316 # API: Get instances for type 2, shall contain pi2
317 data_policies_get = ["pi2" ]
318 response=client.get(SERVER_URL+'a1-p/policytypes/2/policies')
319 assert response.status_code == 200
320 result=json.loads(response.data)
321 res=compare(data_policies_get, result)
324 # API: Create policy instance pi11 (copy of pi1) of type: 1. Shall fail
325 with open(TESTDATA+'pi1.json') as json_file:
326 policy_1 = json.load(json_file)
327 response=client.put(SERVER_URL+'a1-p/policytypes/1/policies/pi11', headers=header, data=json.dumps(policy_1))
328 assert response.status_code == 400
330 # Set force response code 401
331 response=client.post(SERVER_URL+'forceresponse?code=401')
332 assert response.status_code == 200
334 # API: Get policy status for pi1, shall fail
335 response=client.get(SERVER_URL+'a1-p/policytypes/1/policies/pi1/status')
336 assert response.status_code == 401
339 response=client.post(SERVER_URL+'forcedelay?delay=10')
340 assert response.status_code == 200
341 assert response.data == b"Force delay: 10 sec set for all A1 responses"
343 # API: Get policy status for pi1. Shall delay 10 sec
345 "instance_status" : "NOT IN EFFECT",
346 "has_been_deleted" : "false",
347 "created_at" : "????"
349 response=client.get(SERVER_URL+'a1-p/policytypes/1/policies/pi1/status')
350 assert response.status_code == 200
351 result=json.loads(response.data)
352 res=compare(policy_status, result)
356 response=client.post(SERVER_URL+'forcedelay')
357 assert response.status_code == 200
358 assert response.data == b"Force delay: None sec set for all A1 responses"
361 response=client.put(SERVER_URL+'status?policyid=pi1&status=IN%20EFFECT')
362 assert response.status_code == 200
364 # API: Get policy status for pi1
366 "instance_status" : "IN EFFECT",
367 "has_been_deleted" : "false",
368 "created_at" : "????"
370 response=client.get(SERVER_URL+'a1-p/policytypes/1/policies/pi1/status')
371 assert response.status_code == 200
372 result=json.loads(response.data)
373 res=compare(policy_status, result)
377 response=client.put(SERVER_URL+'status?policyid=pi1&status=IN%20EFFECT&deleted=true&created_at=2020-03-30%2012:00:00')
378 assert response.status_code == 200
380 # API: Get policy status for pi1
382 "instance_status" : "IN EFFECT",
383 "has_been_deleted" : "true",
384 "created_at" : "????"
386 response=client.get(SERVER_URL+'a1-p/policytypes/1/policies/pi1/status')
387 assert response.status_code == 200
388 result=json.loads(response.data)
389 res=compare(policy_status, result)
392 # Get counter: instances
393 response=client.get(SERVER_URL+'counter/num_instances')
394 assert response.status_code == 200
395 assert response.data == b"2"
397 # Get counter: types (shall be 2)
398 response=client.get(SERVER_URL+'counter/num_types')
399 assert response.status_code == 200
400 assert response.data == b"2"
402 # Get counter: interface
403 response=client.get(SERVER_URL+'counter/interface')
404 assert response.status_code == 200
405 assert response.data == b"OSC_2.1.0"
407 # Get counter: remote hosts
408 response=client.get(SERVER_URL+'counter/remote_hosts')
409 assert response.status_code == 200
411 # Get counter: test, shall fail
412 response=client.get(SERVER_URL+'counter/test')
413 assert response.status_code == 404
415 # API: DELETE policy instance pi1
416 response=client.delete(SERVER_URL+'a1-p/policytypes/1/policies/pi1')
417 assert response.status_code == 202
419 # API: Get instances for type 1, shall be empty
420 data_policies_get = [ ]
421 response=client.get(SERVER_URL+'a1-p/policytypes/1/policies')
422 assert response.status_code == 200
423 result=json.loads(response.data)
424 res=compare(data_policies_get, result)
427 # API: Get instances for type 2, shall contain pi2
428 data_policies_get = ["pi2" ]
429 response=client.get(SERVER_URL+'a1-p/policytypes/2/policies')
430 assert response.status_code == 200
431 result=json.loads(response.data)
432 res=compare(data_policies_get, result)
435 # Get counter: instances
436 response=client.get(SERVER_URL+'counter/num_instances')
437 assert response.status_code == 200
438 assert response.data == b"1"
441 ### Tests to increase code coverage
443 # Set force response code 500
444 response=client.post(SERVER_URL+'forceresponse?code=500')
445 assert response.status_code == 200
448 response=client.get(SERVER_URL+'a1-p/healthcheck')
449 assert response.status_code == 500
451 # Set force response code 501
452 response=client.post(SERVER_URL+'forceresponse?code=501')
453 assert response.status_code == 200
455 # API: Get policy types
456 data_policytypes_get = [ ]
457 response=client.get(SERVER_URL+'a1-p/policytypes')
458 assert response.status_code == 501
460 # Set force response code 502
461 response=client.post(SERVER_URL+'forceresponse?code=502')
462 assert response.status_code == 200
464 # API: Delete a policy type, shall fail
465 response=client.delete(SERVER_URL+'a1-p/policytypes/55')
466 assert response.status_code == 502
468 # Set force response code 503
469 response=client.post(SERVER_URL+'forceresponse?code=503')
470 assert response.status_code == 200
472 with open(TESTDATA+'pi1.json') as json_file:
473 policy_1 = json.load(json_file)
474 response=client.put(SERVER_URL+'a1-p/policytypes/1/policies/pi11', headers=header, data=json.dumps(policy_1))
475 assert response.status_code == 503
477 # Set force response code 504
478 response=client.post(SERVER_URL+'forceresponse?code=504')
479 assert response.status_code == 200
481 # API: Get instances for type 1, shall fail
482 data_policies_get = [ ]
483 response=client.get(SERVER_URL+'a1-p/policytypes/1/policies')
484 assert response.status_code == 504
486 # Set force response code 505
487 response=client.post(SERVER_URL+'forceresponse?code=505')
488 assert response.status_code == 200
490 # API: delete instance pi1, shall fail
491 response=client.delete(SERVER_URL+'a1-p/policytypes/1/policies/pi1')
492 assert response.status_code == 505
494 # API: Delete a policy type having instances, shall fail
495 response=client.delete(SERVER_URL+'a1-p/policytypes/2')
496 assert response.status_code == 400
498 # API: delete instance pi1 in type 5, shall fail
499 response=client.delete(SERVER_URL+'a1-p/policytypes/5/policies/pi1')
500 assert response.status_code == 404
502 # API: delete instance pi99 in type 1, shall fail
503 response=client.delete(SERVER_URL+'a1-p/policytypes/1/policies/pi99')
504 assert response.status_code == 404
506 # API: Create policy instance pi80 of type: 5, shall fail
507 with open(TESTDATA+'pi1.json') as json_file:
508 policy_80 = json.load(json_file)
509 response=client.put(SERVER_URL+'a1-p/policytypes/5/policies/pi80', headers=header, data=json.dumps(policy_80))
510 assert response.status_code == 404
512 # API: Get policy type 55, shall fail
513 data_policytypes_get = [ ]
514 response=client.get(SERVER_URL+'a1-p/policytypes/55')
515 assert response.status_code == 404
517 # API: Get status, bad type - shall fail
518 response=client.get(SERVER_URL+'a1-p/policytypes/99/policies/pi1/status')
519 assert response.status_code == 404
521 # API: Get status, bad instance - shall fail
522 response=client.get(SERVER_URL+'a1-p/policytypes/1/policies/pi111/status')
523 assert response.status_code == 404
525 # Load policy type, no type in url - shall fail
526 with open(TESTDATA+'pt2.json') as json_file:
527 policytype_2 = json.load(json_file)
528 response=client.put(SERVER_URL+'policytype', headers=header, data=json.dumps(policytype_2))
529 assert response.status_code == 400
531 # Load policy type - duplicate - shall fail
532 with open(TESTDATA+'pt1.json') as json_file:
533 policytype_1 = json.load(json_file)
534 response=client.put(SERVER_URL+'policytype?id=2', headers=header, data=json.dumps(policytype_1))
535 assert response.status_code == 400