1 # ============LICENSE_START===============================================
2 # Copyright (C) 2021-2023 Nordix Foundation. All rights reserved.
3 # Copyright (C) 2023-2024 OpenInfra Foundation Europe. All Rights Reserved
4 # ========================================================================
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ============LICENSE_END=================================================
19 # This test case tests the STD_2.0.0 version of the simulator.
import json
import multiprocessing
import time

import pytest

from unittest_setup import SERVER_URL, HOST_IP, PORT_NUMBER, setup_env, get_testdata_dir, client
from unittest_setup import run_flask_app
# Setup env and import paths

# Version of simulator
INTERFACE_VERSION="STD_2.0.0"

# NOTE: the two imports below are deliberately placed after setup_env();
# presumably setup_env() makes the version-specific modules importable
# (see "import paths" above) - confirm in unittest_setup before reordering.
setup_env(INTERFACE_VERSION)
from compare_json import compare

from models.enforceStatus import EnforceStatus
def test_enforce_reason(client):
    """
    Check that EnforceStatus accepts valid status/reason values, exposes them
    through to_dict() and its attributes, and raises ValueError for invalid ones.
    """
    status_model = EnforceStatus()

    # Each valid status/reason pair must show up unchanged in the dict form.
    valid_pairs = (
        ('NOT_ENFORCED', 'SCOPE_NOT_APPLICABLE'),
        ('ENFORCED', 'STATEMENT_NOT_APPLICABLE'),
    )
    for status, reason in valid_pairs:
        status_model.enforce_status = status
        status_model.enforce_reason = reason
        as_dict = status_model.to_dict()
        assert as_dict['enforceStatus'] == status
        assert as_dict['enforceReason'] == reason

    # Changing only the reason must be reflected in the dict form as well.
    status_model.enforce_reason = 'OTHER_REASON'
    as_dict = status_model.to_dict()
    assert as_dict['enforceReason'] == 'OTHER_REASON'

    # The attributes themselves must stringify to the values set last.
    assert str(status_model.enforce_status) == 'ENFORCED'
    assert str(status_model.enforce_reason) == 'OTHER_REASON'

    # Values outside the accepted set are rejected on assignment.
    with pytest.raises(ValueError):
        status_model.enforce_status = 'ERROR'

    with pytest.raises(ValueError):
        status_model.enforce_reason = 'ERROR'
def _load_json(path):
    """Return the parsed content of the JSON file at ``path``."""
    with open(path) as json_file:
        return json.load(json_file)


def _put_json(client, path, payload, header):
    """PUT ``payload`` as JSON to ``SERVER_URL + path`` and return the response."""
    return client.put(SERVER_URL + path, headers=header, data=json.dumps(payload))


def _check_raw(response, expected_status, expected_body=None):
    """Assert the status code and, when ``expected_body`` is given, the exact raw body."""
    assert response.status_code == expected_status
    if expected_body is not None:
        assert response.data == expected_body


def _check_json(response, expected_status, expected_body):
    """Assert the status code and that the decoded JSON body matches ``expected_body``."""
    assert response.status_code == expected_status
    # compare() comes from compare_json; its result is checked against True
    # explicitly because its return type is not visible from this module.
    assert compare(expected_body, json.loads(response.data)) == True


