+ resource_pool_id1 = str(uuid.uuid4())
+ resource_pool1 = ocloud.ResourcePool(
+ resource_pool_id1, "resourcepool1", config.get_api_url(),
+ ocloud1.oCloudId)
+ assert resource_pool_id1 is not None and \
+ resource_pool1.resourcePoolId == resource_pool_id1
+
+
+def test_new_resource():
+ resource_id1 = str(uuid.uuid4())
+ resource_type_id1 = str(uuid.uuid4())
+ resource_pool_id1 = str(uuid.uuid4())
+ resource1 = ocloud.Resource(
+ resource_id1, resource_type_id1, resource_pool_id1, 'resource1')
+ assert resource_id1 is not None and resource1.resourceId == resource_id1
+
+
+def test_new_deployment_manager():
+ ocloud_id1 = str(uuid.uuid4())
+ deployment_manager_id1 = str(uuid.uuid4())
+ deployment_manager1 = ocloud.DeploymentManager(
+ deployment_manager_id1, "k8s1", ocloud_id1,
+ config.get_api_url()+"/k8s1")
+ assert deployment_manager_id1 is not None and deployment_manager1.\
+ deploymentManagerId == deployment_manager_id1
+
+
+def test_new_subscription():
+ subscription_id1 = str(uuid.uuid4())
+ subscription1 = subscription_obj.Subscription(
+ subscription_id1, "https://callback/uri/write/here")
+ assert subscription_id1 is not None and\
+ subscription1.subscriptionId == subscription_id1
+
+
+def test_view_olcouds(mock_uow):
+ session, uow = mock_uow
+
+ ocloud1_UUID = str(uuid.uuid4)
+ ocloud1 = MagicMock()
+ ocloud1.serialize.return_value = {
+ 'oCloudId': ocloud1_UUID, 'name': 'ocloud1'}
+ session.return_value.query.return_value.filter.return_value.\
+ order_by.return_value = [ocloud1]
+
+ ocloud_list = ocloud_view.oclouds(uow)
+ # assert str(ocloud_list[0].get("oCloudId")) == ocloud1_UUID
+ assert len(ocloud_list) == 1
+
+
+def test_view_olcoud_one(mock_uow):
+ session, uow = mock_uow
+
+ ocloud1_UUID = str(uuid.uuid4)
+ session.return_value.query.return_value.filter_by.return_value.first.\
+ return_value.serialize.return_value = None
+
+ # Query return None
+ ocloud_res = ocloud_view.ocloud_one(ocloud1_UUID, uow)
+ assert ocloud_res is None
+
+ session.return_value.query.return_value.filter_by.return_value.first.\
+ return_value.serialize.return_value = {
+ "oCloudId": ocloud1_UUID}
+
+ ocloud_res = ocloud_view.ocloud_one(ocloud1_UUID, uow)
+ assert str(ocloud_res.get("oCloudId")) == ocloud1_UUID
+
+
+def test_view_resource_types(mock_uow):
+ session, uow = mock_uow
+
+ resource_type_id1 = str(uuid.uuid4())
+ restype1 = MagicMock()
+ restype1.serialize.return_value = {
+ "resourceTypeId": resource_type_id1}
+
+ order_by = MagicMock()
+ order_by.count.return_value = 1
+ order_by.limit.return_value.offset.return_value = [restype1]
+ session.return_value.query.return_value.filter.return_value.\
+ order_by.return_value = order_by
+
+ result = ocloud_view.resource_types(uow)
+ assert result['count'] == 1
+ ret_list = result['results']
+ assert str(ret_list[0].get("resourceTypeId")) == resource_type_id1
+
+
+def test_view_resource_type_one(mock_uow):
+ session, uow = mock_uow
+
+ resource_type_id1 = str(uuid.uuid4())
+ session.return_value.query.return_value.filter_by.return_value.first.\
+ return_value.serialize.return_value = None
+
+ # Query return None
+ resource_type_res = ocloud_view.resource_type_one(
+ resource_type_id1, uow)
+ assert resource_type_res is None
+
+ session.return_value.query.return_value.filter_by.return_value.first.\
+ return_value.serialize.return_value = {
+ "resourceTypeId": resource_type_id1}
+
+ resource_type_res = ocloud_view.resource_type_one(resource_type_id1, uow)
+ assert str(resource_type_res.get("resourceTypeId")) == resource_type_id1
+
+
+def test_view_resource_pools(mock_uow):
+ session, uow = mock_uow
+
+ resource_pool_id1 = str(uuid.uuid4())
+ respool1 = MagicMock()
+ respool1.serialize.return_value = {
+ "resourcePoolId": resource_pool_id1}
+
+ order_by = MagicMock()
+ order_by.count.return_value = 1
+ order_by.limit.return_value.offset.return_value = [respool1]
+ session.return_value.query.return_value.filter.return_value.\
+ order_by.return_value = order_by
+
+ result = ocloud_view.resource_pools(uow)
+ assert result['count'] == 1
+ ret_list = result['results']
+ assert str(ret_list[0].get("resourcePoolId")) == resource_pool_id1
+
+
+def test_view_resource_pool_one(mock_uow):
+ session, uow = mock_uow
+
+ resource_pool_id1 = str(uuid.uuid4())
+ session.return_value.query.return_value.filter_by.return_value.first.\
+ return_value.serialize.return_value = None
+
+ # Query return None
+ resource_pool_res = ocloud_view.resource_pool_one(
+ resource_pool_id1, uow)
+ assert resource_pool_res is None
+
+ session.return_value.query.return_value.filter_by.return_value.first.\
+ return_value.serialize.return_value = {
+ "resourcePoolId": resource_pool_id1
+ }
+
+ resource_pool_res = ocloud_view.resource_pool_one(resource_pool_id1, uow)
+ assert str(resource_pool_res.get("resourcePoolId")) == resource_pool_id1
+
+
+def test_view_resources(mock_uow):
+ session, uow = mock_uow
+
+ resource_id1 = str(uuid.uuid4())
+ resource_pool_id1 = str(uuid.uuid4())
+ res1 = MagicMock()
+ res1.serialize.return_value = {
+ "resourceId": resource_id1,
+ "resourcePoolId": resource_pool_id1
+ }
+
+ order_by = MagicMock()
+ order_by.count.return_value = 1
+ order_by.limit.return_value.offset.return_value = [res1]
+ session.return_value.query.return_value.filter.return_value.\
+ order_by.return_value = order_by
+ # TODO: workaround for sqlalchemy not mapping with resource object
+ setattr(ocloud.Resource, 'resourcePoolId', '')
+
+ result = ocloud_view.resources(resource_pool_id1, uow)
+ assert result['count'] == 1
+ resource_list = result['results']
+ assert str(resource_list[0].get("resourceId")) == resource_id1
+ assert str(resource_list[0].get("resourcePoolId")) == resource_pool_id1
+
+
+def test_view_resource_one(mock_uow):
+ session, uow = mock_uow
+
+ resource_id1 = str(uuid.uuid4())
+ resource_pool_id1 = str(uuid.uuid4())
+ session.return_value.query.return_value.filter_by.return_value.first.\
+ return_value.serialize.return_value = None
+
+ # Query return None
+ resource_res = ocloud_view.resource_one(
+ resource_id1, uow, resource_pool_id1)
+ assert resource_res is None
+
+ session.return_value.query.return_value.filter_by.return_value.first.\
+ return_value.serialize.return_value = {
+ "resourceId": resource_id1,
+ "resourcePoolId": resource_pool_id1
+ }
+ resource_res = ocloud_view.resource_one(
+ resource_id1, uow, resource_pool_id1)
+ assert str(resource_res.get("resourceId") == resource_id1)
+ assert str(resource_res.get("resourcePoolId") == resource_pool_id1)
+
+
+def test_view_deployment_managers(mock_uow):
+ session, uow = mock_uow
+
+ deployment_manager_id1 = str(uuid.uuid4())
+ dm1 = MagicMock()
+ dm1.serialize.return_value = {
+ "deploymentManagerId": deployment_manager_id1,
+ }
+
+ order_by = MagicMock()
+ order_by.count.return_value = 1
+ order_by.limit.return_value.offset.return_value = [dm1]
+ session.return_value.query.return_value.filter.return_value.\
+ order_by.return_value = order_by
+
+ result = ocloud_view.deployment_managers(uow)
+ assert result['count'] == 1
+ ret_list = result['results']
+ assert str(ret_list[0].get("deploymentManagerId")
+ ) == deployment_manager_id1
+
+
+def test_view_deployment_manager_one(mock_uow):
+ session, uow = mock_uow
+
+ deployment_manager_id1 = str(uuid.uuid4())
+ session.return_value.query.return_value.filter_by.return_value.first.\
+ return_value.serialize.return_value = None
+
+ # Query return None
+ deployment_manager_res = ocloud_view.deployment_manager_one(
+ deployment_manager_id1, uow)
+ assert deployment_manager_res is None
+
+ dms_endpoint = "http://o2:30205/o2dms/v1/uuid"
+ session.return_value.query.return_value.filter_by.return_value.first.\
+ return_value.serialize.return_value = {
+ "deploymentManagerId": deployment_manager_id1,
+ "serviceUri": dms_endpoint,
+ "profile": {}
+ }
+
+ CONF.API.DMS_SUPPORT_PROFILES = 'native_k8sapi,sol018,sol018_helmcli'
+ cluster_endpoint = "https://test_k8s:6443"
+ session.return_value.query.return_value.filter_by.return_value.first.\
+ return_value.serialize.return_value['profile'] = {
+ "cluster_api_endpoint": cluster_endpoint
+ }
+
+ # profile default
+ deployment_manager_res = ocloud_view.deployment_manager_one(
+ deployment_manager_id1, uow)
+ assert str(deployment_manager_res.get(
+ "deploymentManagerId")) == deployment_manager_id1
+ assert str(deployment_manager_res.get(
+ 'serviceUri')) == cluster_endpoint
+ assert deployment_manager_res.get('profile') is None
+
+ # profile sol018
+ profileName = ocloud.DeploymentManagerProfileSOL018
+ session.return_value.query.return_value.filter_by.return_value.first.\
+ return_value.serialize.return_value['profile'] = {
+ "cluster_api_endpoint": cluster_endpoint
+ }
+ deployment_manager_res = ocloud_view.deployment_manager_one(
+ deployment_manager_id1, uow, profile=profileName)
+ assert str(deployment_manager_res.get(
+ 'serviceUri')) == cluster_endpoint
+ assert str(deployment_manager_res.get("extensions").get(
+ "profileName")) == profileName
+
+ # profile wrong name
+ profileName = 'wrong_profile'
+ deployment_manager_res = ocloud_view.deployment_manager_one(
+ deployment_manager_id1, uow, profile=profileName)
+ assert deployment_manager_res == ""
+
+
+def test_view_subscriptions(mock_uow):
+ session, uow = mock_uow
+
+ subscription_id1 = str(uuid.uuid4())
+ sub1 = MagicMock()
+ sub1.serialize.return_value = {
+ "subscriptionId": subscription_id1,
+ }
+
+ order_by = MagicMock()
+ order_by.count.return_value = 1
+ order_by.limit.return_value.offset.return_value = [sub1]
+ session.return_value.query.return_value.filter.return_value.\
+ order_by.return_value = order_by
+
+ result = ocloud_view.subscriptions(uow)
+ assert result['count'] == 1
+ ret_list = result['results']
+ assert str(ret_list[0].get("subscriptionId")) == subscription_id1
+
+
+def test_view_subscription_one(mock_uow):
+ session, uow = mock_uow
+
+ subscription_id1 = str(uuid.uuid4())
+ session.return_value.query.return_value.filter_by.return_value.first.\
+ return_value.serialize.return_value = None
+
+ # Query return None
+ subscription_res = ocloud_view.subscription_one(
+ subscription_id1, uow)
+ assert subscription_res is None
+
+ session.return_value.query.return_value.filter_by.return_value.first.\
+ return_value.serialize.return_value = {
+ "subscriptionId": subscription_id1,
+ }
+
+ subscription_res = ocloud_view.subscription_one(
+ subscription_id1, uow)
+ assert str(subscription_res.get(
+ "subscriptionId")) == subscription_id1
+
+
+def test_flask_get_list(mock_flask_uow):
+ session, app = mock_flask_uow
+ order_by = MagicMock()
+ order_by.count.return_value = 0
+ order_by.limit.return_value.offset.return_value = []
+ session.return_value.query.return_value.filter.return_value.\
+ order_by.return_value = order_by
+ apibase = config.get_o2ims_api_base() + '/v1'
+ # TODO: workaround for sqlalchemy not mapping with resource object
+ setattr(ocloud.Resource, 'resourcePoolId', '')
+
+ with app.test_client() as client:
+ # Get list and return empty list
+ ##########################
+ resp = client.get(apibase+"/resourceTypes")
+ assert resp.get_data() == b'[]\n'
+
+ resp = client.get(apibase+"/resourcePools")
+ assert resp.get_data() == b'[]\n'
+
+ resource_pool_id1 = str(uuid.uuid4())
+ resp = client.get(apibase+"/resourcePools/" +
+ resource_pool_id1+"/resources")
+ assert resp.get_data() == b'[]\n'
+
+ resp = client.get(apibase+"/deploymentManagers")
+ assert resp.get_data() == b'[]\n'
+
+ resp = client.get(apibase+"/subscriptions")
+ assert resp.get_data() == b'[]\n'
+
+
+def test_flask_get_one(mock_flask_uow):
+ session, app = mock_flask_uow
+
+ session.return_value.query.return_value.filter_by.return_value.\
+ first.return_value = None
+ apibase = config.get_o2ims_api_base() + '/v1'
+
+ with app.test_client() as client:
+ # Get one and return 404
+ ###########################
+ resp = client.get(apibase+"/")
+ assert resp.status_code == 404
+
+ resource_type_id1 = str(uuid.uuid4())
+ resp = client.get(apibase+"/resourceTypes/"+resource_type_id1)
+ assert resp.status_code == 404
+
+ resource_pool_id1 = str(uuid.uuid4())
+ resp = client.get(apibase+"/resourcePools/"+resource_pool_id1)
+ assert resp.status_code == 404
+
+ resource_id1 = str(uuid.uuid4())
+ resp = client.get(apibase+"/resourcePools/" +
+ resource_pool_id1+"/resources/"+resource_id1)
+ assert resp.status_code == 404
+
+ deployment_manager_id1 = str(uuid.uuid4())
+ resp = client.get(apibase+"/deploymentManagers/" +
+ deployment_manager_id1)
+ assert resp.status_code == 404
+
+ subscription_id1 = str(uuid.uuid4())
+ resp = client.get(apibase+"/subscriptions/"+subscription_id1)
+ assert resp.status_code == 404
+
+
+def test_flask_post(mock_flask_uow, mappers):
+ session, app = mock_flask_uow
+ apibase = config.get_o2ims_api_base() + '/v1'
+
+ order_by = MagicMock()
+ order_by.count.return_value = 0
+ order_by.limit.return_value.offset.return_value = []
+ session.return_value.query.return_value.filter.return_value.\
+ order_by.return_value = order_by
+
+ with app.test_client() as client:
+ session.return_value.execute.return_value = []
+
+ sub_callback = 'http://subscription/callback/url'
+ resp = client.post(apibase+'/subscriptions', json={
+ 'callback': sub_callback,
+ 'consumerSubscriptionId': 'consumerSubId1',
+ 'filter': '(eq,resourceTypeId,xxx)'
+ })
+ assert resp.status_code == 201
+ assert 'subscriptionId' in resp.get_json()
+
+
+def test_flask_delete(mock_flask_uow):
+ session, app = mock_flask_uow
+ apibase = config.get_o2ims_api_base() + '/v1'
+
+ with app.test_client() as client:
+ session.return_value.execute.return_value.first.return_value = {}
+
+ subscription_id1 = str(uuid.uuid4())
+ resp = client.delete(apibase+"/subscriptions/"+subscription_id1)
+ assert resp.status_code == 200
+
+
+def test_flask_not_allowed(mock_flask_uow):
+ _, app = mock_flask_uow
+ apibase = config.get_o2ims_api_base() + '/v1'
+
+ with app.test_client() as client:
+ # Testing resource type not support method
+ ##########################
+ uri = apibase + "/resourceTypes"
+ resp = client.post(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.put(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.patch(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.delete(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+
+ resource_type_id1 = str(uuid.uuid4())
+ uri = apibase + "/resourceTypes/" + resource_type_id1
+ resp = client.post(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.put(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.patch(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.delete(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+
+ # Testing resource pool not support method
+ ##########################
+ uri = apibase + "/resourcePools"
+ resp = client.post(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.put(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.patch(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.delete(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+
+ resource_pool_id1 = str(uuid.uuid4())
+ uri = apibase + "/resourcePools/" + resource_pool_id1
+ resp = client.post(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.put(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.patch(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.delete(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+
+ # Testing resource not support method
+ ##########################
+ uri = apibase + "/resourcePools/" + resource_pool_id1 + "/resources"
+ resp = client.post(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.put(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.patch(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.delete(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+
+ resource_id1 = str(uuid.uuid4())
+ uri = apibase + "/resourcePools/" + \
+ resource_pool_id1 + "/resources/" + resource_id1
+ resp = client.post(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.put(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.patch(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.delete(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+
+ # Testing deployment managers not support method
+ ##########################
+ uri = apibase + "/deploymentManagers"
+ resp = client.post(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.put(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.patch(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.delete(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+
+ deployment_manager_id1 = str(uuid.uuid4())
+ uri = apibase + "/deploymentManagers/" + deployment_manager_id1
+ resp = client.post(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.put(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.patch(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.delete(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+
+ # Testing subscriptions not support method
+ ##########################
+ uri = apibase + "/subscriptions"
+ resp = client.put(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.patch(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.delete(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+
+ subscription_id1 = str(uuid.uuid4())
+ uri = apibase + "/subscriptions/" + subscription_id1
+ resp = client.post(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.put(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.patch(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'