Merge "Fix INF-351 ocloud serviceUri is wrong about protocol"
[pti/o2.git] / tests / unit / test_ocloud.py
index baff737..5b51221 100644 (file)
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
-import pytest
-from o2ims.domain import ocloud
-from o2ims import config
 import uuid
+from unittest.mock import MagicMock
+
+from o2ims.domain import ocloud, subscription_obj
+from o2ims.domain import resource_type as rt
+from o2ims.views import ocloud_view
+from o2common.config import config, conf as CONF
 
 
 def setup_ocloud():
     ocloudid1 = str(uuid.uuid4())
-    ocloud1 = ocloud.Ocloud(ocloudid1, "ocloud1", config.get_api_url(), "ocloud for unit test", 1)
+    ocloud1 = ocloud.Ocloud(ocloudid1, "ocloud1",
+                            config.get_api_url(), "ocloud for unit test", 1)
     return ocloud1
 
+
 def test_new_ocloud():
     ocloudid1 = str(uuid.uuid4())
-    ocloud1 = ocloud.Ocloud(ocloudid1, "ocloud1", config.get_api_url(), "ocloud for unit test", 1)
+    ocloud1 = ocloud.Ocloud(ocloudid1, "ocloud1",
+                            config.get_api_url(), "ocloud for unit test", 1)
     assert ocloudid1 is not None and ocloud1.oCloudId == ocloudid1
 
-def test_add_ocloud_with_dms():
+
+# def test_add_ocloud_with_dms():
+#     ocloud1 = setup_ocloud()
+#     dmsid = str(uuid.uuid4())
+#     dms = ocloud.DeploymentManager(
+#         dmsid, "k8s1", ocloud1.oCloudId, config.get_api_url()+"/k8s1")
+#     ocloud1.addDeploymentManager(dms)
+#     ocloud1.addDeploymentManager(dms)
+#     assert len(ocloud1.deploymentManagers) == 1
+#     # repo.update(ocloud1.oCloudId, {
+#     #             "deploymentManagers": ocloud1.deploymentManagers})
+
+
+def test_new_resource_type():
+    ocloud1 = setup_ocloud()
+    resource_type_id1 = str(uuid.uuid4())
+    resource_type1 = ocloud.ResourceType(
+        resource_type_id1, "resourcetype1", rt.ResourceTypeEnum.PSERVER,
+        ocloud1.oCloudId)
+    assert resource_type_id1 is not None and \
+        resource_type1.resourceTypeId == resource_type_id1
+
+
+def test_new_resource_pool():
     ocloud1 = setup_ocloud()