def test_apis(client):
    """
    Drive the simulator through its admin endpoints and the A1-P v2 API.

    The steps are order dependent: the simulator is reset first, then policy
    type STD_1 and the instances pi1/pi2 are created, updated, queried and
    deleted while the counters, the forced response code, the forced delay,
    the policy status and the data delivery counter are checked in between.
    When the test finishes, type STD_1 and instance pi2 still exist.
    """
    testdata = get_testdata_dir()

    # Header for json payload
    header = {
        "Content-Type": "application/json"
    }

    types_path = 'A1-P/v2/policytypes'
    policies_path = types_path + '/STD_1/policies'

    std_1_type = _load_json(testdata + 'std_1.json')
    pi1_policy = _load_json(testdata + 'pi1.json')
    pi1_updated_policy = _load_json(testdata + 'pi1_updated.json')
    pi2_policy = _load_json(testdata + 'pi2.json')

    type_ok = b"Policy type STD_1 is OK."
    default_status = {"enforceStatus": "NOT_ENFORCED", "enforceReason": "OTHER_REASON"}

    # Simulator hello world
    _check_raw(client.get(SERVER_URL), 200)

    # Check used and implemented interfaces
    # NOTE(review): this literal must match the simulator output byte for byte,
    # including spacing - confirm against the simulator if this check fails.
    _check_raw(
        client.get(SERVER_URL + 'container_interfaces'),
        200,
        b"Current interface: STD_2.0.0 All supported A1 interface yamls in this container: ['OSC_2.1.0', 'STD_1.1.3', 'STD_2.0.0']",
    )

    # Reset simulator instances
    _check_raw(client.post(SERVER_URL + 'deleteinstances'), 200)

    # Reset simulator, all
    _check_raw(client.post(SERVER_URL + 'deleteall'), 200)

    # Get counter: interface
    _check_raw(client.get(SERVER_URL + 'counter/interface'), 200, b"STD_2.0.0")

    # Get counter: remote hosts
    _check_raw(client.get(SERVER_URL + 'counter/remote_hosts'), 200)

    # Get counter: instances
    _check_raw(client.get(SERVER_URL + 'counter/num_instances'), 200, b"0")

    # Get counter: types
    _check_raw(client.get(SERVER_URL + 'counter/num_types'), 200, b"0")

    # API: Get policy types, shall be empty
    _check_json(client.get(SERVER_URL + types_path), 200, [])

    # API: Get policy instances for type 1, type not found
    _check_json(
        client.get(SERVER_URL + types_path + '/1/policies'),
        404,
        {"title": "The policy type does not exist.", "status": 404, "instance": "1"},
    )

    # API: Get policy instances, type not found
    _check_json(
        client.get(SERVER_URL + types_path + '/test/policies'),
        404,
        {"title": "The policy type does not exist.", "status": 404, "instance": "test"},
    )

    # Put a policy type: STD_1 (created)
    _check_raw(_put_json(client, 'policytype?id=STD_1', std_1_type, header), 201, type_ok)

    # Put a policy type: STD_1, again (already exists)
    _check_raw(_put_json(client, 'policytype?id=STD_1', std_1_type, header), 200, type_ok)

    # API: Get policy type ids, shall contain type STD_1
    _check_json(client.get(SERVER_URL + types_path), 200, ["STD_1"])

    # Delete a policy type: STD_1 (204 carries no body)
    _check_raw(client.delete(SERVER_URL + 'policytype?id=STD_1'), 204, b"")

    # API: Get policy type ids, shall be empty
    _check_json(client.get(SERVER_URL + types_path), 200, [])

    # Put a policy type: STD_1
    _check_raw(_put_json(client, 'policytype?id=STD_1', std_1_type, header), 201, type_ok)

    # API: Get policy type ids, shall contain type STD_1
    _check_json(client.get(SERVER_URL + types_path), 200, ["STD_1"])

    # Get counter: types (shall be 1)
    _check_raw(client.get(SERVER_URL + 'counter/num_types'), 200, b"1")

    # API: Get policy type: STD_1
    _check_json(client.get(SERVER_URL + types_path + '/STD_1'), 200, std_1_type)

    # API: Get policy instances, shall be empty
    _check_json(client.get(SERVER_URL + policies_path), 200, [])

    # API: Create policy instance pi1 of type: STD_1
    _check_json(_put_json(client, policies_path + '/pi1', pi1_policy, header), 201, pi1_policy)

    # API: Get policy instance pi1 of type: STD_1
    _check_json(client.get(SERVER_URL + policies_path + '/pi1'), 200, pi1_policy)

    # API: Update policy instance pi1 of type: STD_1 (same content)
    _check_json(_put_json(client, policies_path + '/pi1', pi1_policy, header), 200, pi1_policy)

    # API: Update policy instance pi1 of type: STD_1 (new content)
    _check_json(
        _put_json(client, policies_path + '/pi1', pi1_updated_policy, header),
        200,
        pi1_updated_policy,
    )

    # API: Duplicate policy instance pi2 of type: STD_1 - and delete it
    _check_json(
        _put_json(client, policies_path + '/pi2', pi1_updated_policy, header),
        201,
        pi1_updated_policy,
    )
    _check_raw(client.delete(SERVER_URL + policies_path + '/pi2'), 204, b"")

    # API: Get policy instances, shall contain pi1
    _check_json(client.get(SERVER_URL + policies_path), 200, ["pi1"])

    # Get counter: instances
    _check_raw(client.get(SERVER_URL + 'counter/num_instances'), 200, b"1")

    # Get counter: types
    _check_raw(client.get(SERVER_URL + 'counter/num_types'), 200, b"1")

    # Set force response code 409
    _check_raw(client.post(SERVER_URL + 'forceresponse?code=409'), 200)

    # API: Get policy instances, shall fail with the forced code
    _check_json(
        client.get(SERVER_URL + policies_path),
        409,
        {"title": "Conflict", "status": 409, "detail": "Request could not be processed in the current state of the resource"},
    )

    # API: Get policy status (this request is expected to succeed again)
    _check_json(client.get(SERVER_URL + policies_path + '/pi1/status'), 200, default_status)

    # API: Create policy instance pi2 of type: STD_1
    _check_json(_put_json(client, policies_path + '/pi2', pi2_policy, header), 201, pi2_policy)

    # API: Update policy instance pi2 of type: STD_1
    _check_json(_put_json(client, policies_path + '/pi2', pi2_policy, header), 200, pi2_policy)

    # API: Get policy instances, shall contain pi1 and pi2
    _check_json(client.get(SERVER_URL + policies_path), 200, ["pi1", "pi2"])

    # Get counter: instances
    _check_raw(client.get(SERVER_URL + 'counter/num_instances'), 200, b"2")

    # Get counter: types
    _check_raw(client.get(SERVER_URL + 'counter/num_types'), 200, b"1")

    # Set force delay 10
    _check_raw(
        client.post(SERVER_URL + 'forcedelay?delay=10'),
        200,
        b"Force delay: 10 sec set for all A1 responses",
    )

    try:
        # API: Get policy instances, shall contain pi1 and pi2 and be delayed 10 sec.
        # A monotonic clock keeps the measurement immune to wall-clock adjustments.
        started = time.monotonic()
        _check_json(client.get(SERVER_URL + policies_path), 200, ["pi1", "pi2"])
        elapsed = time.monotonic() - started
        assert elapsed > 9
    finally:
        # Reset force delay even when the delayed request fails, so the delay
        # is not left active in the simulator.
        reset_response = client.post(SERVER_URL + 'forcedelay')
    _check_raw(reset_response, 200, b"Force delay: None sec set for all A1 responses")

    # API: Get policy instance pi1 of type: STD_1
    _check_json(client.get(SERVER_URL + policies_path + '/pi1'), 200, pi1_updated_policy)

    # API: Get policy instance pi2 of type: STD_1
    _check_json(client.get(SERVER_URL + policies_path + '/pi2'), 200, pi2_policy)

    # API: DELETE policy instance pi1
    _check_raw(client.delete(SERVER_URL + policies_path + '/pi1'), 204, b"")

    # API: Get policy instances, shall contain pi2
    _check_json(client.get(SERVER_URL + policies_path), 200, ["pi2"])

    # API: Get policy status
    _check_json(client.get(SERVER_URL + policies_path + '/pi2/status'), 200, default_status)

    # Set status for policy instance pi2
    _check_raw(
        client.put(SERVER_URL + 'status?policyid=pi2&status=NOT_ENFORCED&reason=STATEMENT_NOT_APPLICABLE'),
        200,
    )

    # API: Get policy status
    _check_json(
        client.get(SERVER_URL + policies_path + '/pi2/status'),
        200,
        {"enforceStatus": "NOT_ENFORCED", "enforceReason": "STATEMENT_NOT_APPLICABLE"},
    )

    # Set status for policy instance pi2
    _check_raw(
        client.put(SERVER_URL + 'status?policyid=pi2&status=NOT_ENFORCED&reason=OTHER_REASON'),
        200,
    )

    # API: Get policy status
    _check_json(client.get(SERVER_URL + policies_path + '/pi2/status'), 200, default_status)

    # Get counter: data_delivery
    _check_raw(client.get(SERVER_URL + 'counter/datadelivery'), 200, b"0")

    # Send data; the pi2 payload is reused as the JSON body of the delivery.
    delivery_response = client.post(
        SERVER_URL + 'datadelivery', headers=header, data=json.dumps(pi2_policy)
    )
    _check_raw(delivery_response, 200)

    # Get counter: data_delivery
    _check_raw(client.get(SERVER_URL + 'counter/datadelivery'), 200, b"1")

    # Get counter: interface
    _check_raw(client.get(SERVER_URL + 'counter/interface'), 200, b"STD_2.0.0")

    # Get counter: remote hosts
    _check_raw(client.get(SERVER_URL + 'counter/remote_hosts'), 200)

    # Get counter: instances
    _check_raw(client.get(SERVER_URL + 'counter/num_instances'), 200, b"1")

    # Get counter: types
    _check_raw(client.get(SERVER_URL + 'counter/num_types'), 200, b"1")
