# from o2ims.adapter import orm
from o2ims.domain import ocloud
from o2ims.domain.ocloud_repo import OcloudRepository, ResourceTypeRepository,\
- ResourcePoolRepository, ResourceRepository
+ ResourcePoolRepository, ResourceRepository, DeploymentManagerRepository,\
+ SubscriptionRepository
class OcloudSqlAlchemyRepository(OcloudRepository):
def _update(self, resource: ocloud.Resource):
self.session.add(resource)
+
+
+class DeploymentManagerSqlAlchemyRepository(DeploymentManagerRepository):
+ def __init__(self, session):
+ super().__init__()
+ self.session = session
+
+ def _add(self, deployment_manager: ocloud.DeploymentManager):
+ self.session.add(deployment_manager)
+
+ def _get(self, deployment_manager_id) -> ocloud.DeploymentManager:
+ return self.session.query(ocloud.DeploymentManager).filter_by(
+ deploymentManagerId=deployment_manager_id).first()
+
+ def _list(self) -> List[ocloud.DeploymentManager]:
+ return self.session.query()
+
+ def _update(self, deployment_manager: ocloud.DeploymentManager):
+ self.session.add(deployment_manager)
+
+
+class SubscriptionSqlAlchemyRepository(SubscriptionRepository):
+ def __init__(self, session):
+ super().__init__()
+ self.session = session
+
+ def _add(self, subscription: ocloud.Subscription):
+ self.session.add(subscription)
+
+ def _get(self, subscription_id) -> ocloud.Subscription:
+ return self.session.query(ocloud.Subscription).filter_by(
+ subscriptionId=subscription_id).first()
+
+ def _list(self) -> List[ocloud.Subscription]:
+ return self.session.query()
+
+ def _update(self, subscription: ocloud.Subscription):
+ self.session.add(subscription)
Column("resourceId", String(255), primary_key=True),\r
Column("resourceTypeId", ForeignKey("resourcetype.resourceTypeId")),\r
Column("resourcePoolId", ForeignKey("resourcepool.resourcePoolId")),\r
+ # Column("globalAssetId", String(255)),\r
# Column("oCloudId", ForeignKey("ocloud.oCloudId")),\r
Column("parentId", String(255)),\r
Column("description", String(255)),\r
.ResourcePoolSqlAlchemyRepository(self.session)\r
self.resources = ocloud_repository\\r
.ResourceSqlAlchemyRepository(self.session)\r
+ self.deployment_managers = ocloud_repository\\r
+ .DeploymentManagerSqlAlchemyRepository(self.session)\r
self.stxobjects = StxObjectSqlAlchemyRepository(self.session)\r
return super().__enter__()\r
\r
\r
\r
class Subscription(AgRoot):\r
- def __init__(self, id: str, callback: str, consumersubid: list = [],\r
- filter: list = []) -> None:\r
+ def __init__(self, id: str, callback: str, consumersubid: str = '',\r
+ filter: str = '') -> None:\r
self.subscriptionId = id\r
self.callback = callback\r
self.consumerSubscriptionId = consumersubid\r
@abc.abstractmethod\r
def _update(self, resource: ocloud.Resource):\r
raise NotImplementedError\r
+\r
+\r
+class DeploymentManagerRepository(abc.ABC):\r
+ def __init__(self):\r
+ self.seen = set() # type: Set[ocloud.DeploymentManager]\r
+\r
+ def add(self, deployment_manager: ocloud.DeploymentManager):\r
+ self._add(deployment_manager)\r
+ self.seen.add(deployment_manager)\r
+\r
+ def get(self, deployment_manager_id) -> ocloud.DeploymentManager:\r
+ deployment_manager = self._get(deployment_manager_id)\r
+ if deployment_manager:\r
+ self.seen.add(deployment_manager)\r
+ return deployment_manager\r
+\r
+ def list(self) -> List[ocloud.DeploymentManager]:\r
+ return self._list()\r
+\r
+ def update(self, deployment_manager: ocloud.DeploymentManager):\r
+ self._update(deployment_manager)\r
+\r
+ @abc.abstractmethod\r
+ def _add(self, deployment_manager: ocloud.DeploymentManager):\r
+ raise NotImplementedError\r
+\r
+ @abc.abstractmethod\r
+ def _get(self, deployment_manager_id) -> ocloud.DeploymentManager:\r
+ raise NotImplementedError\r
+\r
+ @abc.abstractmethod\r
+ def _update(self, deployment_manager: ocloud.DeploymentManager):\r
+ raise NotImplementedError\r
+\r
+\r
+class SubscriptionRepository(abc.ABC):\r
+ def __init__(self):\r
+ self.seen = set() # type: Set[ocloud.Subscription]\r
+\r
+ def add(self, subscription: ocloud.Subscription):\r
+ self._add(subscription)\r
+ self.seen.add(subscription)\r
+\r
+ def get(self, subscription_id) -> ocloud.Subscription:\r
+ subscription = self._get(subscription_id)\r
+ if subscription:\r
+ self.seen.add(subscription)\r
+ return subscription\r
+\r
+ def list(self) -> List[ocloud.Subscription]:\r
+ return self._list()\r
+\r
+ def update(self, subscription: ocloud.Subscription):\r
+ self._update(subscription)\r
+\r
+ @abc.abstractmethod\r
+ def _add(self, subscription: ocloud.Subscription):\r
+ raise NotImplementedError\r
+\r
+ @abc.abstractmethod\r
+ def _get(self, subscription_id) -> ocloud.Subscription:\r
+ raise NotImplementedError\r
+\r
+ @abc.abstractmethod\r
+ def _update(self, subscription: ocloud.Subscription):\r
+ raise NotImplementedError\r
# See the License for the specific language governing permissions and\r
# limitations under the License.\r
\r
-# from datetime import datetime\r
from flask import Flask\r
-# request\r
-# from o2ims.domain import commands\r
-# from o2ims.service.handlers import InvalidResourceType\r
+from flask_restx import Api\r
+\r
from o2ims import bootstrap\r
-from o2ims.views.ocloud_route import configure_routes\r
+# from o2ims import config\r
+# from o2ims.views.ocloud_route import configure_routes\r
+from o2ims.views.ocloud_route import configure_namespace\r
\r
\r
+# apibase = config.get_o2ims_api_base()\r
app = Flask(__name__)\r
+api = Api(app, version='1.0.0',\r
+ title='O-Cloud Infrastructure Management Services',\r
+ description='Swagger OpenAPI document for \\r
+ O-Cloud Infrastructure Management Services',\r
+ )\r
bus = bootstrap.bootstrap()\r
-configure_routes(app, bus)\r
+# configure_routes(app, bus)\r
+configure_namespace(api, bus)\r
--- /dev/null
+# Copyright (C) 2021 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from flask_restx import Namespace, fields
+
+
+class OcloudDTO:
+
+ api = Namespace("Ocloud", description='Ocloud related operations.')
+
+ ocloud_list = api.model(
+ "List Ocloud object",
+ {
+ 'oCloudId': fields.String(required=True),
+ 'globalCloudId': fields.String,
+ 'name': fields.String,
+ 'description': fields.String,
+ 'infrastructureManagementServiceEndpoint': fields.String,
+ }
+ )
+
+
+class ResourceTypeDTO:
+
+ api = Namespace(
+ "ResourceType", description='Resource type related operations.')
+
+ resource_type_get = api.model(
+ "Get ResourceType object",
+ {
+ 'resourceTypeId': fields.String(required=True,
+ description='Resource type ID'),
+ 'name': fields.String,
+ 'vendor': fields.String,
+ 'version': fields.String,
+ 'description': fields.String,
+ }
+ )
+
+
+class ResourcePoolDTO:
+
+ api = Namespace(
+ "ResourcePool", description='Resource pool related operations.')
+
+ resource_pool_get = api.model(
+ "Get ResourcePool object",
+ {
+ 'resourcePoolId': fields.String(required=True,
+ description='Resource pool ID'),
+ 'name': fields.String,
+ 'globalLocationId': fields.String,
+ 'location': fields.String,
+ 'description': fields.String,
+ }
+ )
+
+
+class ResourceDTO:
+
+ api = Namespace("Resource", description='Resource related operations.')
+
+ resource_list = api.model(
+ "List Resource object",
+ {
+ 'resourceId': fields.String(required=True,
+ description='Resource ID'),
+ 'resourceTypeId': fields.String,
+ 'resourcePoolId': fields.String,
+ 'parentId': fields.String,
+ 'description': fields.String,
+ }
+ )
+
+ resource_get = api.model(
+ "Get Resource object",
+ {
+ 'resourceId': fields.String(required=True,
+ description='Resource ID'),
+ 'resourceTypeId': fields.String,
+ 'resourcePoolId': fields.String,
+ 'parentId': fields.String,
+ 'description': fields.String,
+ }
+ )
+
+
+class DeploymentManagerDTO:
+
+ api = Namespace("DeploymentManager",
+ description='Deployment manager related operations.')
+
+ deployment_manager_get = api.model(
+ "Get DeploymentManager object",
+ {
+ 'deploymentManagerId': fields.String(
+ required=True,
+ description='Deployment manager ID'),
+ 'name': fields.String,
+ 'description': fields.String,
+ 'deploymentManagementServiceEndpoint': fields.String,
+ 'supportedLocations': fields.String,
+ 'capabilities': fields.String,
+ 'capacity': fields.String,
+ }
+ )
+
+
+class SubscriptionDTO:
+
+ api = Namespace(
+ "Subscription", description='Subscription related operations.')
+
+ subscription_get = api.model(
+ "Get Subscription object",
+ {
+ 'subscriptionId': fields.String(required=True,
+ description='Subscription ID'),
+ 'callback': fields.String,
+ 'consumerSubscriptionId': fields.String,
+ 'filter': fields.String,
+ }
+ )
# limitations under the License.
from flask import jsonify
+from flask_restx import Resource
from o2ims import config
from o2ims.views import ocloud_view
-
+from o2ims.views.ocloud_dto import OcloudDTO, ResourceTypeDTO,\
+ ResourcePoolDTO, ResourceDTO, DeploymentManagerDTO, SubscriptionDTO
apibase = config.get_o2ims_api_base()
+# api = Namespace("O2IMS", description='IMS')
+
+
+# ---------- OClouds ---------- #
+api_ocloud = OcloudDTO.api
+
+
+@api_ocloud.route("/")
+class OcloudsListRouter(Resource):
+ """Ocloud get endpoint
+ O2 interface ocloud endpoint
+ """
+
+ ocloud_get = OcloudDTO.ocloud_list
+
+ @api_ocloud.marshal_list_with(ocloud_get)
+ def get(self):
+ return ocloud_view.oclouds(uow)
+
+
+# ---------- ResourceTypes ---------- #
+api_rt = ResourceTypeDTO.api
+
+
+@api_rt.route("/resourceTypes")
+class ResourceTypesListRouter(Resource):
+
+ model = ResourceTypeDTO.resource_type_get
+
+ @api_rt.marshal_list_with(model)
+ def get(self):
+ return ocloud_view.resource_types(uow)
+
+ # @api.doc(response={405: 'Method Not Allowed'})
+ # def post(self):
+ # api.abort(405)
+
+ # @api.doc(response={405: 'Method Not Allowed'})
+ # def put(self):
+ # api.abort(405)
+
+ # @api.doc(response={405: 'Method Not Allowed'})
+ # def patch(self):
+ # api.abort(405)
+
+ # @api.doc(response={405: 'Method Not Allowed'})
+ # def delete(self):
+ # api.abort(405)
+
+
+@api_rt.route("/resourceTypes/<resourceTypeID>")
+@api_rt.param('resourceTypeID', 'ID of the resource type')
+@api_rt.response(404, 'Resource type not found')
+class ResourceTypeGetRouter(Resource):
+
+ model = ResourceTypeDTO.resource_type_get
+
+ @api_rt.doc('Get resource type')
+ @api_rt.marshal_with(model)
+ def get(self, resourceTypeID):
+ result = ocloud_view.resource_type_one(resourceTypeID, uow)
+ if result is not None:
+ return result
+ api_rt.abort(
+ 404, "Resource type {} doesn't exist".format(resourceTypeID))
+
+
+# ---------- ResourcePools ---------- #
+api_rp = ResourcePoolDTO.api
+
+
+@api_rp.route("/resourcePools")
+class ResourcePoolsListRouter(Resource):
+
+ model = ResourcePoolDTO.resource_pool_get
+
+ @api_rp.marshal_list_with(model)
+ def get(self):
+ return ocloud_view.resource_pools(uow)
+
+
+@api_rp.route("/resourcePools/<resourcePoolID>")
+@api_rp.param('resourcePoolID', 'ID of the resource pool')
+@api_rp.response(404, 'Resource pool not found')
+class ResourcePoolGetRouter(Resource):
+
+ model = ResourcePoolDTO.resource_pool_get
+
+ @api_rp.doc('Get resource pool')
+ def get(self, resourcePoolID):
+ result = ocloud_view.resource_pool_one(resourcePoolID, uow)
+ if result is not None:
+ return result
+ api_rp.abort(
+ 404, "Resource pool {} doesn't exist".format(resourcePoolID))
+
+
+# ---------- Resources ---------- #
+api_res = ResourceDTO.api
+
+
+@api_res.route("/resourcePools/<resourcePoolID>/resources")
+@api_res.param('resourcePoolID', 'ID of the resource pool')
+class ResourcesListRouter(Resource):
+
+ model = ResourceDTO.resource_list
+
+ @api_res.marshal_list_with(model)
+ def get(self, resourcePoolID):
+ return ocloud_view.resources(resourcePoolID, uow)
+
+
+@api_res.route("/resourcePools/<resourcePoolID>/resources/<resourceID>")
+@api_res.param('resourcePoolID', 'ID of the resource pool')
+@api_res.param('resourceID', 'ID of the resource')
+@api_res.response(404, 'Resource not found')
+class ResourceGetRouter(Resource):
+
+ model = ResourceDTO.resource_get
+
+ @api_res.doc('Get resource')
+ def get(self, resourcePoolID, resourceID):
+ result = ocloud_view.resource_one(resourceID, uow)
+ if result is not None:
+ return result
+ api_res.abort(404, "Resource {} doesn't exist".format(resourceID))
+
+
+# ---------- DeploymentManagers ---------- #
+api_dm = DeploymentManagerDTO.api
+
+
+@api_dm.route("/deploymentManagers")
+class DeploymentManagersListRouter(Resource):
+
+ model = DeploymentManagerDTO.deployment_manager_get
+
+ @api_dm.marshal_list_with(model)
+ def get(self):
+ return ocloud_view.deployment_managers(uow)
+
+
+@api_dm.route("/deploymentManagers/<deploymentManagerID>")
+@api_dm.param('deploymentManagerID', 'ID of the deployment manager')
+@api_dm.response(404, 'Deployment manager not found')
+class DeploymentManagerGetRouter(Resource):
+
+ model = DeploymentManagerDTO.deployment_manager_get
+
+ @api_dm.doc('Get deployment manager')
+ def get(self, deploymentManagerID):
+ result = ocloud_view.deployment_manager_one(
+ deploymentManagerID, uow)
+ if result is not None:
+ return result
+ api_dm.abort(404, "Deployment manager {} doesn't exist".format(
+ deploymentManagerID))
+
+
+# ---------- Subscriptions ---------- #
+api_sub = SubscriptionDTO.api
+
+
+@api_sub.route("/subscriptions")
+class SubscriptionsListRouter(Resource):
+
+ model = SubscriptionDTO.subscription_get
+
+ @api_sub.marshal_list_with(model)
+ def get(self):
+ return ocloud_view.subscriptions(uow)
+
+
+@api_sub.route("/subscriptions/<subscriptionID>")
+@api_sub.param('subscriptionID', 'ID of the subscription')
+@api_sub.response(404, 'Subscription not found')
+class SubscriptionGetRouter(Resource):
+
+ model = DeploymentManagerDTO.deployment_manager_get
+
+ @api_sub.doc('Get subscription by ID')
+ def get(self, subscriptionID):
+ result = ocloud_view.subscription_one(
+ subscriptionID, uow)
+ if result is not None:
+ return result
+ api_sub.abort(404, "Subscription {} doesn't exist".format(
+ subscriptionID))
+
+
+def configure_namespace(app, bus):
+
+ app.add_namespace(api_ocloud, path=apibase)
+ app.add_namespace(api_rt, path=apibase)
+ app.add_namespace(api_rp, path=apibase)
+ app.add_namespace(api_res, path=apibase)
+ app.add_namespace(api_dm, path=apibase)
+ app.add_namespace(api_sub, path=apibase)
+
+ # Set global uow
+ global uow
+ uow = bus.uow
def configure_routes(app, bus):
\r
from sqlalchemy import select\r
\r
-from o2ims.adapter.orm import ocloud, resource, \\r
- resourcetype, resourcepool, deploymentmanager\r
+from o2ims.adapter.orm import ocloud, resource, resourcetype, \\r
+ resourcepool, deploymentmanager, subscription\r
from o2ims.adapter import unit_of_work\r
# from o2ims.domain.ocloud import Ocloud\r
\r
deploymentmanager.c.deploymentManagerId == deploymentManagerId))\r
first = res.first()\r
return None if first is None else dict(first)\r
+\r
+\r
+def subscriptions(uow: unit_of_work.SqlAlchemyUnitOfWork):\r
+ with uow:\r
+ res = uow.session.execute(select(subscription))\r
+ return [dict(r) for r in res]\r
+\r
+\r
+def subscription_one(subscriptionId: str,\r
+ uow: unit_of_work.SqlAlchemyUnitOfWork):\r
+ with uow:\r
+ res = uow.session.execute(select(subscription).where(\r
+ subscription.c.subscriptionId == subscriptionId))\r
+ first = res.first()\r
+ return None if first is None else dict(first)\r
flask\r
+flask-restx\r
sqlalchemy\r
redis\r
psycopg2-binary\r
import redis\r
import requests\r
from flask import Flask\r
+from flask_restx import Api\r
from sqlalchemy import create_engine\r
from sqlalchemy.orm import sessionmaker, clear_mappers\r
from tenacity import retry, stop_after_delay\r
from o2ims.adapter.orm import metadata, start_o2ims_mappers\r
from o2ims.adapter.clients.orm_stx import start_o2ims_stx_mappers\r
from o2ims.adapter import unit_of_work\r
-from o2ims.views.ocloud_route import configure_routes\r
+# from o2ims.views.ocloud_route import configure_routes\r
+from o2ims.views.ocloud_route import configure_namespace\r
from o2ims.bootstrap import bootstrap\r
\r
\r
session, uow = mock_uow\r
app = Flask(__name__)\r
# app.config["TESTING"] = True\r
+ api = Api(app)\r
bus = bootstrap(False, uow)\r
- configure_routes(app, bus)\r
+ # configure_routes(app, bus)\r
+ configure_namespace(api, bus)\r
client = app.test_client()\r
return session, client\r
\r
def sqlite_flask_uow(sqlite_uow):\r
app = Flask(__name__)\r
# app.config["TESTING"] = True\r
+ api = Api(app)\r
bus = bootstrap(False, sqlite_uow)\r
- configure_routes(app, bus)\r
+ # configure_routes(app, bus)\r
+ configure_namespace(api, bus)\r
yield app.test_client()\r
\r
\r
@pytest.fixture\r
def postgres_flask_uow(postgres_uow):\r
app = Flask(__name__)\r
+ api = Api(app)\r
bus = bootstrap(False, postgres_uow)\r
- configure_routes(app, bus)\r
+ # configure_routes(app, bus)\r
+ configure_namespace(api, bus)\r
yield app.test_client()\r
\r
\r
resource_id1, resource_type_id1, resource_pool_id1)
repo.add(resource1)
assert repo.get(resource_id1) == resource1
+
+
+def test_add_deployment_manager(sqlite_session_factory):
+ session = sqlite_session_factory()
+ repo = repository.DeploymentManagerSqlAlchemyRepository(session)
+ ocloud_id1 = str(uuid.uuid4())
+ deployment_manager_id1 = str(uuid.uuid4())
+ deployment_manager1 = ocloud.DeploymentManager(
+ deployment_manager_id1, "k8s1", ocloud_id1,
+ config.get_api_url()+"/k8s1")
+ repo.add(deployment_manager1)
+ assert repo.get(deployment_manager_id1) == deployment_manager1
+
+
+def test_add_subscription(sqlite_session_factory):
+ session = sqlite_session_factory()
+ repo = repository.SubscriptionSqlAlchemyRepository(session)
+ subscription_id1 = str(uuid.uuid4())
+ subscription1 = ocloud.Subscription(
+ subscription_id1, "https://callback/uri/write/here")
+ repo.add(subscription1)
+ assert repo.get(subscription_id1) == subscription1
resource_res = ocloud_view.resource_one(resource_id1, uow)
assert str(resource_res.get("resourceId")) == resource_id1
+
+
+def test_view_deployment_managers(sqlite_uow):
+ ocloud_id1 = str(uuid.uuid4())
+ deployment_manager_id1 = str(uuid.uuid4())
+ deployment_manager1 = ocloud.DeploymentManager(
+ deployment_manager_id1, "k8s1", ocloud_id1,
+ config.get_api_url()+"/k8s1")
+ with sqlite_uow as uow:
+ uow.oclouds.add(deployment_manager1)
+ uow.commit()
+
+ deployment_manager_list = ocloud_view.deployment_managers(uow)
+ assert str(deployment_manager_list[0].get(
+ "deploymentManagerId")) == deployment_manager_id1
+
+
+def test_view_deployment_manager_one(sqlite_uow):
+ ocloud_id1 = str(uuid.uuid4())
+ deployment_manager_id1 = str(uuid.uuid4())
+ deployment_manager1 = ocloud.DeploymentManager(
+ deployment_manager_id1, "k8s1", ocloud_id1,
+ config.get_api_url()+"/k8s1")
+
+ # Query return None
+ deployment_manager_res = ocloud_view.deployment_manager_one(
+ deployment_manager_id1, sqlite_uow)
+ assert deployment_manager_res is None
+
+ with sqlite_uow as uow:
+ uow.oclouds.add(deployment_manager1)
+ uow.commit()
+
+ deployment_manager_res = ocloud_view.deployment_manager_one(
+ deployment_manager_id1, sqlite_uow)
+ assert str(deployment_manager_res.get(
+ "deploymentManagerId")) == deployment_manager_id1
+
+
+def test_view_subscriptions(mock_uow):
+ session, uow = mock_uow
+
+ subscription_id1 = str(uuid.uuid4())
+ session.return_value.execute.return_value = [{
+ "subscriptionId": subscription_id1,
+ }]
+
+ deployment_manager_list = ocloud_view.subscriptions(uow)
+ assert str(deployment_manager_list[0].get(
+ "subscriptionId")) == subscription_id1
+
+
+def test_view_subscription_one(mock_uow):
+ session, uow = mock_uow
+
+ subscription_id1 = str(uuid.uuid4())
+ session.return_value.execute.return_value.first.return_value = None
+
+ # Query return None
+ deployment_manager_res = ocloud_view.subscription_one(
+ subscription_id1, uow)
+ assert deployment_manager_res is None
+
+ session.return_value.execute.return_value.first.return_value = {
+ "deploymentManagerId": subscription_id1,
+ }
+
+ deployment_manager_res = ocloud_view.subscription_one(
+ subscription_id1, uow)
+ assert str(deployment_manager_res.get(
+ "deploymentManagerId")) == subscription_id1
assert resource_id1 is not None and resource1.resourceId == resource_id1
+def test_new_deployment_manager():
+ ocloud_id1 = str(uuid.uuid4())
+ deployment_manager_id1 = str(uuid.uuid4())
+ deployment_manager1 = ocloud.DeploymentManager(
+ deployment_manager_id1, "k8s1", ocloud_id1,
+ config.get_api_url()+"/k8s1")
+ assert deployment_manager_id1 is not None and deployment_manager1.\
+ deploymentManagerId == deployment_manager_id1
+
+
+def test_new_subscription():
+ subscription_id1 = str(uuid.uuid4())
+ subscription1 = ocloud.Subscription(
+ subscription_id1, "https://callback/uri/write/here")
+ assert subscription_id1 is not None and\
+ subscription1.subscriptionId == subscription_id1
+
+
def test_view_olcouds(mock_uow):
session, uow = mock_uow
assert str(resource_res.get("resourceId")) == resource_id1
+def test_view_deployment_managers(mock_uow):
+ session, uow = mock_uow
+
+ deployment_manager_id1 = str(uuid.uuid4())
+ session.return_value.execute.return_value = [{
+ "deploymentManagerId": deployment_manager_id1,
+ }]
+
+ deployment_manager_list = ocloud_view.deployment_managers(uow)
+ assert str(deployment_manager_list[0].get(
+ "deploymentManagerId")) == deployment_manager_id1
+
+
+def test_view_deployment_manager_one(mock_uow):
+ session, uow = mock_uow
+
+ deployment_manager_id1 = str(uuid.uuid4())
+ session.return_value.execute.return_value.first.return_value = None
+
+ # Query return None
+ deployment_manager_res = ocloud_view.deployment_manager_one(
+ deployment_manager_id1, uow)
+ assert deployment_manager_res is None
+
+ session.return_value.execute.return_value.first.return_value = {
+ "deploymentManagerId": deployment_manager_id1,
+ }
+
+ deployment_manager_res = ocloud_view.deployment_manager_one(
+ deployment_manager_id1, uow)
+ assert str(deployment_manager_res.get(
+ "deploymentManagerId")) == deployment_manager_id1
+
+
+def test_view_subscriptions(mock_uow):
+ session, uow = mock_uow
+
+ subscription_id1 = str(uuid.uuid4())
+ session.return_value.execute.return_value = [{
+ "subscriptionId": subscription_id1,
+ }]
+
+ deployment_manager_list = ocloud_view.subscriptions(uow)
+ assert str(deployment_manager_list[0].get(
+ "subscriptionId")) == subscription_id1
+
+
+def test_view_subscription_one(mock_uow):
+ session, uow = mock_uow
+
+ subscription_id1 = str(uuid.uuid4())
+ session.return_value.execute.return_value.first.return_value = None
+
+ # Query return None
+ deployment_manager_res = ocloud_view.subscription_one(
+ subscription_id1, uow)
+ assert deployment_manager_res is None
+
+ session.return_value.execute.return_value.first.return_value = {
+ "deploymentManagerId": subscription_id1,
+ }
+
+ deployment_manager_res = ocloud_view.subscription_one(
+ subscription_id1, uow)
+ assert str(deployment_manager_res.get(
+ "deploymentManagerId")) == subscription_id1
+
+
def test_flask_get_list(mock_flask_uow):
session, client = mock_flask_uow
session.return_value.execute.return_value = []
# Get list and return empty list
##########################
- resp = client.get(apibase)
+ resp = client.get(apibase+"/")
assert resp.get_data() == b'[]\n'
resp = client.get(apibase+"/resourceTypes")
session.return_value.execute.return_value.first.return_value = None
apibase = config.get_o2ims_api_base()
- # Get one and return nothing
+ # Get one and return 404
###########################
resource_type_id1 = str(uuid.uuid4())
resp = client.get(apibase+"/resourceTypes/"+resource_type_id1)
- assert resp.get_data() == b''
+ assert resp.status_code == 404
resource_pool_id1 = str(uuid.uuid4())
resp = client.get(apibase+"/resourcePools/"+resource_pool_id1)
- assert resp.get_data() == b''
+ assert resp.status_code == 404
resource_id1 = str(uuid.uuid4())
resp = client.get(apibase+"/resourcePools/" +
resource_pool_id1+"/resources/"+resource_id1)
- assert resp.get_data() == b''
+ assert resp.status_code == 404
deployment_manager_id1 = str(uuid.uuid4())
resp = client.get(apibase+"/deploymentManagers/"+deployment_manager_id1)
- assert resp.get_data() == b''
+ assert resp.status_code == 404
+
+ subscription_id1 = str(uuid.uuid4())
+ resp = client.get(apibase+"/subscriptions/"+subscription_id1)
+ assert resp.status_code == 404
def test_flask_not_allowed(mock_flask_uow):
assert resp.status == '405 METHOD NOT ALLOWED'
resp = client.delete(uri)
assert resp.status == '405 METHOD NOT ALLOWED'
+
+ # Testing subscriptions not support method
+ ##########################
+ uri = apibase + "/subscriptions"
+ resp = client.post(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.put(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.patch(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.delete(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+
+ subscription_id1 = str(uuid.uuid4())
+ uri = apibase + "/subscriptions/" + subscription_id1
+ resp = client.post(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.put(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.patch(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.delete(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'