-    dmsid = str(uuid.uuid4())
-    dms = ocloud.DeploymentManager(
-        dmsid, "k8s1", ocloud1.oCloudId, config.get_api_url()+"/k8s1")
-    ocloud1.addDeploymentManager(dms)
-    ocloud1.addDeploymentManager(dms)
-    assert len(ocloud1.deploymentManagers) == 1
-    # repo.update(ocloud1.oCloudId, {"deploymentManagers": ocloud1.deploymentManagers})
+    resource_pool_id1 = str(uuid.uuid4())
+    resource_pool1 = ocloud.ResourcePool(
+        resource_pool_id1, "resourcepool1", config.get_api_url(),
+        ocloud1.oCloudId)
+    assert resource_pool_id1 is not None and \
+        resource_pool1.resourcePoolId == resource_pool_id1
+
+
+def test_new_resource():
+    resource_id1 = str(uuid.uuid4())
+    resource_type_id1 = str(uuid.uuid4())
+    resource_pool_id1 = str(uuid.uuid4())
+    resource1 = ocloud.Resource(
+        resource_id1, resource_type_id1, resource_pool_id1, 'resource1')
+    assert resource_id1 is not None and resource1.resourceId == resource_id1
+
+
+def test_new_deployment_manager():
+    ocloud_id1 = str(uuid.uuid4())
+    deployment_manager_id1 = str(uuid.uuid4())
+    deployment_manager1 = ocloud.DeploymentManager(
+        deployment_manager_id1, "k8s1", ocloud_id1,
+        config.get_api_url()+"/k8s1")
+    assert deployment_manager_id1 is not None and deployment_manager1.\
+        deploymentManagerId == deployment_manager_id1
+
+
+def test_new_subscription():
+    subscription_id1 = str(uuid.uuid4())
+    subscription1 = subscription_obj.Subscription(
+        subscription_id1, "https://callback/uri/write/here")
+    assert subscription_id1 is not None and\
+        subscription1.subscriptionId == subscription_id1
+
+
+def test_view_olcouds(mock_uow):
+    session, uow = mock_uow
+
+    ocloud1_UUID = str(uuid.uuid4)
+    ocloud1 = MagicMock()
+    ocloud1.serialize.return_value = {
+        'oCloudId': ocloud1_UUID, 'name': 'ocloud1'}
+    session.return_value.query.return_value = [ocloud1]
+
+    ocloud_list = ocloud_view.oclouds(uow)
+    # assert str(ocloud_list[0].get("oCloudId")) == ocloud1_UUID
+    assert len(ocloud_list) == 1
+
+
+def test_view_olcoud_one(mock_uow):
+    session, uow = mock_uow
+
+    ocloud1_UUID = str(uuid.uuid4)
+    session.return_value.query.return_value.filter_by.return_value.first.\
+        return_value.serialize.return_value = None
+
+    # Query return None
+    ocloud_res = ocloud_view.ocloud_one(ocloud1_UUID, uow)
+    assert ocloud_res is None
+
+    session.return_value.query.return_value.filter_by.return_value.first.\
+        return_value.serialize.return_value = {
+            "oCloudId": ocloud1_UUID}
+
+    ocloud_res = ocloud_view.ocloud_one(ocloud1_UUID, uow)
+    assert str(ocloud_res.get("oCloudId")) == ocloud1_UUID
+
+
+def test_view_resource_types(mock_uow):
+    session, uow = mock_uow
+
+    resource_type_id1 = str(uuid.uuid4())
+    restype1 = MagicMock()
+    restype1.serialize.return_value = {
+        "resourceTypeId": resource_type_id1}
+
+    order_by = MagicMock()
+    order_by.count.return_value = 1
+    order_by.limit.return_value.offset.return_value = [restype1]
+    session.return_value.query.return_value.filter.return_value.\
+        order_by.return_value = order_by
+
+    result = ocloud_view.resource_types(uow)
+    assert result['count'] == 1
+    ret_list = result['results']
+    assert str(ret_list[0].get("resourceTypeId")) == resource_type_id1
+
+
+def test_view_resource_type_one(mock_uow):
+    session, uow = mock_uow
+
+    resource_type_id1 = str(uuid.uuid4())
+    session.return_value.query.return_value.filter_by.return_value.first.\
+        return_value.serialize.return_value = None
+
+    # Query return None
+    resource_type_res = ocloud_view.resource_type_one(
+        resource_type_id1, uow)
+    assert resource_type_res is None
+
+    session.return_value.query.return_value.filter_by.return_value.first.\
+        return_value.serialize.return_value = {
+            "resourceTypeId": resource_type_id1}
+
+    resource_type_res = ocloud_view.resource_type_one(resource_type_id1, uow)
+    assert str(resource_type_res.get("resourceTypeId")) == resource_type_id1
+
+
+def test_view_resource_pools(mock_uow):
+    session, uow = mock_uow
+
+    resource_pool_id1 = str(uuid.uuid4())
+    respool1 = MagicMock()
+    respool1.serialize.return_value = {
+        "resourcePoolId": resource_pool_id1}
+
+    order_by = MagicMock()
+    order_by.count.return_value = 1
+    order_by.limit.return_value.offset.return_value = [respool1]
+    session.return_value.query.return_value.filter.return_value.\
+        order_by.return_value = order_by
+
+    result = ocloud_view.resource_pools(uow)
+    assert result['count'] == 1
+    ret_list = result['results']
+    assert str(ret_list[0].get("resourcePoolId")) == resource_pool_id1
+
+
+def test_view_resource_pool_one(mock_uow):
+    session, uow = mock_uow
+
+    resource_pool_id1 = str(uuid.uuid4())
+    session.return_value.query.return_value.filter_by.return_value.first.\
+        return_value.serialize.return_value = None
+
+    # Query return None
+    resource_pool_res = ocloud_view.resource_pool_one(
+        resource_pool_id1, uow)
+    assert resource_pool_res is None
+
+    session.return_value.query.return_value.filter_by.return_value.first.\
+        return_value.serialize.return_value = {
+            "resourcePoolId": resource_pool_id1
+        }
+
+    resource_pool_res = ocloud_view.resource_pool_one(resource_pool_id1, uow)
+    assert str(resource_pool_res.get("resourcePoolId")) == resource_pool_id1
+
+
+def test_view_resources(mock_uow):
+    session, uow = mock_uow
+
+    resource_id1 = str(uuid.uuid4())
+    resource_pool_id1 = str(uuid.uuid4())
+    res1 = MagicMock()
+    res1.serialize.return_value = {
+        "resourceId": resource_id1,
+        "resourcePoolId": resource_pool_id1
+    }
+
+    order_by = MagicMock()
+    order_by.count.return_value = 1
+    order_by.limit.return_value.offset.return_value = [res1]
+    session.return_value.query.return_value.filter.return_value.\
+        order_by.return_value = order_by
+    # TODO: workaround for sqlalchemy not mapping with resource object
+    setattr(ocloud.Resource, 'resourcePoolId', '')
+
+    result = ocloud_view.resources(resource_pool_id1, uow)
+    assert result['count'] == 1
+    resource_list = result['results']
+    assert str(resource_list[0].get("resourceId")) == resource_id1
+    assert str(resource_list[0].get("resourcePoolId")) == resource_pool_id1
+
+
+def test_view_resource_one(mock_uow):
+    session, uow = mock_uow
+
+    resource_id1 = str(uuid.uuid4())
+    resource_pool_id1 = str(uuid.uuid4())
+    session.return_value.query.return_value.filter_by.return_value.first.\
+        return_value.serialize.return_value = None
+
+    # Query return None
+    resource_res = ocloud_view.resource_one(resource_id1, uow)
+    assert resource_res is None
+
+    session.return_value.query.return_value.filter_by.return_value.first.\
+        return_value.serialize.return_value = {
+            "resourceId": resource_id1,
+            "resourcePoolId": resource_pool_id1
+        }
+
+    resource_res = ocloud_view.resource_one(resource_id1, uow)
+    assert str(resource_res.get("resourceId")) == resource_id1
+
+
+def test_view_deployment_managers(mock_uow):
+    session, uow = mock_uow
+
+    deployment_manager_id1 = str(uuid.uuid4())
+    dm1 = MagicMock()
+    dm1.serialize.return_value = {
+        "deploymentManagerId": deployment_manager_id1,
+    }
+
+    order_by = MagicMock()
+    order_by.count.return_value = 1
+    order_by.limit.return_value.offset.return_value = [dm1]
+    session.return_value.query.return_value.filter.return_value.\
+        order_by.return_value = order_by
+
+    result = ocloud_view.deployment_managers(uow)
+    assert result['count'] == 1
+    ret_list = result['results']
+    assert str(ret_list[0].get("deploymentManagerId")
+               ) == deployment_manager_id1
+
+
+def test_view_deployment_manager_one(mock_uow):
+    session, uow = mock_uow
+
+    deployment_manager_id1 = str(uuid.uuid4())
+    session.return_value.query.return_value.filter_by.return_value.first.\
+        return_value.serialize.return_value = None
+
+    # Query return None
+    deployment_manager_res = ocloud_view.deployment_manager_one(
+        deployment_manager_id1, uow)
+    assert deployment_manager_res is None
+
+    dms_endpoint = "http://o2:30205/o2dms/v1/uuid"
+    session.return_value.query.return_value.filter_by.return_value.first.\
+        return_value.serialize.return_value = {
+            "deploymentManagerId": deployment_manager_id1,
+            "serviceUri": dms_endpoint,
+            "profile": {}
+        }
+
+    CONF.API.DMS_SUPPORT_PROFILES = 'native_k8sapi,sol018,sol018_helmcli'
+    cluster_endpoint = "https://test_k8s:6443"
+    session.return_value.query.return_value.filter_by.return_value.first.\
+        return_value.serialize.return_value['profile'] = {
+            "cluster_api_endpoint": cluster_endpoint
+        }
+
+    # profile default
+    deployment_manager_res = ocloud_view.deployment_manager_one(
+        deployment_manager_id1, uow)
+    assert str(deployment_manager_res.get(
+        "deploymentManagerId")) == deployment_manager_id1
+    assert str(deployment_manager_res.get(
+        'serviceUri')) == cluster_endpoint
+    assert deployment_manager_res.get('profile') is None
+
+    # profile sol018
+    profileName = ocloud.DeploymentManagerProfileSOL018
+    session.return_value.query.return_value.filter_by.return_value.first.\
+        return_value.serialize.return_value['profile'] = {
+            "cluster_api_endpoint": cluster_endpoint
+        }
+    deployment_manager_res = ocloud_view.deployment_manager_one(
+        deployment_manager_id1, uow, profile=profileName)
+    assert str(deployment_manager_res.get(
+        'serviceUri')) == cluster_endpoint
+    assert str(deployment_manager_res.get("extensions").get(
+        "profileName")) == profileName
+
+    # profile wrong name
+    profileName = 'wrong_profile'
+    deployment_manager_res = ocloud_view.deployment_manager_one(
+        deployment_manager_id1, uow, profile=profileName)
+    assert deployment_manager_res == ""
+
+
+def test_view_subscriptions(mock_uow):
+    session, uow = mock_uow
+
+    subscription_id1 = str(uuid.uuid4())
+    sub1 = MagicMock()
+    sub1.serialize.return_value = {
+        "subscriptionId": subscription_id1,
+    }
+
+    order_by = MagicMock()
+    order_by.count.return_value = 1
+    order_by.limit.return_value.offset.return_value = [sub1]
+    session.return_value.query.return_value.filter.return_value.\
+        order_by.return_value = order_by
+
+    result = ocloud_view.subscriptions(uow)
+    assert result['count'] == 1
+    ret_list = result['results']
+    assert str(ret_list[0].get("subscriptionId")) == subscription_id1
+
+
+def test_view_subscription_one(mock_uow):
+    session, uow = mock_uow
+
+    subscription_id1 = str(uuid.uuid4())
+    session.return_value.query.return_value.filter_by.return_value.first.\
+        return_value.serialize.return_value = None
+
+    # Query return None
+    subscription_res = ocloud_view.subscription_one(
+        subscription_id1, uow)
+    assert subscription_res is None
+
+    session.return_value.query.return_value.filter_by.return_value.first.\
+        return_value.serialize.return_value = {
+            "subscriptionId": subscription_id1,
+        }
+
+    subscription_res = ocloud_view.subscription_one(
+        subscription_id1, uow)
+    assert str(subscription_res.get(
+        "subscriptionId")) == subscription_id1
+
+
+def test_flask_get_list(mock_flask_uow):
+    session, app = mock_flask_uow
+    order_by = MagicMock()
+    order_by.count.return_value = 0
+    order_by.limit.return_value.offset.return_value = []
+    session.return_value.query.return_value.filter.return_value.\
+        order_by.return_value = order_by
+    apibase = config.get_o2ims_api_base() + '/v1'
+    # TODO: workaround for sqlalchemy not mapping with resource object
+    setattr(ocloud.Resource, 'resourcePoolId', '')
+
+    with app.test_client() as client:
+        # Get list and return empty list
+        ##########################
+        resp = client.get(apibase+"/resourceTypes")
+        assert resp.get_data() == b'[]\n'
+
+        resp = client.get(apibase+"/resourcePools")
+        assert resp.get_data() == b'[]\n'
+
+        resource_pool_id1 = str(uuid.uuid4())
+        resp = client.get(apibase+"/resourcePools/" +
+                          resource_pool_id1+"/resources")
+        assert resp.get_data() == b'[]\n'
+
+        resp = client.get(apibase+"/deploymentManagers")
+        assert resp.get_data() == b'[]\n'
+
+        resp = client.get(apibase+"/subscriptions")
+        assert resp.get_data() == b'[]\n'
+
+
+def test_flask_get_one(mock_flask_uow):
+    session, app = mock_flask_uow
+
+    session.return_value.query.return_value.filter_by.return_value.\
+        first.return_value = None
+    apibase = config.get_o2ims_api_base() + '/v1'
+
+    with app.test_client() as client:
+        # Get one and return 404
+        ###########################
+        resp = client.get(apibase+"/")
+        assert resp.status_code == 404
+
+        resource_type_id1 = str(uuid.uuid4())
+        resp = client.get(apibase+"/resourceTypes/"+resource_type_id1)
+        assert resp.status_code == 404
+
+        resource_pool_id1 = str(uuid.uuid4())
+        resp = client.get(apibase+"/resourcePools/"+resource_pool_id1)
+        assert resp.status_code == 404
+
+        resource_id1 = str(uuid.uuid4())
+        resp = client.get(apibase+"/resourcePools/" +
+                          resource_pool_id1+"/resources/"+resource_id1)
+        assert resp.status_code == 404
+
+        deployment_manager_id1 = str(uuid.uuid4())
+        resp = client.get(apibase+"/deploymentManagers/" +
+                          deployment_manager_id1)
+        assert resp.status_code == 404
+
+        subscription_id1 = str(uuid.uuid4())
+        resp = client.get(apibase+"/subscriptions/"+subscription_id1)
+        assert resp.status_code == 404
+
+
+def test_flask_post(mock_flask_uow, mappers):
+    session, app = mock_flask_uow
+    apibase = config.get_o2ims_api_base() + '/v1'
+
+    order_by = MagicMock()
+    order_by.count.return_value = 0
+    order_by.limit.return_value.offset.return_value = []
+    session.return_value.query.return_value.filter.return_value.\
+        order_by.return_value = order_by
+
+    with app.test_client() as client:
+        session.return_value.execute.return_value = []
+
+        sub_callback = 'http://subscription/callback/url'
+        resp = client.post(apibase+'/subscriptions', json={
+            'callback': sub_callback,
+            'consumerSubscriptionId': 'consumerSubId1',
+            'filter': '(eq,resourceTypeId,xxx)'
+        })
+        assert resp.status_code == 201
+        assert 'subscriptionId' in resp.get_json()
+
+
+def test_flask_delete(mock_flask_uow):
+    session, app = mock_flask_uow
+    apibase = config.get_o2ims_api_base() + '/v1'
+
+    with app.test_client() as client:
+        session.return_value.execute.return_value.first.return_value = {}
+
+        subscription_id1 = str(uuid.uuid4())
+        resp = client.delete(apibase+"/subscriptions/"+subscription_id1)
+        assert resp.status_code == 200
+
+
+def test_flask_not_allowed(mock_flask_uow):
+    _, app = mock_flask_uow
+    apibase = config.get_o2ims_api_base() + '/v1'
+
+    with app.test_client() as client:
+        # Testing resource type not support method
+        ##########################
+        uri = apibase + "/resourceTypes"
+        resp = client.post(uri)
+        assert resp.status == '405 METHOD NOT ALLOWED'
+        resp = client.put(uri)
+        assert resp.status == '405 METHOD NOT ALLOWED'
+        resp = client.patch(uri)
+        assert resp.status == '405 METHOD NOT ALLOWED'
+        resp = client.delete(uri)
+        assert resp.status == '405 METHOD NOT ALLOWED'
+
+        resource_type_id1 = str(uuid.uuid4())
+        uri = apibase + "/resourceTypes/" + resource_type_id1
+        resp = client.post(uri)
+        assert resp.status == '405 METHOD NOT ALLOWED'
+        resp = client.put(uri)
+        assert resp.status == '405 METHOD NOT ALLOWED'
+        resp = client.patch(uri)
+        assert resp.status == '405 METHOD NOT ALLOWED'
+        resp = client.delete(uri)
+        assert resp.status == '405 METHOD NOT ALLOWED'
+
+        # Testing resource pool not support method
+        ##########################
+        uri = apibase + "/resourcePools"
+        resp = client.post(uri)
+        assert resp.status == '405 METHOD NOT ALLOWED'
+        resp = client.put(uri)
+        assert resp.status == '405 METHOD NOT ALLOWED'
+        resp = client.patch(uri)
+        assert resp.status == '405 METHOD NOT ALLOWED'
+        resp = client.delete(uri)
+        assert resp.status == '405 METHOD NOT ALLOWED'
+
+        resource_pool_id1 = str(uuid.uuid4())
+        uri = apibase + "/resourcePools/" + resource_pool_id1
+        resp = client.post(uri)
+        assert resp.status == '405 METHOD NOT ALLOWED'
+        resp = client.put(uri)
+        assert resp.status == '405 METHOD NOT ALLOWED'
+        resp = client.patch(uri)
+        assert resp.status == '405 METHOD NOT ALLOWED'
+        resp = client.delete(uri)
+        assert resp.status == '405 METHOD NOT ALLOWED'
+
+        # Testing resource not support method
+        ##########################
+        uri = apibase + "/resourcePools/" + resource_pool_id1 + "/resources"
+        resp = client.post(uri)
+        assert resp.status == '405 METHOD NOT ALLOWED'
+        resp = client.put(uri)
+        assert resp.status == '405 METHOD NOT ALLOWED'
+        resp = client.patch(uri)
+        assert resp.status == '405 METHOD NOT ALLOWED'
+        resp = client.delete(uri)
+        assert resp.status == '405 METHOD NOT ALLOWED'
+
+        resource_id1 = str(uuid.uuid4())
+        uri = apibase + "/resourcePools/" + \
+            resource_pool_id1 + "/resources/" + resource_id1
+        resp = client.post(uri)
+        assert resp.status == '405 METHOD NOT ALLOWED'
+        resp = client.put(uri)
+        assert resp.status == '405 METHOD NOT ALLOWED'
+        resp = client.patch(uri)
+        assert resp.status == '405 METHOD NOT ALLOWED'
+        resp = client.delete(uri)
+        assert resp.status == '405 METHOD NOT ALLOWED'
+
+        # Testing deployment managers not support method
+        ##########################
+        uri = apibase + "/deploymentManagers"
+        resp = client.post(uri)
+        assert resp.status == '405 METHOD NOT ALLOWED'
+        resp = client.put(uri)
+        assert resp.status == '405 METHOD NOT ALLOWED'
+        resp = client.patch(uri)
+        assert resp.status == '405 METHOD NOT ALLOWED'
+        resp = client.delete(uri)
+        assert resp.status == '405 METHOD NOT ALLOWED'
+
+        deployment_manager_id1 = str(uuid.uuid4())
+        uri = apibase + "/deploymentManagers/" + deployment_manager_id1
+        resp = client.post(uri)
+        assert resp.status == '405 METHOD NOT ALLOWED'
+        resp = client.put(uri)
+        assert resp.status == '405 METHOD NOT ALLOWED'
+        resp = client.patch(uri)
+        assert resp.status == '405 METHOD NOT ALLOWED'
+        resp = client.delete(uri)
+        assert resp.status == '405 METHOD NOT ALLOWED'
+
+        # Testing subscriptions not support method
+        ##########################
+        uri = apibase + "/subscriptions"
+        resp = client.put(uri)
+        assert resp.status == '405 METHOD NOT ALLOWED'
+        resp = client.patch(uri)
+        assert resp.status == '405 METHOD NOT ALLOWED'
+        resp = client.delete(uri)
+        assert resp.status == '405 METHOD NOT ALLOWED'
+
+        subscription_id1 = str(uuid.uuid4())
+        uri = apibase + "/subscriptions/" + subscription_id1
+        resp = client.post(uri)
+        assert resp.status == '405 METHOD NOT ALLOWED'
+        resp = client.put(uri)
+        assert resp.status == '405 METHOD NOT ALLOWED'
+        resp = client.patch(uri)
+        assert resp.status == '405 METHOD NOT ALLOWED'