def test_notificationDestination(client):
    """
    Update policy instance pi2 of type STD_1 with a notificationDestination
    query parameter and expect 200 with the payload echoed back.
    """
    policy_file = get_testdata_dir() + 'pi2.json'
    # Header for json payload
    json_header = { "Content-Type" : "application/json" }

    # Load the policy body first, then issue the update with the callback URL.
    with open(policy_file) as source:
        policy = json.load(source)

    reply = client.put(
        SERVER_URL+"A1-P/v2/policytypes/STD_1/policies/pi2?notificationDestination=http://localhost:8086/statustest",
        headers=json_header,
        data=json.dumps(policy),
    )

    assert reply.status_code == 200
    echoed = json.loads(reply.data)
    assert compare(policy, echoed) == True
def test_sendstatus(client):
    """
    Check the 'sendstatus' admin endpoint: 204 for the existing policy pi2,
    400 when the policyid parameter is missing and 404 for an unknown policy.

    A helper process running run_flask_app is kept alive during the test so
    that the simulator has a callback endpoint to call.
    """
    # Run the Flask app in a separate process, in parallel on a different port,
    # so that we can call the callback.
    proc = multiprocessing.Process(target=run_flask_app, args=())
    proc.start()
    try:
        test_data = get_testdata_dir() + 'pi2.json'
        header = { "Content-Type" : "application/json" }

        # join() with a timeout is used as a startup wait for the helper
        # process; it returns early only if that process exits.
        # Timeout can be removed with polling the endpoints if required
        proc.join(timeout=10)

        # === Send status for pi2 ===
        with open(test_data) as json_file:
            payload = json.load(json_file)
        response = client.post(SERVER_URL+'sendstatus?policyid=pi2', headers=header, data=json.dumps(payload))

        assert response.status_code == 204
        # A 204 response carries no body.
        assert response.data == b""

        # Send status, negative test with missing parameter
        response = client.post(SERVER_URL+'sendstatus', headers=header, data="")
        assert response.status_code == 400

        # Send status pi9, negative test for policy id not found
        response = client.post(SERVER_URL+'sendstatus?policyid=pi9', headers=header, data="")
        assert response.status_code == 404
    finally:
        # Always stop and reap the helper process, even when an assertion
        # fails, so it does not outlive the test.
        proc.terminate()
        proc.join()