From e9f743a0db7d3a024e868a671f95cbb96cd0632d Mon Sep 17 00:00:00 2001 From: BjornMagnussonXA Date: Tue, 22 Sep 2020 08:23:24 +0200 Subject: [PATCH] Another attempt to correct unit tests Using pytest and fixture - no need to start the http server Change-Id: Ic947d7d8c8b732e32946ed0ec6c93627f8a0a21c Issue-ID: NONRTRIC-279 Signed-off-by: BjornMagnussonXA Change-Id: Ib1da420d5e758fe86dff84ab66b4844237f1929a --- .../tests/OSC_2.1.0/test_osc_2_1_0.py | 981 ++++++++++----------- .../tests/STD_1.1.3/test_std_1_1_3.py | 873 +++++++++--------- tox.ini | 1 - 3 files changed, 917 insertions(+), 938 deletions(-) diff --git a/near-rt-ric-simulator/tests/OSC_2.1.0/test_osc_2_1_0.py b/near-rt-ric-simulator/tests/OSC_2.1.0/test_osc_2_1_0.py index 9b53a27..3b791af 100644 --- a/near-rt-ric-simulator/tests/OSC_2.1.0/test_osc_2_1_0.py +++ b/near-rt-ric-simulator/tests/OSC_2.1.0/test_osc_2_1_0.py @@ -17,11 +17,9 @@ # This test case test the OSC_2.1.0 version of the simulator -from unittest import TestCase +import pytest import sys import os -import threading -import requests import json @@ -49,498 +47,489 @@ sys.path.append(os.path.abspath(cwd+'../../src/'+INTERFACE_VERSION)) os.chdir(cwd+"../../src/"+INTERFACE_VERSION) import main +from main import app from compare_json import compare -class TestCase1(TestCase): - from main import app - server = None - - def startServer(self): - self.app.run(port=PORT_NUMBER, host=HOST_IP, threaded=True) - - def setUp(self): - self.server=threading.Thread(target=self.startServer, args=()) - self.server.daemon = True - self.server.start() - - # executed after each test - def tearDown(self): - self.server.killed=True - - def test_apis(self): - - # Header for json payload - header = { - "Content-Type" : "application/json" - } - - # Simulator hello world - response=requests.get(SERVER_URL) - self.assertEqual(response.status_code, 200) - - # Check used and implemented interfaces - response=requests.get(SERVER_URL+'container_interfaces') - self.assertEqual(response.status_code, 200) - self.assertEqual(response.text, "Current interface: OSC_2.1.0 All supported A1 interface yamls in this container: ['OSC_2.1.0', 'STD_1.1.3']") - - # Reset simulator instances - response=requests.post(SERVER_URL+'deleteinstances') - self.assertEqual(response.status_code, 200) - - # Reset simulator, all - response=requests.post(SERVER_URL+'deleteall') - self.assertEqual(response.status_code, 200) - - # API: Healthcheck - response=requests.get(SERVER_URL+'a1-p/healthcheck') - self.assertEqual(response.status_code, 200) - - # API: Get policy types, shall be empty - data_policytypes_get = [ ] - response=requests.get(SERVER_URL+'a1-p/policytypes') - self.assertEqual(response.status_code, 200) - result=json.loads(response.text) - res=compare(data_policytypes_get, result) - self.assertEqual(res, True) - - # API: Delete a policy type, shall fail - response=requests.delete(SERVER_URL+'a1-p/policytypes/1') - self.assertEqual(response.status_code, 404) - - # API: Get policy instances for type 1, shall fail - response=requests.get(SERVER_URL+'a1-p/policytypes/1/policies') - self.assertEqual(response.status_code, 404) - - # API: Put a policy type: 1 - with open(TESTDATA+'pt1.json') as json_file: - policytype_1 = json.load(json_file) - response=requests.put(SERVER_URL+'a1-p/policytypes/1', headers=header, data=json.dumps(policytype_1)) - self.assertEqual(response.status_code, 201) - - # API: Put a policy type: 1 again - with open(TESTDATA+'pt1.json') as json_file: - policytype_1 = json.load(json_file) - response=requests.put(SERVER_URL+'a1-p/policytypes/1', headers=header, data=json.dumps(policytype_1)) - self.assertEqual(response.status_code, 201) - - # API: Delete a policy type - response=requests.delete(SERVER_URL+'a1-p/policytypes/1') - self.assertEqual(response.status_code, 204) - - # API: Get policy type ids, shall be empty - data_policytypes_get = [ ] - response=requests.get(SERVER_URL+'a1-p/policytypes') - self.assertEqual(response.status_code, 200) - result=json.loads(response.text) - res=compare(data_policytypes_get, result) - self.assertEqual(res, True) - - # API: Put a policy type: 1 - with open(TESTDATA+'pt1.json') as json_file: - policytype_1 = json.load(json_file) - response=requests.put(SERVER_URL+'a1-p/policytypes/1', headers=header, data=json.dumps(policytype_1)) - self.assertEqual(response.status_code, 201) - - # API: Get policy type ids, shall contain '1' - data_policytypes_get = [ 1 ] - response=requests.get(SERVER_URL+'a1-p/policytypes') - self.assertEqual(response.status_code, 200) - result=json.loads(response.text) - res=compare(data_policytypes_get, result) - self.assertEqual(res, True) - - # API: Get instances for type 1, shall be empty - data_policies_get = [ ] - response=requests.get(SERVER_URL+'a1-p/policytypes/1/policies') - self.assertEqual(response.status_code, 200) - result=json.loads(response.text) - res=compare(data_policies_get, result) - self.assertEqual(res, True) - - # API: Create policy instance pi1 of type: 1 - with open(TESTDATA+'pi1.json') as json_file: - policy_1 = json.load(json_file) - response=requests.put(SERVER_URL+'a1-p/policytypes/1/policies/pi1', headers=header, data=json.dumps(policy_1)) - self.assertEqual(response.status_code, 202) - - # API: Get policy instance pi1 of type: 1 - with open(TESTDATA+'pi1.json') as json_file: - policy_1 = json.load(json_file) - response=requests.get(SERVER_URL+'a1-p/policytypes/1/policies/pi1') - self.assertEqual(response.status_code, 200) - result=json.loads(response.text) - res=compare(policy_1, result) - self.assertEqual(res, True) - - # API: Update policy instance pi1 of type: 1 - with open(TESTDATA+'pi1.json') as json_file: - policy_1 = json.load(json_file) - response=requests.put(SERVER_URL+'a1-p/policytypes/1/policies/pi1', headers=header, data=json.dumps(policy_1)) - self.assertEqual(response.status_code, 202) - - # API: Update policy type: 1, shall fail - with open(TESTDATA+'pt1.json') as json_file: - policytype_1 = json.load(json_file) - response=requests.put(SERVER_URL+'a1-p/policytypes/1', headers=header, data=json.dumps(policytype_1)) - self.assertEqual(response.status_code, 400) - - # API: Get instances for type 1, shall contain 'pi1' - data_policies_get = [ "pi1" ] - response=requests.get(SERVER_URL+'a1-p/policytypes/1/policies') - self.assertEqual(response.status_code, 200) - result=json.loads(response.text) - res=compare(data_policies_get, result) - self.assertEqual(res, True) - - # API: Create policy instance pi2 (copy of pi1) of type: 1. Shall fail - with open(TESTDATA+'pi1.json') as json_file: - policy_2 = json.load(json_file) - response=requests.put(SERVER_URL+'a1-p/policytypes/1/policies/pi2', headers=header, data=json.dumps(policy_2)) - self.assertEqual(response.status_code, 400) - - # Set force response code 401 - response=requests.post(SERVER_URL+'forceresponse?code=401') - self.assertEqual(response.status_code, 200) - - # API: Get policy type 1. Shall fail with forced code - response=requests.get(SERVER_URL+'a1-p/policytypes/1') - self.assertEqual(response.status_code, 401) - - # API: Get policy status - policy_status = { - "instance_status" : "NOT IN EFFECT", - "has_been_deleted" : "false", - "created_at" : "????" - } - response=requests.get(SERVER_URL+'a1-p/policytypes/1/policies/pi1/status') - self.assertEqual(response.status_code, 200) - result=json.loads(response.text) - res=compare(policy_status, result) - self.assertEqual(res, True) - - # Load a policy type: 2 - with open(TESTDATA+'pt2.json') as json_file: - policytype_2 = json.load(json_file) - response=requests.put(SERVER_URL+'policytype?id=2', headers=header, data=json.dumps(policytype_2)) - self.assertEqual(response.status_code, 201) - self.assertEqual(response.text, "Policy type 2 is OK.") - - # Load a policy type: 2, again - with open(TESTDATA+'pt2.json') as json_file: - policytype_2 = json.load(json_file) - response=requests.put(SERVER_URL+'policytype?id=2', headers=header, data=json.dumps(policytype_2)) - self.assertEqual(response.status_code, 200) - self.assertEqual(response.text, "Policy type 2 is OK.") - - # API: Get policy type ids, shall contain '1' and '2' - data_policytypes_get = [ 1,2 ] - response=requests.get(SERVER_URL+'a1-p/policytypes') - self.assertEqual(response.status_code, 200) - result=json.loads(response.text) - res=compare(data_policytypes_get, result) - self.assertEqual(res, True) - - # Get policy type ids, shall contain type 1 and 2 ==" - data_policytypes_get = [ "1","2" ] - response=requests.get(SERVER_URL+'policytypes') - self.assertEqual(response.status_code, 200) - result=json.loads(response.text) - res=compare(data_policytypes_get, result) - self.assertEqual(res, True) - - # API: Get policy type 2 - with open(TESTDATA+'pt2.json') as json_file: - policytype_2 = json.load(json_file) - response=requests.get(SERVER_URL+'a1-p/policytypes/2') - self.assertEqual(response.status_code, 200) - result=json.loads(response.text) - res=compare(policytype_2, result) - self.assertEqual(res, True) - - # Delete a policy type - response=requests.delete(SERVER_URL+'policytype?id=2') - self.assertEqual(response.status_code, 204) - - # API: Get policy type ids, shall contain '1' - data_policytypes_get = [ 1] - response=requests.get(SERVER_URL+'a1-p/policytypes') - self.assertEqual(response.status_code, 200) - result=json.loads(response.text) - res=compare(data_policytypes_get, result) - self.assertEqual(res, True) - - # Load a policy type: 2 - with open(TESTDATA+'pt2.json') as json_file: - policytype_2 = json.load(json_file) - response=requests.put(SERVER_URL+'policytype?id=2', headers=header, data=json.dumps(policytype_2)) - self.assertEqual(response.status_code, 201) - self.assertEqual(response.text, "Policy type 2 is OK.") - - # API: Get policy type 2 - with open(TESTDATA+'pt2.json') as json_file: - policytype_2 = json.load(json_file) - response=requests.get(SERVER_URL+'a1-p/policytypes/2') - self.assertEqual(response.status_code, 200) - result=json.loads(response.text) - res=compare(policytype_2, result) - self.assertEqual(res, True) - - # API: Get instances for type 2, shall be empty - data_policies_get = [ ] - response=requests.get(SERVER_URL+'a1-p/policytypes/2/policies') - self.assertEqual(response.status_code, 200) - result=json.loads(response.text) - res=compare(data_policies_get, result) - self.assertEqual(res, True) - - # API: Create policy instance pi1 of type: 2, shall fail - with open(TESTDATA+'pi1.json') as json_file: - policy_1 = json.load(json_file) - response=requests.put(SERVER_URL+'a1-p/policytypes/2/policies/pi1', headers=header, data=json.dumps(policy_1)) - self.assertEqual(response.status_code, 400) - - # API: Create policy instance pi2 of type: 2. Missing param, shall fail - with open(TESTDATA+'pi2_missing_param.json') as json_file: - policy_2 = json.load(json_file) - response=requests.put(SERVER_URL+'a1-p/policytypes/2/policies/pi1', headers=header, data=json.dumps(policy_2)) - self.assertEqual(response.status_code, 400) - - # API: Create policy instance pi2 of type: 2 - with open(TESTDATA+'pi2.json') as json_file: - policy_2 = json.load(json_file) - response=requests.put(SERVER_URL+'a1-p/policytypes/2/policies/pi2', headers=header, data=json.dumps(policy_2)) - self.assertEqual(response.status_code, 202) - - with open(TESTDATA+'pi2.json') as json_file: - policy_2 = json.load(json_file) - response=requests.put(SERVER_URL+'a1-p/policytypes/2/policies/pi2', headers=header, data=json.dumps(policy_2)) - self.assertEqual(response.status_code, 202) - - # API: Get instances for type 1, shall contain pi1 - data_policies_get = [ "pi1" ] - response=requests.get(SERVER_URL+'a1-p/policytypes/1/policies') - self.assertEqual(response.status_code, 200) - result=json.loads(response.text) - res=compare(data_policies_get, result) - self.assertEqual(res, True) - - # API: Get instances for type 2, shall contain pi2 - data_policies_get = ["pi2" ] - response=requests.get(SERVER_URL+'a1-p/policytypes/2/policies') - self.assertEqual(response.status_code, 200) - result=json.loads(response.text) - res=compare(data_policies_get, result) - self.assertEqual(res, True) - - # API: Create policy instance pi11 (copy of pi1) of type: 1. Shall fail - with open(TESTDATA+'pi1.json') as json_file: - policy_1 = json.load(json_file) - response=requests.put(SERVER_URL+'a1-p/policytypes/1/policies/pi11', headers=header, data=json.dumps(policy_1)) - self.assertEqual(response.status_code, 400) - - # Set force response code 409. ===" - response=requests.post(SERVER_URL+'forceresponse?code=401') - self.assertEqual(response.status_code, 200) - - # API: Get policy status for pi1, shall fail - response=requests.get(SERVER_URL+'a1-p/policytypes/1/policies/pi1/status') - self.assertEqual(response.status_code, 401) - - # Set force delay 10 - response=requests.post(SERVER_URL+'forcedelay?delay=10') - self.assertEqual(response.status_code, 200) - self.assertEqual(response.text, "Force delay: 10 sec set for all A1 responses") - - # API: Get policy status for pi1. Shall delay 10 sec - policy_status = { - "instance_status" : "NOT IN EFFECT", - "has_been_deleted" : "false", - "created_at" : "????" - } - response=requests.get(SERVER_URL+'a1-p/policytypes/1/policies/pi1/status') - self.assertEqual(response.status_code, 200) - result=json.loads(response.text) - res=compare(policy_status, result) - self.assertEqual(res, True) - - # Reset force delay - response=requests.post(SERVER_URL+'forcedelay') - self.assertEqual(response.status_code, 200) - self.assertEqual(response.text, "Force delay: None sec set for all A1 responses") - - # Set status for pi1 - response=requests.put(SERVER_URL+'status?policyid=pi1&status=IN%20EFFECT') - self.assertEqual(response.status_code, 200) - - # API: Get policy status for pi1 - policy_status = { - "instance_status" : "IN EFFECT", - "has_been_deleted" : "false", - "created_at" : "????" - } - response=requests.get(SERVER_URL+'a1-p/policytypes/1/policies/pi1/status') - self.assertEqual(response.status_code, 200) - result=json.loads(response.text) - res=compare(policy_status, result) - self.assertEqual(res, True) - - # Set status for pi1 - response=requests.put(SERVER_URL+'status?policyid=pi1&status=IN%20EFFECT&deleted=true&created_at=2020-03-30%2012:00:00') - self.assertEqual(response.status_code, 200) - - # API: Get policy status for pi1 - policy_status = { - "instance_status" : "IN EFFECT", - "has_been_deleted" : "true", - "created_at" : "????" - } - response=requests.get(SERVER_URL+'a1-p/policytypes/1/policies/pi1/status') - self.assertEqual(response.status_code, 200) - result=json.loads(response.text) - res=compare(policy_status, result) - self.assertEqual(res, True) - - # Get counter: intstance - response=requests.get(SERVER_URL+'counter/num_instances') - self.assertEqual(response.status_code, 200) - self.assertEqual(response.text, "2") - - # Get counter: types (shall be 2) - response=requests.get(SERVER_URL+'counter/num_types') - self.assertEqual(response.status_code, 200) - self.assertEqual(response.text, "2") - - # Get counter: interface - response=requests.get(SERVER_URL+'counter/interface') - self.assertEqual(response.status_code, 200) - self.assertEqual(response.text, "OSC_2.1.0") - - # Get counter: remote hosts - response=requests.get(SERVER_URL+'counter/remote_hosts') - self.assertEqual(response.status_code, 200) - - # Get counter: test, shall fail - response=requests.get(SERVER_URL+'counter/test') - self.assertEqual(response.status_code, 404) - - # API: DELETE policy instance pi1 - response=requests.delete(SERVER_URL+'a1-p/policytypes/1/policies/pi1') - self.assertEqual(response.status_code, 202) - - # API: Get instances for type 1, shall be empty - data_policies_get = [ ] - response=requests.get(SERVER_URL+'a1-p/policytypes/1/policies') - self.assertEqual(response.status_code, 200) - result=json.loads(response.text) - res=compare(data_policies_get, result) - self.assertEqual(res, True) - - # API: Get instances for type 2, shall contain pi2 - data_policies_get = ["pi2" ] - response=requests.get(SERVER_URL+'a1-p/policytypes/2/policies') - self.assertEqual(response.status_code, 200) - result=json.loads(response.text) - res=compare(data_policies_get, result) - self.assertEqual(res, True) - - # Get counter: instances - response=requests.get(SERVER_URL+'counter/num_instances') - self.assertEqual(response.status_code, 200) - self.assertEqual(response.text, "1") - - - ### Tests to increase code coverage - - # Set force response code 500 - response=requests.post(SERVER_URL+'forceresponse?code=500') - self.assertEqual(response.status_code, 200) - - # API: Healthcheck - response=requests.get(SERVER_URL+'a1-p/healthcheck') - self.assertEqual(response.status_code, 500) - - # Set force response code 501 - response=requests.post(SERVER_URL+'forceresponse?code=501') - self.assertEqual(response.status_code, 200) - - # API: Get policy types - data_policytypes_get = [ ] - response=requests.get(SERVER_URL+'a1-p/policytypes') - self.assertEqual(response.status_code, 501) - - # Set force response code 502 - response=requests.post(SERVER_URL+'forceresponse?code=502') - self.assertEqual(response.status_code, 200) - - # API: Delete a policy type, shall fail - response=requests.delete(SERVER_URL+'a1-p/policytypes/55') - self.assertEqual(response.status_code, 502) - - # Set force response code 503. ===" - response=requests.post(SERVER_URL+'forceresponse?code=503') - self.assertEqual(response.status_code, 200) - - with open(TESTDATA+'pi1.json') as json_file: - policy_1 = json.load(json_file) - response=requests.put(SERVER_URL+'a1-p/policytypes/1/policies/pi11', headers=header, data=json.dumps(policy_1)) - self.assertEqual(response.status_code, 503) - - # Set force response code 504 - response=requests.post(SERVER_URL+'forceresponse?code=504') - self.assertEqual(response.status_code, 200) - - # API: Get instances for type 1, shall fail - data_policies_get = [ ] - response=requests.get(SERVER_URL+'a1-p/policytypes/1/policies') - self.assertEqual(response.status_code, 504) - - # Set force response code 505. ===" - response=requests.post(SERVER_URL+'forceresponse?code=505') - self.assertEqual(response.status_code, 200) - - # API: delete instance pi1, shall fail - response=requests.delete(SERVER_URL+'a1-p/policytypes/1/policies/pi1') - self.assertEqual(response.status_code, 505) - - # API: Delete a policy type having instances, shall fail - response=requests.delete(SERVER_URL+'a1-p/policytypes/2') - self.assertEqual(response.status_code, 400) - - # API: delete instance pi1 in type 5, shall fail - response=requests.delete(SERVER_URL+'a1-p/policytypes/5/policies/pi1') - self.assertEqual(response.status_code, 404) - - # API: delete instance pi99 in type 1, shall fail - response=requests.delete(SERVER_URL+'a1-p/policytypes/1/policies/pi99') - self.assertEqual(response.status_code, 404) - - # API: Create policy instance pi80 of type: 5 - with open(TESTDATA+'pi1.json') as json_file: - policy_80 = json.load(json_file) - response=requests.put(SERVER_URL+'a1-p/policytypes/5/policies/pi80', headers=header, data=json.dumps(policy_80)) - self.assertEqual(response.status_code, 404) - - # API: Get policy type - data_policytypes_get = [ ] - response=requests.get(SERVER_URL+'a1-p/policytypes/55') - self.assertEqual(response.status_code, 404) - - # API: Get status, bad type - shall fail - response=requests.get(SERVER_URL+'a1-p/policytypes/99/policies/pi1/status') - self.assertEqual(response.status_code, 404) - - # API: Get status, bad instance - shall fail - response=requests.get(SERVER_URL+'a1-p/policytypes/1/policies/pi111/status') - self.assertEqual(response.status_code, 404) - - # Load policy type, no type in url - shall faill - with open(TESTDATA+'pt2.json') as json_file: - policytype_2 = json.load(json_file) - response=requests.put(SERVER_URL+'policytype', headers=header, data=json.dumps(policytype_2)) - self.assertEqual(response.status_code, 400) - - # Load policy type - duplicatee - shall faill - with open(TESTDATA+'pt1.json') as json_file: - policytype_1 = json.load(json_file) - response=requests.put(SERVER_URL+'policytype?id=2', headers=header, data=json.dumps(policytype_1)) - self.assertEqual(response.status_code, 400) +@pytest.fixture +def client(): + with app.app.test_client() as c: + yield c + +def test_apis(client): + + # Header for json payload + header = { + "Content-Type" : "application/json" + } + + # Simulator hello world + response=client.get(SERVER_URL) + assert response.status_code == 200 + + + # Check used and implemented interfaces + response=client.get(SERVER_URL+'container_interfaces') + assert response.status_code == 200 + assert response.data == b"Current interface: OSC_2.1.0 All supported A1 interface yamls in this container: ['OSC_2.1.0', 'STD_1.1.3']" + + # Reset simulator instances + response=client.post(SERVER_URL+'deleteinstances') + assert response.status_code == 200 + + # Reset simulator, all + response=client.post(SERVER_URL+'deleteall') + assert response.status_code == 200 + + # API: Healthcheck + response=client.get(SERVER_URL+'a1-p/healthcheck') + assert response.status_code == 200 + + # API: Get policy types, shall be empty + data_policytypes_get = [ ] + response=client.get(SERVER_URL+'a1-p/policytypes') + assert response.status_code == 200 + result=json.loads(response.data) + res=compare(data_policytypes_get, result) + assert res == True + + # API: Delete a policy type, shall fail + response=client.delete(SERVER_URL+'a1-p/policytypes/1') + assert response.status_code == 404 + + # API: Get policy instances for type 1, shall fail + response=client.get(SERVER_URL+'a1-p/policytypes/1/policies') + assert response.status_code == 404 + + # API: Put a policy type: 1 + with open(TESTDATA+'pt1.json') as json_file: + policytype_1 = json.load(json_file) + response=client.put(SERVER_URL+'a1-p/policytypes/1', headers=header, data=json.dumps(policytype_1)) + assert response.status_code == 201 + + # API: Put a policy type: 1 again + with open(TESTDATA+'pt1.json') as json_file: + policytype_1 = json.load(json_file) + response=client.put(SERVER_URL+'a1-p/policytypes/1', headers=header, data=json.dumps(policytype_1)) + assert response.status_code == 201 + + # API: Delete a policy type + response=client.delete(SERVER_URL+'a1-p/policytypes/1') + assert response.status_code == 204 + + # API: Get policy type ids, shall be empty + data_policytypes_get = [ ] + response=client.get(SERVER_URL+'a1-p/policytypes') + assert response.status_code == 200 + result=json.loads(response.data) + res=compare(data_policytypes_get, result) + assert res == True + + # API: Put a policy type: 1 + with open(TESTDATA+'pt1.json') as json_file: + policytype_1 = json.load(json_file) + response=client.put(SERVER_URL+'a1-p/policytypes/1', headers=header, data=json.dumps(policytype_1)) + assert response.status_code == 201 + + # API: Get policy type ids, shall contain '1' + data_policytypes_get = [ 1 ] + response=client.get(SERVER_URL+'a1-p/policytypes') + assert response.status_code == 200 + result=json.loads(response.data) + res=compare(data_policytypes_get, result) + assert res == True + + # API: Get instances for type 1, shall be empty + data_policies_get = [ ] + response=client.get(SERVER_URL+'a1-p/policytypes/1/policies') + assert response.status_code == 200 + result=json.loads(response.data) + res=compare(data_policies_get, result) + assert res == True + + # API: Create policy instance pi1 of type: 1 + with open(TESTDATA+'pi1.json') as json_file: + policy_1 = json.load(json_file) + response=client.put(SERVER_URL+'a1-p/policytypes/1/policies/pi1', headers=header, data=json.dumps(policy_1)) + assert response.status_code == 202 + + # API: Get policy instance pi1 of type: 1 + with open(TESTDATA+'pi1.json') as json_file: + policy_1 = json.load(json_file) + response=client.get(SERVER_URL+'a1-p/policytypes/1/policies/pi1') + assert response.status_code == 200 + result=json.loads(response.data) + res=compare(policy_1, result) + assert res == True + + # API: Update policy instance pi1 of type: 1 + with open(TESTDATA+'pi1.json') as json_file: + policy_1 = json.load(json_file) + response=client.put(SERVER_URL+'a1-p/policytypes/1/policies/pi1', headers=header, data=json.dumps(policy_1)) + assert response.status_code == 202 + + # API: Update policy type: 1, shall fail + with open(TESTDATA+'pt1.json') as json_file: + policytype_1 = json.load(json_file) + response=client.put(SERVER_URL+'a1-p/policytypes/1', headers=header, data=json.dumps(policytype_1)) + assert response.status_code == 400 + + # API: Get instances for type 1, shall contain 'pi1' + data_policies_get = [ "pi1" ] + response=client.get(SERVER_URL+'a1-p/policytypes/1/policies') + assert response.status_code == 200 + result=json.loads(response.data) + res=compare(data_policies_get, result) + assert res == True + + # API: Create policy instance pi2 (copy of pi1) of type: 1. Shall fail + with open(TESTDATA+'pi1.json') as json_file: + policy_2 = json.load(json_file) + response=client.put(SERVER_URL+'a1-p/policytypes/1/policies/pi2', headers=header, data=json.dumps(policy_2)) + assert response.status_code == 400 + + # Set force response code 401 + response=client.post(SERVER_URL+'forceresponse?code=401') + assert response.status_code == 200 + + # API: Get policy type 1. Shall fail with forced code + response=client.get(SERVER_URL+'a1-p/policytypes/1') + assert response.status_code == 401 + + # API: Get policy status + policy_status = { + "instance_status" : "NOT IN EFFECT", + "has_been_deleted" : "false", + "created_at" : "????" + } + response=client.get(SERVER_URL+'a1-p/policytypes/1/policies/pi1/status') + assert response.status_code == 200 + result=json.loads(response.data) + res=compare(policy_status, result) + assert res == True + + # Load a policy type: 2 + with open(TESTDATA+'pt2.json') as json_file: + policytype_2 = json.load(json_file) + response=client.put(SERVER_URL+'policytype?id=2', headers=header, data=json.dumps(policytype_2)) + assert response.status_code == 201 + assert response.data == b"Policy type 2 is OK." + + # Load a policy type: 2, again + with open(TESTDATA+'pt2.json') as json_file: + policytype_2 = json.load(json_file) + response=client.put(SERVER_URL+'policytype?id=2', headers=header, data=json.dumps(policytype_2)) + assert response.status_code == 200 + assert response.data == b"Policy type 2 is OK." + + # API: Get policy type ids, shall contain '1' and '2' + data_policytypes_get = [ 1,2 ] + response=client.get(SERVER_URL+'a1-p/policytypes') + assert response.status_code == 200 + result=json.loads(response.data) + res=compare(data_policytypes_get, result) + assert res == True + + # Get policy type ids, shall contain type 1 and 2 ==" + data_policytypes_get = [ "1","2" ] + response=client.get(SERVER_URL+'policytypes') + assert response.status_code == 200 + result=json.loads(response.data) + res=compare(data_policytypes_get, result) + assert res == True + + # API: Get policy type 2 + with open(TESTDATA+'pt2.json') as json_file: + policytype_2 = json.load(json_file) + response=client.get(SERVER_URL+'a1-p/policytypes/2') + assert response.status_code == 200 + result=json.loads(response.data) + res=compare(policytype_2, result) + assert res == True + + # Delete a policy type + response=client.delete(SERVER_URL+'policytype?id=2') + assert response.status_code == 204 + + # API: Get policy type ids, shall contain '1' + data_policytypes_get = [ 1] + response=client.get(SERVER_URL+'a1-p/policytypes') + assert response.status_code == 200 + result=json.loads(response.data) + res=compare(data_policytypes_get, result) + assert res == True + + # Load a policy type: 2 + with open(TESTDATA+'pt2.json') as json_file: + policytype_2 = json.load(json_file) + response=client.put(SERVER_URL+'policytype?id=2', headers=header, data=json.dumps(policytype_2)) + assert response.status_code == 201 + assert response.data == b"Policy type 2 is OK." + + # API: Get policy type 2 + with open(TESTDATA+'pt2.json') as json_file: + policytype_2 = json.load(json_file) + response=client.get(SERVER_URL+'a1-p/policytypes/2') + assert response.status_code == 200 + result=json.loads(response.data) + res=compare(policytype_2, result) + assert res == True + + # API: Get instances for type 2, shall be empty + data_policies_get = [ ] + response=client.get(SERVER_URL+'a1-p/policytypes/2/policies') + assert response.status_code == 200 + result=json.loads(response.data) + res=compare(data_policies_get, result) + assert res == True + + # API: Create policy instance pi1 of type: 2, shall fail + with open(TESTDATA+'pi1.json') as json_file: + policy_1 = json.load(json_file) + response=client.put(SERVER_URL+'a1-p/policytypes/2/policies/pi1', headers=header, data=json.dumps(policy_1)) + assert response.status_code == 400 + + # API: Create policy instance pi2 of type: 2. Missing param, shall fail + with open(TESTDATA+'pi2_missing_param.json') as json_file: + policy_2 = json.load(json_file) + response=client.put(SERVER_URL+'a1-p/policytypes/2/policies/pi1', headers=header, data=json.dumps(policy_2)) + assert response.status_code == 400 + + # API: Create policy instance pi2 of type: 2 + with open(TESTDATA+'pi2.json') as json_file: + policy_2 = json.load(json_file) + response=client.put(SERVER_URL+'a1-p/policytypes/2/policies/pi2', headers=header, data=json.dumps(policy_2)) + assert response.status_code == 202 + + with open(TESTDATA+'pi2.json') as json_file: + policy_2 = json.load(json_file) + response=client.put(SERVER_URL+'a1-p/policytypes/2/policies/pi2', headers=header, data=json.dumps(policy_2)) + assert response.status_code == 202 + + # API: Get instances for type 1, shall contain pi1 + data_policies_get = [ "pi1" ] + response=client.get(SERVER_URL+'a1-p/policytypes/1/policies') + assert response.status_code == 200 + result=json.loads(response.data) + res=compare(data_policies_get, result) + assert res == True + + # API: Get instances for type 2, shall contain pi2 + data_policies_get = ["pi2" ] + response=client.get(SERVER_URL+'a1-p/policytypes/2/policies') + assert response.status_code == 200 + result=json.loads(response.data) + res=compare(data_policies_get, result) + assert res == True + + # API: Create policy instance pi11 (copy of pi1) of type: 1. Shall fail + with open(TESTDATA+'pi1.json') as json_file: + policy_1 = json.load(json_file) + response=client.put(SERVER_URL+'a1-p/policytypes/1/policies/pi11', headers=header, data=json.dumps(policy_1)) + assert response.status_code == 400 + + # Set force response code 409. ===" + response=client.post(SERVER_URL+'forceresponse?code=401') + assert response.status_code == 200 + + # API: Get policy status for pi1, shall fail + response=client.get(SERVER_URL+'a1-p/policytypes/1/policies/pi1/status') + assert response.status_code == 401 + + # Set force delay 10 + response=client.post(SERVER_URL+'forcedelay?delay=10') + assert response.status_code == 200 + assert response.data == b"Force delay: 10 sec set for all A1 responses" + + # API: Get policy status for pi1. Shall delay 10 sec + policy_status = { + "instance_status" : "NOT IN EFFECT", + "has_been_deleted" : "false", + "created_at" : "????" + } + response=client.get(SERVER_URL+'a1-p/policytypes/1/policies/pi1/status') + assert response.status_code == 200 + result=json.loads(response.data) + res=compare(policy_status, result) + assert res == True + + # Reset force delay + response=client.post(SERVER_URL+'forcedelay') + assert response.status_code == 200 + assert response.data == b"Force delay: None sec set for all A1 responses" + + # Set status for pi1 + response=client.put(SERVER_URL+'status?policyid=pi1&status=IN%20EFFECT') + assert response.status_code == 200 + + # API: Get policy status for pi1 + policy_status = { + "instance_status" : "IN EFFECT", + "has_been_deleted" : "false", + "created_at" : "????" + } + response=client.get(SERVER_URL+'a1-p/policytypes/1/policies/pi1/status') + assert response.status_code == 200 + result=json.loads(response.data) + res=compare(policy_status, result) + assert res == True + + # Set status for pi1 + response=client.put(SERVER_URL+'status?policyid=pi1&status=IN%20EFFECT&deleted=true&created_at=2020-03-30%2012:00:00') + assert response.status_code == 200 + + # API: Get policy status for pi1 + policy_status = { + "instance_status" : "IN EFFECT", + "has_been_deleted" : "true", + "created_at" : "????" + } + response=client.get(SERVER_URL+'a1-p/policytypes/1/policies/pi1/status') + assert response.status_code == 200 + result=json.loads(response.data) + res=compare(policy_status, result) + assert res == True + + # Get counter: intstance + response=client.get(SERVER_URL+'counter/num_instances') + assert response.status_code == 200 + assert response.data == b"2" + + # Get counter: types (shall be 2) + response=client.get(SERVER_URL+'counter/num_types') + assert response.status_code == 200 + assert response.data == b"2" + + # Get counter: interface + response=client.get(SERVER_URL+'counter/interface') + assert response.status_code == 200 + assert response.data == b"OSC_2.1.0" + + # Get counter: remote hosts + response=client.get(SERVER_URL+'counter/remote_hosts') + assert response.status_code == 200 + + # Get counter: test, shall fail + response=client.get(SERVER_URL+'counter/test') + assert response.status_code == 404 + + # API: DELETE policy instance pi1 + response=client.delete(SERVER_URL+'a1-p/policytypes/1/policies/pi1') + assert response.status_code == 202 + + # API: Get instances for type 1, shall be empty + data_policies_get = [ ] + response=client.get(SERVER_URL+'a1-p/policytypes/1/policies') + assert response.status_code == 200 + result=json.loads(response.data) + res=compare(data_policies_get, result) + assert res == True + + # API: Get instances for type 2, shall contain pi2 + data_policies_get = ["pi2" ] + response=client.get(SERVER_URL+'a1-p/policytypes/2/policies') + assert response.status_code == 200 + result=json.loads(response.data) + res=compare(data_policies_get, result) + assert res == True + + # Get counter: instances + response=client.get(SERVER_URL+'counter/num_instances') + assert response.status_code == 200 + assert response.data == b"1" + + + ### Tests to increase code coverage + + # Set force response code 500 + response=client.post(SERVER_URL+'forceresponse?code=500') + assert response.status_code == 200 + + # API: Healthcheck + response=client.get(SERVER_URL+'a1-p/healthcheck') + assert response.status_code == 500 + + # Set force response code 501 + response=client.post(SERVER_URL+'forceresponse?code=501') + assert response.status_code == 200 + + # API: Get policy types + data_policytypes_get = [ ] + response=client.get(SERVER_URL+'a1-p/policytypes') + assert response.status_code == 501 + + # Set force response code 502 + response=client.post(SERVER_URL+'forceresponse?code=502') + assert response.status_code == 200 + + # API: Delete a policy type, shall fail + response=client.delete(SERVER_URL+'a1-p/policytypes/55') + assert response.status_code == 502 + + # Set force response code 503. ===" + response=client.post(SERVER_URL+'forceresponse?code=503') + assert response.status_code == 200 + + with open(TESTDATA+'pi1.json') as json_file: + policy_1 = json.load(json_file) + response=client.put(SERVER_URL+'a1-p/policytypes/1/policies/pi11', headers=header, data=json.dumps(policy_1)) + assert response.status_code == 503 + + # Set force response code 504 + response=client.post(SERVER_URL+'forceresponse?code=504') + assert response.status_code == 200 + + # API: Get instances for type 1, shall fail + data_policies_get = [ ] + response=client.get(SERVER_URL+'a1-p/policytypes/1/policies') + assert response.status_code == 504 + + # Set force response code 505. ===" + response=client.post(SERVER_URL+'forceresponse?code=505') + assert response.status_code == 200 + + # API: delete instance pi1, shall fail + response=client.delete(SERVER_URL+'a1-p/policytypes/1/policies/pi1') + assert response.status_code == 505 + + # API: Delete a policy type having instances, shall fail + response=client.delete(SERVER_URL+'a1-p/policytypes/2') + assert response.status_code == 400 + + # API: delete instance pi1 in type 5, shall fail + response=client.delete(SERVER_URL+'a1-p/policytypes/5/policies/pi1') + assert response.status_code == 404 + + # API: delete instance pi99 in type 1, shall fail + response=client.delete(SERVER_URL+'a1-p/policytypes/1/policies/pi99') + assert response.status_code == 404 + + # API: Create policy instance pi80 of type: 5 + with open(TESTDATA+'pi1.json') as json_file: + policy_80 = json.load(json_file) + response=client.put(SERVER_URL+'a1-p/policytypes/5/policies/pi80', headers=header, data=json.dumps(policy_80)) + assert response.status_code == 404 + + # API: Get policy type + data_policytypes_get = [ ] + response=client.get(SERVER_URL+'a1-p/policytypes/55') + assert response.status_code == 404 + + # API: Get status, bad type - shall fail + response=client.get(SERVER_URL+'a1-p/policytypes/99/policies/pi1/status') + assert response.status_code == 404 + + # API: Get status, bad instance - shall fail + response=client.get(SERVER_URL+'a1-p/policytypes/1/policies/pi111/status') + assert response.status_code == 404 + + # Load policy type, no type in url - shall faill + with open(TESTDATA+'pt2.json') as json_file: + policytype_2 = json.load(json_file) + response=client.put(SERVER_URL+'policytype', headers=header, data=json.dumps(policytype_2)) + assert response.status_code == 400 + + # Load policy type - duplicatee - shall faill + with open(TESTDATA+'pt1.json') as json_file: + policytype_1 = json.load(json_file) + response=client.put(SERVER_URL+'policytype?id=2', headers=header, data=json.dumps(policytype_1)) + assert response.status_code == 400 diff --git a/near-rt-ric-simulator/tests/STD_1.1.3/test_std_1_1_3.py b/near-rt-ric-simulator/tests/STD_1.1.3/test_std_1_1_3.py index db0a6a2..fc166c9 100644 --- a/near-rt-ric-simulator/tests/STD_1.1.3/test_std_1_1_3.py +++ b/near-rt-ric-simulator/tests/STD_1.1.3/test_std_1_1_3.py @@ -17,11 +17,9 @@ # This test case test the STD_1.1.3 version of the simulator -from unittest import TestCase +import pytest import sys import os -import threading -import requests import json #Constants for the test case @@ -48,447 +46,440 @@ sys.path.append(os.path.abspath(cwd+'../../src/'+INTERFACE_VERSION)) os.chdir(cwd+"../../src/"+INTERFACE_VERSION) import main +from main import app from compare_json import compare -class TestCase1(TestCase): - from main import app - server = None - - def startServer(self): - self.app.run(port=PORT_NUMBER, host=HOST_IP, threaded=True) - - def setUp(self): - self.server=threading.Thread(target=self.startServer, args=()) - self.server.daemon = True - self.server.start() - - # executed after each test - def tearDown(self): - self.server.killed=True - - #from main import app - def test_apis(self): - - # header for json payload - header = { - "Content-Type" : "application/json" - } - - # Simulator hello world - response=requests.get(SERVER_URL) - self.assertEqual(response.status_code, 200) - - # Check used and implemented interfaces - response=requests.get(SERVER_URL+'container_interfaces') - self.assertEqual(response.status_code, 200) - self.assertEqual(response.text, "Current interface: STD_1.1.3 All supported A1 interface yamls in this container: ['OSC_2.1.0', 'STD_1.1.3']") - - # Reset simulator instances - response=requests.post(SERVER_URL+'deleteinstances') - self.assertEqual(response.status_code, 200) - - # Reset simulator, all - response=requests.post(SERVER_URL+'deleteall') - self.assertEqual(response.status_code, 200) - - # API: Get policy instances, shall be empty - data_policy_get = [ ] - response=requests.get(SERVER_URL+'A1-P/v1/policies') - self.assertEqual(response.status_code, 200) - result=json.loads(response.text) - res=compare(data_policy_get, result) - self.assertEqual(res, True) - - #API: Create policy instance pi1 - data_pi1 = { - "scope": { - "ueId": "ue1", - "groupId": "group1", - "sliceId": "slice1", - "qosId": "qos1", - "cellId": "cell1" - }, - "statement": { - "priorityLevel": 5 - } - } - response=requests.put(SERVER_URL+'A1-P/v1/policies/pi1', headers=header, data=json.dumps(data_pi1)) - self.assertEqual(response.status_code, 201) - result=json.loads(response.text) - res=compare(data_pi1, result) - self.assertEqual(res, True) - - #API: Update policy instance pi1 - data_pi1_updated = { - "scope": { - "ueId": "ue1", - "groupId": "group1", - "sliceId": "slice1", - "qosId": "qos1", - "cellId": "cell1" - }, - "statement": { - "priorityLevel": 6 - } - } - response=requests.put(SERVER_URL+'A1-P/v1/policies/pi1', headers=header, data=json.dumps(data_pi1_updated)) - self.assertEqual(response.status_code, 200) - result=json.loads(response.text) - res=compare(data_pi1_updated, result) - self.assertEqual(res, True) - - #API: Create policy instance pi9, bad json - response=requests.put(SERVER_URL+'A1-P/v1/policies/pi9', headers=header, data="test") - self.assertEqual(response.status_code, 400) - - # API: Get policy instances, shall contain pi1 - data_policy_get = [ "pi1" ] - response=requests.get(SERVER_URL+'A1-P/v1/policies') - self.assertEqual(response.status_code, 200) - result=json.loads(response.text) - res=compare(data_policy_get, result) - self.assertEqual(res, True) - - # API: Create policy instance pi2 (copy of pi1). Shall fail - data_create_errror_pi1 = { - "title" : "The policy json already exists.", - "status" : 400, - "instance" : "pi2" - } - response=requests.put(SERVER_URL+'A1-P/v1/policies/pi2', headers=header, data=json.dumps(data_pi1_updated)) - self.assertEqual(response.status_code, 400) - result=json.loads(response.text) - res=compare(data_create_errror_pi1, result) - self.assertEqual(res, True) - - # Set force response code 409. ===" - response=requests.post(SERVER_URL+'forceresponse?code=409') - self.assertEqual(response.status_code, 200) - - # API: Get policy instances, shall fail - data_get_errror = { - "title" : "Conflict", - "status" : 409, - "detail" : "Request could not be processed in the current state of the resource" - } - response=requests.get(SERVER_URL+'A1-P/v1/policies') - self.assertEqual(response.status_code, 409) - result=json.loads(response.text) - res=compare(data_get_errror, result) - self.assertEqual(res, True) - - # Reset force response - response=requests.post(SERVER_URL+'forceresponse') - self.assertEqual(response.status_code, 200) - self.assertEqual(response.text, "Force response code: None set for one single A1 response") - - ### - ### Repeating the above two test for code coverage - ### - # Set force response code 400 - response=requests.post(SERVER_URL+'forceresponse?code=400') - self.assertEqual(response.status_code, 200) - - #API: Create policy instance pi3, shall fail - data_pi3 = { - "scope": { - "ueId": "ue3" - }, - "statement": { - "priorityLevel": 5 - } - } - data_get_errror = { - "title" : "Bad request", - "status" : 400, - "detail" : "Object in payload not properly formulated or not related to the method" - } - response=requests.put(SERVER_URL+'A1-P/v1/policies/pi3', headers=header, data=json.dumps(data_pi3)) - self.assertEqual(response.status_code, 400) - result=json.loads(response.text) - res=compare(data_get_errror, result) - self.assertEqual(res, True) - - # Set force response code 404 - response=requests.post(SERVER_URL+'forceresponse?code=404') - self.assertEqual(response.status_code, 200) - - # API: Get policy instance pi1, shall fail ==" - data_get_errror = { - "title" : "Not found", - "status" : 404, - "detail" : "No resource found at the URI" - } - response=requests.get(SERVER_URL+'A1-P/v1/policies/pi1') - self.assertEqual(response.status_code, 404) - result=json.loads(response.text) - res=compare(data_get_errror, result) - self.assertEqual(res, True) - - # Set force response code 405 - response=requests.post(SERVER_URL+'forceresponse?code=405') - self.assertEqual(response.status_code, 200) - - # API: Delete policy instances pi1, shall fail ==" - data_get_errror = { - "title" : "Method not allowed", - "status" : 405, - "detail" : "Method not allowed for the URI" - } - response=requests.delete(SERVER_URL+'A1-P/v1/policies/pi1') - self.assertEqual(response.status_code, 405) - result=json.loads(response.text) - res=compare(data_get_errror, result) - self.assertEqual(res, True) - - # Set force response code 429 - response=requests.post(SERVER_URL+'forceresponse?code=429') - self.assertEqual(response.status_code, 200) - - # API: Get policy status pi3, shall fail ==" - data_get_errror = { - "title" : "Too many requests", - "status" : 429, - "detail" : "Too many requests have been sent in a given amount of time" - } - response=requests.get(SERVER_URL+'A1-P/v1/policies/pi3/status') - self.assertEqual(response.status_code, 429) - result=json.loads(response.text) - res=compare(data_get_errror, result) - self.assertEqual(res, True) - - # Set force response code 507 - response=requests.post(SERVER_URL+'forceresponse?code=507') - self.assertEqual(response.status_code, 200) - - # API: Get policy instances, shall fail - data_get_errror = { - "title" : "Insufficient storage", - "status" : 507, - "detail" : "The method could not be performed on the resource because the provider is unable to store the representation needed to successfully complete the request" - } - response=requests.get(SERVER_URL+'A1-P/v1/policies') - self.assertEqual(response.status_code, 507) - result=json.loads(response.text) - res=compare(data_get_errror, result) - self.assertEqual(res, True) - - # Set force response code 503. ===" - response=requests.post(SERVER_URL+'forceresponse?code=503') - self.assertEqual(response.status_code, 200) - - # API: Get policy instances, shall fail - data_get_errror = { - "title" : "Service unavailable", - "status" : 503, - "detail" : "The provider is currently unable to handle the request due to a temporary overload" - } - response=requests.get(SERVER_URL+'A1-P/v1/policies') - self.assertEqual(response.status_code, 503) - result=json.loads(response.text) - res=compare(data_get_errror, result) - self.assertEqual(res, True) - - # Set force response code 555. ===" - response=requests.post(SERVER_URL+'forceresponse?code=555') - self.assertEqual(response.status_code, 200) - - # API: Get policy instances, shall fail - data_get_errror = { - "title" : "Unknown", - "status" : "555", - "detail" : "Not implemented response code" - } - response=requests.get(SERVER_URL+'A1-P/v1/policies') - self.assertEqual(response.status_code, 555) - result=json.loads(response.text) - res=compare(data_get_errror, result) - self.assertEqual(res, True) - - ### - ### End of repeated test - ### - - - # API: Get policy status - data_policy_status = { - "enforceStatus" : "UNDEFINED" - } - response=requests.get(SERVER_URL+'A1-P/v1/policies/pi1/status') - self.assertEqual(response.status_code, 200) - result=json.loads(response.text) - res=compare(data_policy_status, result) - self.assertEqual(res, True) - - # API: Create policy instance pi2 - data_pi2 = { - "scope": { - "ueId": "ue2", - "groupId": "group2", - "sliceId": "slice2", - "qosId": "qos2", - "cellId": "cell2" - }, - "statement": { - "priorityLevel": 10 - } +@pytest.fixture +def client(): + with app.app.test_client() as c: + yield c + +def test_apis(client): + + # header for json payload + header = { + "Content-Type" : "application/json" + } + + # Simulator hello world + response=client.get(SERVER_URL) + assert response.status_code == 200 + + # Check used and implemented interfaces + response=client.get(SERVER_URL+'container_interfaces') + assert response.status_code == 200 + assert response.data == b"Current interface: STD_1.1.3 All supported A1 interface yamls in this container: ['OSC_2.1.0', 'STD_1.1.3']" + + # Reset simulator instances + response=client.post(SERVER_URL+'deleteinstances') + assert response.status_code == 200 + + # Reset simulator, all + response=client.post(SERVER_URL+'deleteall') + assert response.status_code == 200 + + # API: Get policy instances, shall be empty + data_policy_get = [ ] + response=client.get(SERVER_URL+'A1-P/v1/policies') + assert response.status_code == 200 + result=json.loads(response.data) + res=compare(data_policy_get, result) + assert res == True + + #API: Create policy instance pi1 + data_pi1 = { + "scope": { + "ueId": "ue1", + "groupId": "group1", + "sliceId": "slice1", + "qosId": "qos1", + "cellId": "cell1" + }, + "statement": { + "priorityLevel": 5 } - response=requests.put(SERVER_URL+'A1-P/v1/policies/pi2', headers=header, data=json.dumps(data_pi2)) - self.assertEqual(response.status_code, 201) - result=json.loads(response.text) - res=compare(data_pi2, result) - self.assertEqual(res, True) - - - # API: Update policy instance pi2 - # Reuse same policy data - response=requests.put(SERVER_URL+'A1-P/v1/policies/pi2?notificationDestination=http://'+HOST_IP+':'+PORT_NUMBER+'/statustest', headers=header, data=json.dumps(data_pi2)) - self.assertEqual(response.status_code, 200) - result=json.loads(response.text) - res=compare(data_pi2, result) - self.assertEqual(res, True) - - # API: Get policy instances, shall contain pi1 and pi2 - data_policy_get = [ "pi1", "pi2" ] - response=requests.get(SERVER_URL+'A1-P/v1/policies') - self.assertEqual(response.status_code, 200) - result=json.loads(response.text) - res=compare(data_policy_get, result) - self.assertEqual(res, True) - - # Set force delay 10 - response=requests.post(SERVER_URL+'forcedelay?delay=10') - self.assertEqual(response.status_code, 200) - self.assertEqual(response.text, "Force delay: 10 sec set for all A1 responses") - - # API: Get policy instances, shall contain pi1 and pi2 and delayed 10 sec - data_policy_get = [ "pi1", "pi2" ] - response=requests.get(SERVER_URL+'A1-P/v1/policies') - self.assertEqual(response.status_code, 200) - result=json.loads(response.text) - res=compare(data_policy_get, result) - self.assertEqual(res, True) - - # Reset force delay - response=requests.post(SERVER_URL+'forcedelay') - self.assertEqual(response.status_code, 200) - self.assertEqual(response.text, "Force delay: None sec set for all A1 responses") - - # API: GET policy instance pi1 - response=requests.get(SERVER_URL+'A1-P/v1/policies/pi1') - self.assertEqual(response.status_code, 200) - result=json.loads(response.text) - res=compare(data_pi1_updated, result) - self.assertEqual(res, True) - - # API: GET policy instance pi2 - response=requests.get(SERVER_URL+'A1-P/v1/policies/pi2') - self.assertEqual(response.status_code, 200) - result=json.loads(response.text) - res=compare(data_pi2, result) - self.assertEqual(res, True) - - # API: GET policy instance pi9, shall fail - response=requests.get(SERVER_URL+'A1-P/v1/policies/pi9') - self.assertEqual(response.status_code, 404) - - # API: DELETE policy instance pi1 - response=requests.delete(SERVER_URL+'A1-P/v1/policies/pi1') - self.assertEqual(response.status_code, 204) - - # API: DELETE policy instance pi9, shall fail - response=requests.delete(SERVER_URL+'A1-P/v1/policies/pi9') - self.assertEqual(response.status_code, 404) - - # API: Get policy status for pi1, shall fail - data_get_errror = { - "title" : "The policy identity does not exist.", - "status" : 404, - "detail" : "There is no existing policy instance with the identity: pi1", - "instance" : "pi1" + } + response=client.put(SERVER_URL+'A1-P/v1/policies/pi1', headers=header, data=json.dumps(data_pi1)) + assert response.status_code == 201 + result=json.loads(response.data) + res=compare(data_pi1, result) + assert res == True + + #API: Update policy instance pi1 + data_pi1_updated = { + "scope": { + "ueId": "ue1", + "groupId": "group1", + "sliceId": "slice1", + "qosId": "qos1", + "cellId": "cell1" + }, + "statement": { + "priorityLevel": 6 } - response=requests.get(SERVER_URL+'A1-P/v1/policies/pi1/status') - self.assertEqual(response.status_code, 404) - result=json.loads(response.text) - res=compare(data_get_errror, result) - self.assertEqual(res, True) - - # Set status for policy instance pi2 - response=requests.put(SERVER_URL+'status?policyid=pi2&status=OK') - self.assertEqual(response.status_code, 200) - - # API: Get policy status for pi2 - data_get_status = { - "enforceStatus" : "OK" + } + response=client.put(SERVER_URL+'A1-P/v1/policies/pi1', headers=header, data=json.dumps(data_pi1_updated)) + assert response.status_code == 200 + result=json.loads(response.data) + res=compare(data_pi1_updated, result) + assert res == True + + #API: Create policy instance pi9, bad json + response=client.put(SERVER_URL+'A1-P/v1/policies/pi9', headers=header, data="test") + assert response.status_code == 400 + + # API: Get policy instances, shall contain pi1 + data_policy_get = [ "pi1" ] + response=client.get(SERVER_URL+'A1-P/v1/policies') + assert response.status_code == 200 + result=json.loads(response.data) + res=compare(data_policy_get, result) + assert res == True + + # API: Create policy instance pi2 (copy of pi1). Shall fail + data_create_errror_pi1 = { + "title" : "The policy json already exists.", + "status" : 400, + "instance" : "pi2" + } + response=client.put(SERVER_URL+'A1-P/v1/policies/pi2', headers=header, data=json.dumps(data_pi1_updated)) + assert response.status_code == 400 + result=json.loads(response.data) + res=compare(data_create_errror_pi1, result) + assert res == True + + # Set force response code 409. ===" + response=client.post(SERVER_URL+'forceresponse?code=409') + assert response.status_code == 200 + + # API: Get policy instances, shall fail + data_get_errror = { + "title" : "Conflict", + "status" : 409, + "detail" : "Request could not be processed in the current state of the resource" + } + response=client.get(SERVER_URL+'A1-P/v1/policies') + assert response.status_code == 409 + result=json.loads(response.data) + res=compare(data_get_errror, result) + assert res == True + + # Reset force response + response=client.post(SERVER_URL+'forceresponse') + assert response.status_code == 200 + assert response.data == b"Force response code: None set for one single A1 response" + + ### + ### Repeating the above two test for code coverage + ### + # Set force response code 400 + response=client.post(SERVER_URL+'forceresponse?code=400') + assert response.status_code == 200 + + #API: Create policy instance pi3, shall fail + data_pi3 = { + "scope": { + "ueId": "ue3" + }, + "statement": { + "priorityLevel": 5 } - response=requests.get(SERVER_URL+'A1-P/v1/policies/pi2/status') - self.assertEqual(response.status_code, 200) - result=json.loads(response.text) - res=compare(data_get_status, result) - self.assertEqual(res, True) - - # Set status for policy instance pi2 - response=requests.put(SERVER_URL+'status?policyid=pi2&status=NOTOK&reason=notok_reason') - self.assertEqual(response.status_code, 200) - - # API: Get policy status for pi2 - data_get_status = { - "enforceStatus" : "NOTOK", - "enforceReason" : "notok_reason" + } + data_get_errror = { + "title" : "Bad request", + "status" : 400, + "detail" : "Object in payload not properly formulated or not related to the method" + } + response=client.put(SERVER_URL+'A1-P/v1/policies/pi3', headers=header, data=json.dumps(data_pi3)) + assert response.status_code == 400 + result=json.loads(response.data) + res=compare(data_get_errror, result) + assert res == True + + # Set force response code 404 + response=client.post(SERVER_URL+'forceresponse?code=404') + assert response.status_code == 200 + + # API: Get policy instance pi1, shall fail ==" + data_get_errror = { + "title" : "Not found", + "status" : 404, + "detail" : "No resource found at the URI" + } + response=client.get(SERVER_URL+'A1-P/v1/policies/pi1') + assert response.status_code == 404 + result=json.loads(response.data) + res=compare(data_get_errror, result) + assert res == True + + # Set force response code 405 + response=client.post(SERVER_URL+'forceresponse?code=405') + assert response.status_code == 200 + + # API: Delete policy instances pi1, shall fail ==" + data_get_errror = { + "title" : "Method not allowed", + "status" : 405, + "detail" : "Method not allowed for the URI" + } + response=client.delete(SERVER_URL+'A1-P/v1/policies/pi1') + assert response.status_code == 405 + result=json.loads(response.data) + res=compare(data_get_errror, result) + assert res == True + + # Set force response code 429 + response=client.post(SERVER_URL+'forceresponse?code=429') + assert response.status_code == 200 + + # API: Get policy status pi3, shall fail ==" + data_get_errror = { + "title" : "Too many requests", + "status" : 429, + "detail" : "Too many requests have been sent in a given amount of time" + } + response=client.get(SERVER_URL+'A1-P/v1/policies/pi3/status') + assert response.status_code == 429 + result=json.loads(response.data) + res=compare(data_get_errror, result) + assert res == True + + # Set force response code 507 + response=client.post(SERVER_URL+'forceresponse?code=507') + assert response.status_code == 200 + + # API: Get policy instances, shall fail + data_get_errror = { + "title" : "Insufficient storage", + "status" : 507, + "detail" : "The method could not be performed on the resource because the provider is unable to store the representation needed to successfully complete the request" + } + response=client.get(SERVER_URL+'A1-P/v1/policies') + assert response.status_code == 507 + result=json.loads(response.data) + res=compare(data_get_errror, result) + assert res == True + + # Set force response code 503. ===" + response=client.post(SERVER_URL+'forceresponse?code=503') + assert response.status_code == 200 + + # API: Get policy instances, shall fail + data_get_errror = { + "title" : "Service unavailable", + "status" : 503, + "detail" : "The provider is currently unable to handle the request due to a temporary overload" + } + response=client.get(SERVER_URL+'A1-P/v1/policies') + assert response.status_code == 503 + result=json.loads(response.data) + res=compare(data_get_errror, result) + assert res == True + + # Set force response code 555. ===" + response=client.post(SERVER_URL+'forceresponse?code=555') + assert response.status_code == 200 + + # API: Get policy instances, shall fail + data_get_errror = { + "title" : "Unknown", + "status" : "555", + "detail" : "Not implemented response code" + } + response=client.get(SERVER_URL+'A1-P/v1/policies') + assert response.status_code == 555 + result=json.loads(response.data) + res=compare(data_get_errror, result) + assert res == True + + ### + ### End of repeated test + ### + + + # API: Get policy status + data_policy_status = { + "enforceStatus" : "UNDEFINED" + } + response=client.get(SERVER_URL+'A1-P/v1/policies/pi1/status') + assert response.status_code == 200 + result=json.loads(response.data) + res=compare(data_policy_status, result) + assert res == True + + # API: Create policy instance pi2 + data_pi2 = { + "scope": { + "ueId": "ue2", + "groupId": "group2", + "sliceId": "slice2", + "qosId": "qos2", + "cellId": "cell2" + }, + "statement": { + "priorityLevel": 10 } - response=requests.get(SERVER_URL+'A1-P/v1/policies/pi2/status') - self.assertEqual(response.status_code, 200) - result=json.loads(response.text) - res=compare(data_get_status, result) - self.assertEqual(res, True) - - # Send status for pi2 - response=requests.post(SERVER_URL+'sendstatus?policyid=pi2') - self.assertEqual(response.status_code, 200) - result=json.loads(response.text) - res=compare(data_get_status, result) - self.assertEqual(res, True) - - # Send status, shall fail - response=requests.post(SERVER_URL+'sendstatus') - self.assertEqual(response.status_code, 400) - - # Send status pi9, shall fail - response=requests.post(SERVER_URL+'sendstatus?policyid=pi9') - self.assertEqual(response.status_code, 404) - - # Set status for policy instance , shall fail - response=requests.put(SERVER_URL+'status') - self.assertEqual(response.status_code, 400) - - # Set status for policy instance pi9, shall fail - response=requests.put(SERVER_URL+'status?policyid=pi9') - self.assertEqual(response.status_code, 404) - - # Set status for policy instance pi2, shall fail - response=requests.put(SERVER_URL+'status?policyid=pi2') - self.assertEqual(response.status_code, 400) - - - # Get counter: intstance - response=requests.get(SERVER_URL+'counter/num_instances') - self.assertEqual(response.status_code, 200) - self.assertEqual(response.text, "1") - - # Get counter: types (shall be 0) - response=requests.get(SERVER_URL+'counter/num_types') - self.assertEqual(response.status_code, 200) - self.assertEqual(response.text, "0") - - # Get counter: interface - response=requests.get(SERVER_URL+'counter/interface') - self.assertEqual(response.status_code, 200) - self.assertEqual(response.text, "STD_1.1.3") - - # Get counter: remote hosts - response=requests.get(SERVER_URL+'counter/remote_hosts') - self.assertEqual(response.status_code, 200) - - # Get counter: test, shall fail - response=requests.get(SERVER_URL+'counter/test') - self.assertEqual(response.status_code, 404) + } + response=client.put(SERVER_URL+'A1-P/v1/policies/pi2', headers=header, data=json.dumps(data_pi2)) + assert response.status_code == 201 + result=json.loads(response.data) + res=compare(data_pi2, result) + assert res == True + + + # API: Update policy instance pi2 + # Reuse same policy data + response=client.put(SERVER_URL+'A1-P/v1/policies/pi2?notificationDestination=http://'+HOST_IP+':'+PORT_NUMBER+'/statustest', headers=header, data=json.dumps(data_pi2)) + assert response.status_code == 200 + result=json.loads(response.data) + res=compare(data_pi2, result) + assert res == True + + # API: Get policy instances, shall contain pi1 and pi2 + data_policy_get = [ "pi1", "pi2" ] + response=client.get(SERVER_URL+'A1-P/v1/policies') + assert response.status_code == 200 + result=json.loads(response.data) + res=compare(data_policy_get, result) + assert res == True + + # Set force delay 10 + response=client.post(SERVER_URL+'forcedelay?delay=10') + assert response.status_code == 200 + assert response.data == b"Force delay: 10 sec set for all A1 responses" + + # API: Get policy instances, shall contain pi1 and pi2 and delayed 10 sec + data_policy_get = [ "pi1", "pi2" ] + response=client.get(SERVER_URL+'A1-P/v1/policies') + assert response.status_code == 200 + result=json.loads(response.data) + res=compare(data_policy_get, result) + assert res == True + + # Reset force delay + response=client.post(SERVER_URL+'forcedelay') + assert response.status_code == 200 + assert response.data == b"Force delay: None sec set for all A1 responses" + + # API: GET policy instance pi1 + response=client.get(SERVER_URL+'A1-P/v1/policies/pi1') + assert response.status_code == 200 + result=json.loads(response.data) + res=compare(data_pi1_updated, result) + assert res == True + + # API: GET policy instance pi2 + response=client.get(SERVER_URL+'A1-P/v1/policies/pi2') + assert response.status_code == 200 + result=json.loads(response.data) + res=compare(data_pi2, result) + assert res == True + + # API: GET policy instance pi9, shall fail + response=client.get(SERVER_URL+'A1-P/v1/policies/pi9') + assert response.status_code == 404 + + # API: DELETE policy instance pi1 + response=client.delete(SERVER_URL+'A1-P/v1/policies/pi1') + assert response.status_code == 204 + + # API: DELETE policy instance pi9, shall fail + response=client.delete(SERVER_URL+'A1-P/v1/policies/pi9') + assert response.status_code == 404 + + # API: Get policy status for pi1, shall fail + data_get_errror = { + "title" : "The policy identity does not exist.", + "status" : 404, + "detail" : "There is no existing policy instance with the identity: pi1", + "instance" : "pi1" + } + response=client.get(SERVER_URL+'A1-P/v1/policies/pi1/status') + assert response.status_code == 404 + result=json.loads(response.data) + res=compare(data_get_errror, result) + assert res == True + + # Set status for policy instance pi2 + response=client.put(SERVER_URL+'status?policyid=pi2&status=OK') + assert response.status_code == 200 + + # API: Get policy status for pi2 + data_get_status = { + "enforceStatus" : "OK" + } + response=client.get(SERVER_URL+'A1-P/v1/policies/pi2/status') + assert response.status_code == 200 + result=json.loads(response.data) + res=compare(data_get_status, result) + assert res == True + + # Set status for policy instance pi2 + response=client.put(SERVER_URL+'status?policyid=pi2&status=NOTOK&reason=notok_reason') + assert response.status_code == 200 + + # API: Get policy status for pi2 + data_get_status = { + "enforceStatus" : "NOTOK", + "enforceReason" : "notok_reason" + } + response=client.get(SERVER_URL+'A1-P/v1/policies/pi2/status') + assert response.status_code == 200 + result=json.loads(response.data) + res=compare(data_get_status, result) + assert res == True + + #Found no way to test these functions + #'sendstatus' will send a http request that will fail + #since no server will receive the call + #These function is instead tested when running the bash script in the 'test' dir + # # Send status for pi2 + # response=client.post(SERVER_URL+'sendstatus?policyid=pi2') + # assert response.status_code == 200 + # result=json.loads(response.data) + # res=compare(data_get_status, result) + # assert res == True + + # # Send status, shall fail + # response=client.post(SERVER_URL+'sendstatus') + # assert response.status_code == 400 + + # # Send status pi9, shall fail + # response=client.post(SERVER_URL+'sendstatus?policyid=pi9') + # assert response.status_code == 404 + + # Set status for policy instance , shall fail + response=client.put(SERVER_URL+'status') + assert response.status_code == 400 + + # Set status for policy instance pi9, shall fail + response=client.put(SERVER_URL+'status?policyid=pi9') + assert response.status_code == 404 + + # Set status for policy instance pi2, shall fail + response=client.put(SERVER_URL+'status?policyid=pi2') + assert response.status_code == 400 + + + # Get counter: intstance + response=client.get(SERVER_URL+'counter/num_instances') + assert response.status_code == 200 + assert response.data == b"1" + + # Get counter: types (shall be 0) + response=client.get(SERVER_URL+'counter/num_types') + assert response.status_code == 200 + assert response.data == b"0" + + # Get counter: interface + response=client.get(SERVER_URL+'counter/interface') + assert response.status_code == 200 + assert response.data == b"STD_1.1.3" + + # Get counter: remote hosts + response=client.get(SERVER_URL+'counter/remote_hosts') + assert response.status_code == 200 + + # Get counter: test, shall fail + response=client.get(SERVER_URL+'counter/test') + assert response.status_code == 404 diff --git a/tox.ini b/tox.ini index 9936af4..ba5b8d2 100644 --- a/tox.ini +++ b/tox.ini @@ -26,7 +26,6 @@ deps= pytest coverage pytest-cov - requests connexion setenv = -- 2.16.6