-class TestCase1(TestCase):
- from main import app
- server = None
-
- def startServer(self):
- self.app.run(port=PORT_NUMBER, host=HOST_IP, threaded=True)
-
- def setUp(self):
- self.server=threading.Thread(target=self.startServer, args=())
- self.server.daemon = True
- self.server.start()
-
- # executed after each test
- def tearDown(self):
- self.server.killed=True
-
- #from main import app
- def test_apis(self):
-
- # header for json payload
- header = {
- "Content-Type" : "application/json"
- }
-
- # Simulator hello world
- response=requests.get(SERVER_URL)
- self.assertEqual(response.status_code, 200)
-
- # Check used and implemented interfaces
- response=requests.get(SERVER_URL+'container_interfaces')
- self.assertEqual(response.status_code, 200)
- self.assertEqual(response.text, "Current interface: STD_1.1.3 All supported A1 interface yamls in this container: ['OSC_2.1.0', 'STD_1.1.3']")
-
- # Reset simulator instances
- response=requests.post(SERVER_URL+'deleteinstances')
- self.assertEqual(response.status_code, 200)
-
- # Reset simulator, all
- response=requests.post(SERVER_URL+'deleteall')
- self.assertEqual(response.status_code, 200)
-
- # API: Get policy instances, shall be empty
- data_policy_get = [ ]
- response=requests.get(SERVER_URL+'A1-P/v1/policies')
- self.assertEqual(response.status_code, 200)
- result=json.loads(response.text)
- res=compare(data_policy_get, result)
- self.assertEqual(res, True)
-
- #API: Create policy instance pi1
- data_pi1 = {
- "scope": {
- "ueId": "ue1",
- "groupId": "group1",
- "sliceId": "slice1",
- "qosId": "qos1",
- "cellId": "cell1"
- },
- "statement": {
- "priorityLevel": 5
- }
- }
- response=requests.put(SERVER_URL+'A1-P/v1/policies/pi1', headers=header, data=json.dumps(data_pi1))
- self.assertEqual(response.status_code, 201)
- result=json.loads(response.text)
- res=compare(data_pi1, result)
- self.assertEqual(res, True)
-
- #API: Update policy instance pi1
- data_pi1_updated = {
- "scope": {
- "ueId": "ue1",
- "groupId": "group1",
- "sliceId": "slice1",
- "qosId": "qos1",
- "cellId": "cell1"
- },
- "statement": {
- "priorityLevel": 6
- }
- }
- response=requests.put(SERVER_URL+'A1-P/v1/policies/pi1', headers=header, data=json.dumps(data_pi1_updated))
- self.assertEqual(response.status_code, 200)
- result=json.loads(response.text)
- res=compare(data_pi1_updated, result)
- self.assertEqual(res, True)
-
- #API: Create policy instance pi9, bad json
- response=requests.put(SERVER_URL+'A1-P/v1/policies/pi9', headers=header, data="test")
- self.assertEqual(response.status_code, 400)
-
- # API: Get policy instances, shall contain pi1
- data_policy_get = [ "pi1" ]
- response=requests.get(SERVER_URL+'A1-P/v1/policies')
- self.assertEqual(response.status_code, 200)
- result=json.loads(response.text)
- res=compare(data_policy_get, result)
- self.assertEqual(res, True)
-
- # API: Create policy instance pi2 (copy of pi1). Shall fail
- data_create_errror_pi1 = {
- "title" : "The policy json already exists.",
- "status" : 400,
- "instance" : "pi2"
- }
- response=requests.put(SERVER_URL+'A1-P/v1/policies/pi2', headers=header, data=json.dumps(data_pi1_updated))
- self.assertEqual(response.status_code, 400)
- result=json.loads(response.text)
- res=compare(data_create_errror_pi1, result)
- self.assertEqual(res, True)
-
- # Set force response code 409. ==="
- response=requests.post(SERVER_URL+'forceresponse?code=409')
- self.assertEqual(response.status_code, 200)
-
- # API: Get policy instances, shall fail
- data_get_errror = {
- "title" : "Conflict",
- "status" : 409,
- "detail" : "Request could not be processed in the current state of the resource"
- }
- response=requests.get(SERVER_URL+'A1-P/v1/policies')
- self.assertEqual(response.status_code, 409)
- result=json.loads(response.text)
- res=compare(data_get_errror, result)
- self.assertEqual(res, True)
-
- # Reset force response
- response=requests.post(SERVER_URL+'forceresponse')
- self.assertEqual(response.status_code, 200)
- self.assertEqual(response.text, "Force response code: None set for one single A1 response")
-
- ###
- ### Repeating the above two test for code coverage
- ###
- # Set force response code 400
- response=requests.post(SERVER_URL+'forceresponse?code=400')
- self.assertEqual(response.status_code, 200)
-
- #API: Create policy instance pi3, shall fail
- data_pi3 = {
- "scope": {
- "ueId": "ue3"
- },
- "statement": {
- "priorityLevel": 5
- }
- }
- data_get_errror = {
- "title" : "Bad request",
- "status" : 400,
- "detail" : "Object in payload not properly formulated or not related to the method"
- }
- response=requests.put(SERVER_URL+'A1-P/v1/policies/pi3', headers=header, data=json.dumps(data_pi3))
- self.assertEqual(response.status_code, 400)
- result=json.loads(response.text)
- res=compare(data_get_errror, result)
- self.assertEqual(res, True)
-
- # Set force response code 404
- response=requests.post(SERVER_URL+'forceresponse?code=404')
- self.assertEqual(response.status_code, 200)
-
- # API: Get policy instance pi1, shall fail =="
- data_get_errror = {
- "title" : "Not found",
- "status" : 404,
- "detail" : "No resource found at the URI"
- }
- response=requests.get(SERVER_URL+'A1-P/v1/policies/pi1')
- self.assertEqual(response.status_code, 404)
- result=json.loads(response.text)
- res=compare(data_get_errror, result)
- self.assertEqual(res, True)
-
- # Set force response code 405
- response=requests.post(SERVER_URL+'forceresponse?code=405')
- self.assertEqual(response.status_code, 200)
-
- # API: Delete policy instances pi1, shall fail =="
- data_get_errror = {
- "title" : "Method not allowed",
- "status" : 405,
- "detail" : "Method not allowed for the URI"
- }
- response=requests.delete(SERVER_URL+'A1-P/v1/policies/pi1')
- self.assertEqual(response.status_code, 405)
- result=json.loads(response.text)
- res=compare(data_get_errror, result)
- self.assertEqual(res, True)
-
- # Set force response code 429
- response=requests.post(SERVER_URL+'forceresponse?code=429')
- self.assertEqual(response.status_code, 200)
-
- # API: Get policy status pi3, shall fail =="
- data_get_errror = {
- "title" : "Too many requests",
- "status" : 429,
- "detail" : "Too many requests have been sent in a given amount of time"
- }
- response=requests.get(SERVER_URL+'A1-P/v1/policies/pi3/status')
- self.assertEqual(response.status_code, 429)
- result=json.loads(response.text)
- res=compare(data_get_errror, result)
- self.assertEqual(res, True)
-
- # Set force response code 507
- response=requests.post(SERVER_URL+'forceresponse?code=507')
- self.assertEqual(response.status_code, 200)
-
- # API: Get policy instances, shall fail
- data_get_errror = {
- "title" : "Insufficient storage",
- "status" : 507,
- "detail" : "The method could not be performed on the resource because the provider is unable to store the representation needed to successfully complete the request"
- }
- response=requests.get(SERVER_URL+'A1-P/v1/policies')
- self.assertEqual(response.status_code, 507)
- result=json.loads(response.text)
- res=compare(data_get_errror, result)
- self.assertEqual(res, True)
-
- # Set force response code 503. ==="
- response=requests.post(SERVER_URL+'forceresponse?code=503')
- self.assertEqual(response.status_code, 200)
-
- # API: Get policy instances, shall fail
- data_get_errror = {
- "title" : "Service unavailable",
- "status" : 503,
- "detail" : "The provider is currently unable to handle the request due to a temporary overload"
- }
- response=requests.get(SERVER_URL+'A1-P/v1/policies')
- self.assertEqual(response.status_code, 503)
- result=json.loads(response.text)
- res=compare(data_get_errror, result)
- self.assertEqual(res, True)
-
- # Set force response code 555. ==="
- response=requests.post(SERVER_URL+'forceresponse?code=555')
- self.assertEqual(response.status_code, 200)
-
- # API: Get policy instances, shall fail
- data_get_errror = {
- "title" : "Unknown",
- "status" : "555",
- "detail" : "Not implemented response code"
- }
- response=requests.get(SERVER_URL+'A1-P/v1/policies')
- self.assertEqual(response.status_code, 555)
- result=json.loads(response.text)
- res=compare(data_get_errror, result)
- self.assertEqual(res, True)
-
- ###
- ### End of repeated test
- ###
-
-
- # API: Get policy status
- data_policy_status = {
- "enforceStatus" : "UNDEFINED"
- }
- response=requests.get(SERVER_URL+'A1-P/v1/policies/pi1/status')
- self.assertEqual(response.status_code, 200)
- result=json.loads(response.text)
- res=compare(data_policy_status, result)
- self.assertEqual(res, True)
-
- # API: Create policy instance pi2
- data_pi2 = {
- "scope": {
- "ueId": "ue2",
- "groupId": "group2",
- "sliceId": "slice2",
- "qosId": "qos2",
- "cellId": "cell2"
- },
- "statement": {
- "priorityLevel": 10
- }