from typing import List
# from o2ims.adapter import orm
from o2ims.domain import ocloud
-from o2ims.domain.ocloud_repo import OcloudRepository
+from o2ims.domain.ocloud_repo import OcloudRepository, ResourceTypeRepository,\
+ ResourcePoolRepository, ResourceRepository
class OcloudSqlAlchemyRepository(OcloudRepository):
self.session.add(ocloud)
# self.session.add_all(ocloud.deploymentManagers)
- def _get(self, ocloudid) -> ocloud.Ocloud:
+ def _get(self, ocloud_id) -> ocloud.Ocloud:
return self.session.query(ocloud.Ocloud).filter_by(
- oCloudId=ocloudid).first()
+ oCloudId=ocloud_id).first()
def _list(self) -> List[ocloud.Ocloud]:
return self.session.query(ocloud.Ocloud).order_by(
def _update(self, ocloud: ocloud.Ocloud):
self.session.add(ocloud)
+
+
+class ResouceTypeSqlAlchemyRepository(ResourceTypeRepository):
+ def __init__(self, session):
+ super().__init__()
+ self.session = session
+
+ def _add(self, resourceType: ocloud.ResourceType):
+ self.session.add(resourceType)
+
+ def _get(self, resource_type_id) -> ocloud.ResourceType:
+ return self.session.query(ocloud.ResourceType).filter_by(
+ resourceTypeId=resource_type_id).first()
+
+ def _list(self) -> List[ocloud.ResourceType]:
+ return self.session.query()
+
+ def _update(self, resourceType: ocloud.ResourceType):
+ self.session.add(resourceType)
+
+
+class ResourcePoolSqlAlchemyRepository(ResourcePoolRepository):
+ def __init__(self, session):
+ super().__init__()
+ self.session = session
+
+ def _add(self, resourcePool: ocloud.ResourcePool):
+ self.session.add(resourcePool)
+
+ def _get(self, resource_pool_id) -> ocloud.ResourcePool:
+ return self.session.query(ocloud.ResourcePool).filter_by(
+ resourcePoolId=resource_pool_id).first()
+
+ def _list(self) -> List[ocloud.ResourcePool]:
+ return self.session.query()
+
+ def _update(self, resourcePool: ocloud.ResourcePool):
+ self.session.add(resourcePool)
+
+
+class ResourceSqlAlchemyRepository(ResourceRepository):
+ def __init__(self, session):
+ super().__init__()
+ self.session = session
+
+ def _add(self, resource: ocloud.Resource):
+ self.session.add(resource)
+
+ def _get(self, resource_id) -> ocloud.Resource:
+ return self.session.query(ocloud.Resource).filter_by(
+ resourceId=resource_id).first()
+
+ def _list(self) -> List[ocloud.Resource]:
+ return self.session.query()
+
+ def _update(self, resource: ocloud.Resource):
+ self.session.add(resource)
String,\r
# Date,\r
ForeignKey,\r
+ # engine,\r
# event,\r
)\r
\r
from sqlalchemy.orm import mapper, relationship\r
-# from sqlalchemy.sql.expression import true\r
\r
from o2ims.domain import ocloud as ocloudModel\r
\r
"ocloud",\r
metadata,\r
Column("oCloudId", String(255), primary_key=True),\r
+ Column("globalcloudId", String(255)),\r
Column("name", String(255)),\r
Column("description", String(255)),\r
Column("infrastructureManagementServiceEndpoint", String(255))\r
+ # Column("extensions", String(1024))\r
)\r
\r
-resourcepool = Table(\r
- "resourcepool",\r
+resourcetype = Table(\r
+ "resourcetype",\r
metadata,\r
- Column("resourcePoolId", String(255), primary_key=True),\r
- Column("name", String(255)),\r
- Column("location", String(255)),\r
+ Column("resourceTypeId", String(255), primary_key=True),\r
Column("oCloudId", ForeignKey("ocloud.oCloudId")),\r
+ Column("name", String(255)),\r
+ Column("vendor", String(255)),\r
+ Column("model", String(255)),\r
+ Column("version", String(255)),\r
+ Column("description", String(255)),\r
# Column("extensions", String(1024))\r
)\r
\r
-resourcetype = Table(\r
- "resourcetype",\r
+resourcepool = Table(\r
+ "resourcepool",\r
metadata,\r
- Column("resourceTypeId", String(255), primary_key=True),\r
+ Column("resourcePoolId", String(255), primary_key=True),\r
Column("oCloudId", ForeignKey("ocloud.oCloudId")),\r
+ Column("globalLocationId", String(255)),\r
Column("name", String(255)),\r
+ Column("location", String(255)),\r
+ Column("description", String(255)),\r
+ # Column("resources", String(1024))\r
+ # Column("extensions", String(1024))\r
)\r
\r
resource = Table(\r
"resource",\r
metadata,\r
Column("resourceId", String(255), primary_key=True),\r
- Column("parentId", String(255)),\r
Column("resourceTypeId", ForeignKey("resourcetype.resourceTypeId")),\r
Column("resourcePoolId", ForeignKey("resourcepool.resourcePoolId")),\r
- Column("oCloudId", ForeignKey("ocloud.oCloudId"))\r
+ # Column("oCloudId", ForeignKey("ocloud.oCloudId")),\r
+ Column("parentId", String(255)),\r
+ Column("description", String(255)),\r
+ # Column("elements", String(255))\r
+ # Column("extensions", String(1024))\r
)\r
\r
deploymentmanager = Table(\r
"deploymentmanager",\r
metadata,\r
Column("deploymentManagerId", String(255), primary_key=True),\r
+ Column("oCloudId", ForeignKey("ocloud.oCloudId")),\r
Column("name", String(255)),\r
+ Column("description", String(255)),\r
Column("deploymentManagementServiceEndpoint", String(255)),\r
- Column("oCloudId", ForeignKey("ocloud.oCloudId"))\r
+ Column("supportedLocations", String(255)),\r
+ Column("capabilities", String(255)),\r
+ Column("capacity", String(255)),\r
+ # Column("extensions", String(1024))\r
+)\r
+\r
+subscription = Table(\r
+ "subscription",\r
+ metadata,\r
+ Column("subscriptionId", String(255), primary_key=True),\r
+ Column("callback", String(255)),\r
+ Column("consumerSubscriptionId", String(255)),\r
+ Column("filter", String(255)),\r
)\r
\r
\r
-def start_o2ims_mappers():\r
+def start_o2ims_mappers(engine=None):\r
logger.info("Starting O2 IMS mappers")\r
+\r
dm_mapper = mapper(ocloudModel.DeploymentManager, deploymentmanager)\r
resourcepool_mapper = mapper(ocloudModel.ResourcePool, resourcepool)\r
resourcetype_mapper = mapper(ocloudModel.ResourceType, resourcetype)\r
- # resource_mapper = mapper(ocloudModel.Resource, resource)\r
mapper(\r
ocloudModel.Ocloud,\r
ocloud,\r
"resourceTypes": relationship(resourcetype_mapper),\r
"resourcePools": relationship(resourcepool_mapper)\r
})\r
+ mapper(\r
+ ocloudModel.Resource,\r
+ resource,\r
+ properties={\r
+ "resourceTypes": relationship(resourcetype_mapper),\r
+ "resourcePools": relationship(resourcepool_mapper)\r
+ }\r
+ )\r
+ mapper(ocloudModel.Subscription, subscription)\r
+\r
+ if engine is not None:\r
+ metadata.create_all(engine)\r
from sqlalchemy.orm.session import Session\r
\r
from o2ims import config\r
-from o2ims.adapter.ocloud_repository import OcloudSqlAlchemyRepository\r
+from o2ims.adapter import ocloud_repository\r
from o2ims.adapter.stx_repository import StxObjectSqlAlchemyRepository\r
from o2ims.service.unit_of_work import AbstractUnitOfWork\r
\r
\r
def __enter__(self):\r
self.session = self.session_factory() # type: Session\r
- self.oclouds = OcloudSqlAlchemyRepository(self.session)\r
+ self.oclouds = ocloud_repository\\r
+ .OcloudSqlAlchemyRepository(self.session)\r
+ self.resource_types = ocloud_repository\\r
+ .ResouceTypeSqlAlchemyRepository(self.session)\r
+ self.resource_pools = ocloud_repository\\r
+ .ResourcePoolSqlAlchemyRepository(self.session)\r
+ self.resources = ocloud_repository\\r
+ .ResourceSqlAlchemyRepository(self.session)\r
self.stxobjects = StxObjectSqlAlchemyRepository(self.session)\r
return super().__enter__()\r
\r
notifications = SmoO2Notifications()
if start_orm:
- orm.start_o2ims_mappers()
orm_stx.start_o2ims_stx_mappers(uow)
+ with uow:
+ # get default engine if uow is by default
+ engine = uow.session.get_bind()
+ orm.start_o2ims_mappers(engine)
dependencies = {"uow": uow, "notifications": notifications,
"publish": publish}
# from uuid import UUID\r
\r
\r
+class Subscription:\r
+ def __init__(self, id: str, callback: str, consumersubid: list = [],\r
+ filter: list = []) -> None:\r
+ self.subscriptionId = id\r
+ self.callback = callback\r
+ self.consumerSubscriptionId = consumersubid\r
+ self.filter = filter\r
+\r
+\r
class DeploymentManager:\r
def __init__(self, id: str, name: str, ocloudid: str,\r
- dmsendpoint: str) -> None:\r
+ dmsendpoint: str, description: str = '',\r
+ supportedLocations: str = '', capabilities: str = '',\r
+ capacity: str = '') -> None:\r
self.deploymentManagerId = id\r
- self.name = name\r
self.oCloudId = ocloudid\r
+ self.name = name\r
+ self.description = description\r
self.deploymentManagementServiceEndpoint = dmsendpoint\r
+ self.supportedLocations = supportedLocations\r
+ self.capabilities = capabilities\r
+ self.capacity = capacity\r
self.extensions = []\r
\r
\r
class ResourcePool:\r
def __init__(self, id: str, name: str, location: str,\r
- ocloudid: str) -> None:\r
+ ocloudid: str, gLocationId: str = '',\r
+ description: str = '') -> None:\r
self.resourcePoolId = id\r
+ self.oCloudId = ocloudid\r
+ self.globalLocationId = gLocationId\r
self.name = name\r
self.location = location\r
- self.oCloudId = ocloudid\r
+ self.description = description\r
self.extensions = []\r
\r
\r
class ResourceType:\r
def __init__(self, typeid: str, name: str, typeEnum: ResourceTypeEnum,\r
- ocloudid: str) -> None:\r
+ ocloudid: str, vender: str = '', model: str = '',\r
+ version: str = '',\r
+ description: str = '') -> None:\r
self.resourceTypeId = typeid\r
+ self.oCloudId = ocloudid\r
self.resourceTypeEnum = typeEnum.value\r
self.name = name\r
- self.oCloudId = ocloudid\r
+ self.vender = vender\r
+ self.model = model\r
+ self.version = version\r
+ self.description = description\r
self.extensions = []\r
\r
\r
class Resource:\r
def __init__(self, resourceId: str, resourceTypeId: str,\r
- resourcePoolId: str) -> None:\r
+ resourcePoolId: str, oCloudId: str = '',\r
+ parentId: str = '', elements: list = [],\r
+ description: str = '') -> None:\r
self.resourceId = resourceId\r
- self.oCloudId = None # tbd\r
+ self.oCloudId = oCloudId\r
self.resourceTypeId = resourceTypeId\r
self.resourcePoolId = resourcePoolId\r
- self.parentId = None\r
- self.elements = []\r
+ self.parentId = parentId\r
+ self.elements = elements\r
+ self.description = description\r
self.extensions = []\r
\r
\r
class Ocloud:\r
def __init__(self, ocloudid: str, name: str, imsendpoint: str,\r
+ globalcloudId: str = '',\r
description: str = '', version_number: int = 0) -> None:\r
\r
self.oCloudId = ocloudid\r
+ self.globalcloudId = globalcloudId\r
self.version_number = version_number\r
self.name = name\r
self.description = description\r
self._add(ocloud)\r
self.seen.add(ocloud)\r
\r
- def get(self, ocloudid) -> ocloud.Ocloud:\r
- ocloud = self._get(ocloudid)\r
+ def get(self, ocloud_id) -> ocloud.Ocloud:\r
+ ocloud = self._get(ocloud_id)\r
if ocloud:\r
self.seen.add(ocloud)\r
return ocloud\r
raise NotImplementedError\r
\r
@abc.abstractmethod\r
- def _get(self, ocloudid) -> ocloud.Ocloud:\r
+ def _get(self, ocloud_id) -> ocloud.Ocloud:\r
raise NotImplementedError\r
\r
@abc.abstractmethod\r
def _update(self, ocloud: ocloud.Ocloud):\r
raise NotImplementedError\r
+\r
+\r
+class ResourceTypeRepository(abc.ABC):\r
+ def __init__(self):\r
+ self.seen = set() # type: Set[ocloud.ResourceType]\r
+\r
+ def add(self, resource_type: ocloud.ResourceType):\r
+ self._add(resource_type)\r
+ self.seen.add(resource_type)\r
+\r
+ def get(self, resource_type_id) -> ocloud.ResourceType:\r
+ resource_type = self._get(resource_type_id)\r
+ if resource_type:\r
+ self.seen.add(resource_type)\r
+ return resource_type\r
+\r
+ def list(self) -> List[ocloud.ResourceType]:\r
+ return self._list()\r
+\r
+ def update(self, resource_type: ocloud.ResourceType):\r
+ self._update(resource_type)\r
+\r
+ @abc.abstractmethod\r
+ def _add(self, resource_type: ocloud.ResourceType):\r
+ raise NotImplementedError\r
+\r
+ @abc.abstractmethod\r
+ def _get(self, resource_type_id) -> ocloud.ResourceType:\r
+ raise NotImplementedError\r
+\r
+ @abc.abstractmethod\r
+ def _update(self, resource_type: ocloud.ResourceType):\r
+ raise NotImplementedError\r
+\r
+\r
+class ResourcePoolRepository(abc.ABC):\r
+ def __init__(self):\r
+ self.seen = set() # type: Set[ocloud.ResourcePool]\r
+\r
+ def add(self, resource_pool: ocloud.ResourcePool):\r
+ self._add(resource_pool)\r
+ self.seen.add(resource_pool)\r
+\r
+ def get(self, resource_pool_id) -> ocloud.ResourcePool:\r
+ resource_pool = self._get(resource_pool_id)\r
+ if resource_pool:\r
+ self.seen.add(resource_pool)\r
+ return resource_pool\r
+\r
+ def list(self) -> List[ocloud.ResourcePool]:\r
+ return self._list()\r
+\r
+ def update(self, resource_pool: ocloud.ResourcePool):\r
+ self._update(resource_pool)\r
+\r
+ @abc.abstractmethod\r
+ def _add(self, resource_pool: ocloud.ResourcePool):\r
+ raise NotImplementedError\r
+\r
+ @abc.abstractmethod\r
+ def _get(self, resource_pool_id) -> ocloud.ResourcePool:\r
+ raise NotImplementedError\r
+\r
+ @abc.abstractmethod\r
+ def _update(self, resource_pool: ocloud.ResourcePool):\r
+ raise NotImplementedError\r
+\r
+\r
+class ResourceRepository(abc.ABC):\r
+ def __init__(self):\r
+ self.seen = set() # type: Set[ocloud.Resource]\r
+\r
+ def add(self, resource: ocloud.Resource):\r
+ self._add(resource)\r
+ self.seen.add(resource)\r
+\r
+ def get(self, resource_id) -> ocloud.Resource:\r
+ resource = self._get(resource_id)\r
+ if resource:\r
+ self.seen.add(resource)\r
+ return resource\r
+\r
+ def list(self) -> List[ocloud.Resource]:\r
+ return self._list()\r
+\r
+ def update(self, resource: ocloud.Resource):\r
+ self._update(resource)\r
+\r
+ @abc.abstractmethod\r
+ def _add(self, resource: ocloud.Resource):\r
+ raise NotImplementedError\r
+\r
+ @abc.abstractmethod\r
+ def _get(self, resource_id) -> ocloud.Resource:\r
+ raise NotImplementedError\r
+\r
+ @abc.abstractmethod\r
+ def _update(self, resource: ocloud.Resource):\r
+ raise NotImplementedError\r
# limitations under the License.\r
\r
# from datetime import datetime\r
-from flask import Flask, jsonify\r
+from flask import Flask\r
# request\r
# from o2ims.domain import commands\r
# from o2ims.service.handlers import InvalidResourceType\r
-from o2ims import bootstrap, config\r
-from o2ims.views import ocloud_view\r
+from o2ims import bootstrap\r
+from o2ims.views.ocloud_route import configure_routes\r
+\r
\r
app = Flask(__name__)\r
bus = bootstrap.bootstrap()\r
-apibase = config.get_o2ims_api_base()\r
-\r
-\r
-@app.route(apibase, methods=["GET"])\r
-def oclouds():\r
- result = ocloud_view.oclouds(bus.uow)\r
- return jsonify(result), 200\r
+configure_routes(app, bus)\r
--- /dev/null
+# Copyright (C) 2021 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from flask import jsonify
+
+from o2ims import config
+from o2ims.views import ocloud_view
+
+
+apibase = config.get_o2ims_api_base()
+
+
+def configure_routes(app, bus):
+
+ # ---------- OClouds ---------- #
+ @app.route(apibase, methods=["GET"])
+ def oclouds():
+ result = ocloud_view.oclouds(bus.uow)
+ return jsonify(result), 200
+
+ # ---------- ResourceTypes ---------- #
+
+ @app.route(apibase + "/resourceTypes", methods=["GET"])
+ def resource_types():
+ result = ocloud_view.resource_types(bus.uow)
+ return jsonify(result), 200
+
+ @app.route(apibase + "/resourceTypes", methods=["POST", "PUT", "PATCH",
+ "DELETE"])
+ def resource_types_not_allow():
+ return "Method Not Allowed", 405
+
+ @app.route(apibase + "/resourceTypes/<resourceTypeID>", methods=["GET"])
+ def resource_types_one(resourceTypeID):
+ result = ocloud_view.resource_type_one(resourceTypeID, bus.uow)
+ if result is None:
+ return "", 200
+ return jsonify(result), 200
+
+ @app.route(apibase + "/resourceTypes/<resourceTypeID>",
+ methods=["POST", "PUT", "PATCH", "DELETE"])
+ def resource_types_one_not_allow(resourceTypeID):
+ return "Method Not Allowed", 405
+
+ # ---------- ResourcePools ---------- #
+
+ @app.route(apibase + "/resourcePools", methods=["GET"])
+ def resource_pools():
+ result = ocloud_view.resource_pools(bus.uow)
+ return jsonify(result), 200
+
+ @app.route(apibase + "/resourcePools", methods=["POST", "PUT", "PATCH",
+ "DELETE"])
+ def resource_pools_not_allow():
+ return "Method Not Allowed", 405
+
+ @app.route(apibase + "/resourcePools/<resourcePoolID>", methods=["GET"])
+ def resource_pools_one(resourcePoolID):
+ result = ocloud_view.resource_pool_one(resourcePoolID, bus.uow)
+ if result is None:
+ return "", 200
+ return jsonify(result), 200
+
+ @app.route(apibase + "/resourcePools/<resourcePoolID>",
+ methods=["POST", "PUT", "PATCH", "DELETE"])
+ def resource_pools_one_not_allow(resourcePoolID):
+ return "Method Not Allowed", 405
+
+ # ---------- Resources ---------- #
+
+ @app.route(apibase + "/resourcePools/<resourcePoolID>/resources",
+ methods=["GET"])
+ def resources(resourcePoolID):
+ result = ocloud_view.resources(resourcePoolID, bus.uow)
+ return jsonify(result), 200
+
+ @app.route(apibase + "/resourcePools/<resourcePoolID>/resources",
+ methods=["POST", "PUT", "PATCH", "DELETE"])
+ def resource_not_allow(resourcePoolID):
+ return "Method Not Allowed", 405
+
+ @app.route(apibase +
+ "/resourcePools/<resourcePoolID>/resources/<resourceID>",
+ methods=["GET"])
+ def resources_one(resourcePoolID, resourceID):
+ result = ocloud_view.resource_one(resourceID, bus.uow)
+ if result is None:
+ return "", 200
+ return jsonify(result), 200
+
+ @app.route(apibase +
+ "/resourcePools/<resourcePoolID>/resources/<resourceID>",
+ methods=["POST", "PUT", "PATCH", "DELETE"])
+ def resource_one_not_allow(resourcePoolID, resourceID):
+ return "Method Not Allowed", 405
+
+ # ---------- DeploymentManagers ---------- #
+
+ @app.route(apibase + "/deploymentManagers", methods=["GET"])
+ def deployment_managers():
+ result = ocloud_view.deployment_managers(bus.uow)
+ return jsonify(result), 200
+
+ @app.route(apibase + "/deploymentManagers",
+ methods=["POST", "PUT", "PATCH", "DELETE"])
+ def deployment_managers_not_allow():
+ return "Method Not Allowed", 405
+
+ @app.route(apibase + "/deploymentManagers/<deploymentManagerID>",
+ methods=["GET"])
+ def deployment_manager_one(deploymentManagerID):
+ result = ocloud_view.deployment_manager_one(
+ deploymentManagerID, bus.uow)
+ if result is None:
+ return "", 200
+ return jsonify(result), 200
+
+ @app.route(apibase + "/deploymentManagers/<deploymentManagerID>",
+ methods=["POST", "PUT", "PATCH", "DELETE"])
+ def deployment_manager_one_not_allow(deploymentManagerID):
+ return "Method Not Allowed", 405
# See the License for the specific language governing permissions and\r
# limitations under the License.\r
\r
-from o2ims.service import unit_of_work\r
+from sqlalchemy import select\r
+\r
+from o2ims.adapter.orm import ocloud, resource, \\r
+ resourcetype, resourcepool, deploymentmanager\r
+from o2ims.adapter import unit_of_work\r
+# from o2ims.domain.ocloud import Ocloud\r
+\r
+\r
+def oclouds(uow: unit_of_work.SqlAlchemyUnitOfWork):\r
+ with uow:\r
+ # res = uow.session.execute(\r
+ # """\r
+ # SELECT "oCloudId", "name" FROM ocloud\r
+ # """,\r
+ # )\r
+\r
+ res = uow.session.execute(select(ocloud))\r
+ return [dict(r) for r in res]\r
\r
\r
def ocloud_one(ocloudid: str, uow: unit_of_work.AbstractUnitOfWork):\r
with uow:\r
- results = uow.session.execute(\r
- """\r
- SELECT oCloudId, name FROM ocloud WHERE oCloudId = :ocloudid\r
- """,\r
- dict(ocloudid=ocloudid),\r
- )\r
- return dict(results[0]) if len(results) > 0 else None\r
+ # res = uow.session.execute(\r
+ # """\r
+ # SELECT "oCloudId", "name" FROM ocloud\r
+ # WHERE "oCloudId" = :oCloudId\r
+ # """,\r
+ # dict(oCloudId=ocloudid),\r
+ # )\r
+ res = uow.session.execute(\r
+ select(ocloud).where(ocloud.c.oCloudId == ocloudid))\r
+ first = res.first()\r
+ return None if first is None else dict(first)\r
+\r
+\r
+def resource_types(uow: unit_of_work.SqlAlchemyUnitOfWork):\r
+ with uow:\r
+ # res = uow.session.execute(\r
+ # """\r
+ # SELECT "resourceTypeId", "oCloudId", "name" FROM resourcetype\r
+ # """,\r
+ # )\r
+ res = uow.session.execute(select(resourcetype))\r
+ return [dict(r) for r in res]\r
+\r
+\r
+def resource_type_one(resourceTypeId: str,\r
+ uow: unit_of_work.SqlAlchemyUnitOfWork):\r
+ with uow:\r
+ # res = uow.session.execute(\r
+ # """\r
+ # SELECT "resourceTypeId", "oCloudId", "name"\r
+ # FROM resourcetype WHERE "resourceTypeId" = :resourceTypeId\r
+ # """,\r
+ # dict(resourceTypeId=resourceTypeId),\r
+ # )\r
+ res = uow.session.execute(select(resourcetype).where(\r
+ resourcetype.c.resourceTypeId == resourceTypeId))\r
+ first = res.first()\r
+ return None if first is None else dict(first)\r
+\r
+\r
+def resource_pools(uow: unit_of_work.SqlAlchemyUnitOfWork):\r
+ with uow:\r
+ # res = uow.session.execute(\r
+ # """\r
+ # SELECT "resourcePoolId", "oCloudId", "location", "name"\r
+ # FROM resourcepool\r
+ # """,\r
+ # )\r
+ res = uow.session.execute(select(resourcepool))\r
+ return [dict(r) for r in res]\r
+\r
+\r
+def resource_pool_one(resourcePoolId: str,\r
+ uow: unit_of_work.SqlAlchemyUnitOfWork):\r
+ with uow:\r
+ # res = uow.session.execute(\r
+ # """\r
+ # SELECT "resourcePoolId", "oCloudId", "location", "name"\r
+ # FROM resourcepool\r
+ # WHERE "resourcePoolId" = :resourcePoolId\r
+ # """,\r
+ # dict(resourcePoolId=resourcePoolId),\r
+ # )\r
+ res = uow.session.execute(select(resourcepool).where(\r
+ resourcepool.c.resourcePoolId == resourcePoolId))\r
+ first = res.first()\r
+ return None if first is None else dict(first)\r
+\r
+\r
+def resources(resourcePoolId: str, uow: unit_of_work.SqlAlchemyUnitOfWork):\r
+ with uow:\r
+ # res = uow.session.execute(\r
+ # """\r
+ # SELECT "resourceId", "parentId", "resourceTypeId",\r
+ # "resourcePoolId", "oCloudId"\r
+ # FROM resource\r
+ # WHERE "resourcePoolId" = :resourcePoolId\r
+ # """,\r
+ # dict(resourcePoolId=resourcePoolId),\r
+ # )\r
+ res = uow.session.execute(select(resource).where(\r
+ resource.c.resourcePoolId == resourcePoolId))\r
+ return [dict(r) for r in res]\r
+\r
+\r
+def resource_one(resourceId: str, uow: unit_of_work.SqlAlchemyUnitOfWork):\r
+ with uow:\r
+ # res = uow.session.execute(\r
+ # """\r
+ # SELECT "resourceId", "parentId", "resourceTypeId",\r
+ # "resourcePoolId", "oCloudId"\r
+ # FROM resource\r
+ # WHERE "resourceId" = :resourceId\r
+ # """,\r
+ # # AND "resourcePoolId" = :resourcePoolId\r
+ # # dict(resourcePoolId=resourcePoolId,\r
+ # dict(resourceId=resourceId),\r
+ # )\r
+ res = uow.session.execute(select(resource).where(\r
+ resource.c.resourceId == resourceId))\r
+ first = res.first()\r
+ return None if first is None else dict(first)\r
+\r
+\r
+def deployment_managers(uow: unit_of_work.SqlAlchemyUnitOfWork):\r
+ with uow:\r
+ # res = uow.session.execute(\r
+ # """\r
+ # SELECT "deploymentManagerId", "oCloudId",\r
+ # "deploymentManagementServiceEndpoint", "name"\r
+ # FROM deploymentmanager\r
+ # """,\r
+ # )\r
+ res = uow.session.execute(select(deploymentmanager))\r
+ return [dict(r) for r in res]\r
\r
\r
-def oclouds(uow: unit_of_work.AbstractUnitOfWork):\r
+def deployment_manager_one(deploymentManagerId: str,\r
+ uow: unit_of_work.SqlAlchemyUnitOfWork):\r
with uow:\r
- results = uow.session.execute(\r
- """\r
- SELECT oCloudId, name FROM ocloud\r
- """,\r
- )\r
- return [dict(r) for r in results]\r
+ # res = uow.session.execute(\r
+ # """\r
+ # SELECT "deploymentManagerId", "oCloudId",\r
+ # "deploymentManagementServiceEndpoint", "name"\r
+ # FROM deploymentmanager\r
+ # WHERE "deploymentManagerId" = :deploymentManagerId\r
+ # """,\r
+ # dict(deploymentManagerId=deploymentManagerId),\r
+ # )\r
+ res = uow.session.execute(select(deploymentmanager).where(\r
+ deploymentmanager.c.deploymentManagerId == deploymentManagerId))\r
+ first = res.first()\r
+ return None if first is None else dict(first)\r
\r
pytest\r
pytest-icdiff\r
+mock\r
\r
tenacity\r
\r
import pytest\r
import redis\r
import requests\r
+from flask import Flask\r
from sqlalchemy import create_engine\r
from sqlalchemy.orm import sessionmaker, clear_mappers\r
from tenacity import retry, stop_after_delay\r
+from unittest.mock import MagicMock\r
\r
+from o2ims import config\r
from o2ims.adapter.orm import metadata, start_o2ims_mappers\r
from o2ims.adapter.clients.orm_stx import start_o2ims_stx_mappers\r
-from o2ims import config\r
-from o2ims.domain import stx_object as ocloudModel\r
+from o2ims.adapter import unit_of_work\r
+from o2ims.views.ocloud_route import configure_routes\r
+from o2ims.bootstrap import bootstrap\r
+\r
+\r
+@pytest.fixture\r
+def mock_uow():\r
+ session = MagicMock()\r
+ uow = unit_of_work.SqlAlchemyUnitOfWork(session_factory=session)\r
+ return session, uow\r
+\r
+\r
+@pytest.fixture\r
+def mock_flask_uow(mock_uow):\r
+ session, uow = mock_uow\r
+ app = Flask(__name__)\r
+ # app.config["TESTING"] = True\r
+ bus = bootstrap(False, uow)\r
+ configure_routes(app, bus)\r
+ client = app.test_client()\r
+ return session, client\r
\r
\r
@pytest.fixture\r
yield sessionmaker(bind=in_memory_sqlite_db)\r
\r
\r
+@pytest.fixture\r
+def sqlite_uow(sqlite_session_factory):\r
+ uow = unit_of_work.SqlAlchemyUnitOfWork(\r
+ session_factory=sqlite_session_factory)\r
+ # with uow:\r
+ # start_o2ims_mappers(uow.session.get_bind())\r
+ # uow.commit()\r
+ yield uow\r
+ # clear_mappers()\r
+ with uow:\r
+ engine = uow.session.get_bind()\r
+ metadata.drop_all(engine)\r
+\r
+\r
+@pytest.fixture\r
+def sqlite_flask_uow(sqlite_uow):\r
+ app = Flask(__name__)\r
+ # app.config["TESTING"] = True\r
+ bus = bootstrap(False, sqlite_uow)\r
+ configure_routes(app, bus)\r
+ yield app.test_client()\r
+\r
+\r
@pytest.fixture\r
def mappers():\r
start_o2ims_mappers()\r
\r
@pytest.fixture(scope="session")\r
def postgres_db():\r
- engine = create_engine(config.get_postgres_uri(), isolation_level="SERIALIZABLE")\r
+ engine = create_engine(config.get_postgres_uri(),\r
+ isolation_level="SERIALIZABLE")\r
wait_for_postgres_to_come_up(engine)\r
metadata.create_all(engine)\r
return engine\r
return postgres_session_factory()\r
\r
\r
+@pytest.fixture\r
+def postgres_uow(postgres_session_factory):\r
+ uow = unit_of_work.SqlAlchemyUnitOfWork(\r
+ session_factory=postgres_session_factory)\r
+ yield uow\r
+\r
+\r
+@pytest.fixture\r
+def postgres_flask_uow(postgres_uow):\r
+ app = Flask(__name__)\r
+ bus = bootstrap(False, postgres_uow)\r
+ configure_routes(app, bus)\r
+ yield app.test_client()\r
+\r
+\r
@pytest.fixture\r
def restart_api():\r
- (Path(__file__).parent / "../src/o2ims/entrypoints/flask_application.py").touch()\r
+ (Path(__file__).parent / "../src/o2ims/entrypoints/flask_application.py")\\r
+ .touch()\r
time.sleep(0.5)\r
wait_for_webapp_to_come_up()\r
\r
# See the License for the specific language governing permissions and
# limitations under the License.
+import uuid
import pytest
+
+from o2ims.domain import resource_type as rt
from o2ims.adapter import ocloud_repository as repository
from o2ims.domain import ocloud
from o2ims import config
-import uuid
pytestmark = pytest.mark.usefixtures("mappers")
def setup_ocloud():
ocloudid1 = str(uuid.uuid4())
- ocloud1 = ocloud.Ocloud(ocloudid1, "ocloud1", config.get_api_url(), "ocloud 1 for integration test", 1)
+ ocloud1 = ocloud.Ocloud(
+ ocloudid1, "ocloud1", config.get_api_url(),
+ "ocloud 1 for integration test", 1)
return ocloud1
+
def setup_ocloud_and_save(sqlite_session_factory):
session = sqlite_session_factory()
repo = repository.OcloudSqlAlchemyRepository(session)
ocloudid1 = str(uuid.uuid4())
- ocloud1 = ocloud.Ocloud(ocloudid1, "ocloud1", config.get_api_url(), "ocloud for integration test", 1)
+ ocloud1 = ocloud.Ocloud(
+ ocloudid1, "ocloud1", config.get_api_url(),
+ "ocloud for integration test", 1)
repo.add(ocloud1)
assert repo.get(ocloudid1) == ocloud1
session.flush()
return ocloud1
+
def test_add_ocloud(sqlite_session_factory):
session = sqlite_session_factory()
repo = repository.OcloudSqlAlchemyRepository(session)
ocloudid1 = str(uuid.uuid4())
- ocloud1 = ocloud.Ocloud(ocloudid1, "ocloud1", config.get_api_url(), "ocloud for integration test", 1)
+ ocloud1 = ocloud.Ocloud(
+ ocloudid1, "ocloud1", config.get_api_url(),
+ "ocloud for integration test", 1)
repo.add(ocloud1)
assert repo.get(ocloudid1) == ocloud1
+
def test_get_ocloud(sqlite_session_factory):
ocloud1 = setup_ocloud_and_save(sqlite_session_factory)
session = sqlite_session_factory()
ocloud2 = repo.get(ocloud1.oCloudId)
assert ocloud2 != ocloud1 and ocloud2.oCloudId == ocloud1.oCloudId
+
def test_add_ocloud_with_dms(sqlite_session_factory):
session = sqlite_session_factory()
repo = repository.OcloudSqlAlchemyRepository(session)
dmsid, "k8s1", ocloud1.oCloudId, config.get_api_url()+"/k8s1")
ocloud1.addDeploymentManager(dms)
repo.update(ocloud1)
- # repo.update(ocloud1.oCloudId, {"deploymentManagers": ocloud1.deploymentManagers})
+ # repo.update(ocloud1.oCloudId, {"deploymentManagers":
+ # ocloud1.deploymentManagers})
session.flush()
# seperate session to confirm ocloud is updated into repo
assert ocloud2 is not None
assert ocloud2 != ocloud1 and ocloud2.oCloudId == ocloud1.oCloudId
assert len(ocloud2.deploymentManagers) == 1
+
+
+def test_add_resource_type(sqlite_session_factory):
+ session = sqlite_session_factory()
+ repo = repository.ResouceTypeSqlAlchemyRepository(session)
+ ocloud1_id = str(uuid.uuid4())
+ resource_type_id1 = str(uuid.uuid4())
+ resource_type1 = ocloud.ResourceType(
+ resource_type_id1, "resourcetype1", rt.ResourceTypeEnum.PSERVER,
+ ocloud1_id)
+ repo.add(resource_type1)
+ assert repo.get(resource_type_id1) == resource_type1
+
+
+def test_add_resource_pool(sqlite_session_factory):
+ session = sqlite_session_factory()
+ repo = repository.ResourcePoolSqlAlchemyRepository(session)
+ ocloud1_id = str(uuid.uuid4())
+ resource_pool_id1 = str(uuid.uuid4())
+ resource_pool1 = ocloud.ResourcePool(
+ resource_pool_id1, "resourcepool1", config.get_api_url(),
+ ocloud1_id)
+ repo.add(resource_pool1)
+ assert repo.get(resource_pool_id1) == resource_pool1
+
+
+def test_add_resource(sqlite_session_factory):
+ session = sqlite_session_factory()
+ repo = repository.ResourceSqlAlchemyRepository(session)
+ resource_id1 = str(uuid.uuid4())
+ resource_type_id1 = str(uuid.uuid4())
+ resource_pool_id1 = str(uuid.uuid4())
+ resource1 = ocloud.Resource(
+ resource_id1, resource_type_id1, resource_pool_id1)
+ repo.add(resource1)
+ assert repo.get(resource_id1) == resource1
--- /dev/null
+# Copyright (C) 2021 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import uuid
+import pytest
+
+from o2ims import config
+from o2ims.views import ocloud_view
+from o2ims.domain import ocloud
+from o2ims.domain import resource_type as rt
+
+
+pytestmark = pytest.mark.usefixtures("mappers")
+
+
+def setup_ocloud():
+ ocloudid1 = str(uuid.uuid4())
+ ocloud1 = ocloud.Ocloud(
+ ocloudid1, "ocloud1", config.get_api_url(),
+ "ocloud 1 for integration test", 1)
+ return ocloud1
+
+
+def test_view_olcouds(sqlite_uow):
+ ocloud1 = setup_ocloud()
+ ocloud1_UUID = ocloud1.oCloudId
+ with sqlite_uow as uow:
+ uow.oclouds.add(ocloud1)
+ uow.commit()
+
+ ocloud_list = ocloud_view.oclouds(uow)
+ assert str(ocloud_list[0].get("oCloudId")) == ocloud1_UUID
+
+
+def test_view_olcoud_one(sqlite_uow):
+ ocloud1 = setup_ocloud()
+ ocloud1_UUID = ocloud1.oCloudId
+
+ # Query return None
+ ocloud_res = ocloud_view.ocloud_one(ocloud1_UUID, sqlite_uow)
+ assert ocloud_res is None
+
+ with sqlite_uow as uow:
+ uow.oclouds.add(ocloud1)
+ # INSERT INTO ocloud (oCloudId, name) VALUES (ocloud1_UUID, 'ocloud1')
+ uow.commit()
+ ocloud_res = ocloud_view.ocloud_one(ocloud1_UUID, uow)
+ assert str(ocloud_res.get("oCloudId")) == ocloud1_UUID
+
+
+def test_view_resource_types(sqlite_uow):
+ ocloud1 = setup_ocloud()
+ resource_type_id1 = str(uuid.uuid4())
+ resource_type1 = ocloud.ResourceType(
+ resource_type_id1, "resourcetype1", rt.ResourceTypeEnum.PSERVER,
+ ocloud1.oCloudId)
+ with sqlite_uow as uow:
+ # uow.session.execute()
+ uow.oclouds.add(resource_type1)
+ uow.commit()
+
+ resource_type_list = ocloud_view.resource_types(uow)
+ assert str(resource_type_list[0].get(
+ "resourceTypeId")) == resource_type_id1
+
+
+def test_view_resource_type_one(sqlite_uow):
+ ocloud1 = setup_ocloud()
+ resource_type_id1 = str(uuid.uuid4())
+ resource_type1 = ocloud.ResourceType(
+ resource_type_id1, "resourcetype1", rt.ResourceTypeEnum.PSERVER,
+ ocloud1.oCloudId)
+
+ # Query return None
+ resource_type_res = ocloud_view.resource_type_one(
+ resource_type_id1, sqlite_uow)
+ assert resource_type_res is None
+
+ with sqlite_uow as uow:
+ uow.oclouds.add(resource_type1)
+ uow.commit()
+ resource_type_res = ocloud_view.resource_type_one(resource_type_id1, uow)
+ assert str(resource_type_res.get("resourceTypeId")) == resource_type_id1
+
+
+def test_view_resource_pools(sqlite_uow):
+ ocloud1 = setup_ocloud()
+ resource_pool_id1 = str(uuid.uuid4())
+ resource_pool1 = ocloud.ResourcePool(
+ resource_pool_id1, "resourcepool1", config.get_api_url(),
+ ocloud1.oCloudId)
+ with sqlite_uow as uow:
+ uow.oclouds.add(resource_pool1)
+ uow.commit()
+
+ resource_pool_list = ocloud_view.resource_pools(uow)
+ assert str(resource_pool_list[0].get(
+ "resourcePoolId")) == resource_pool_id1
+
+
+def test_view_resource_pool_one(sqlite_uow):
+ ocloud1 = setup_ocloud()
+ resource_pool_id1 = str(uuid.uuid4())
+ resource_pool1 = ocloud.ResourcePool(
+ resource_pool_id1, "resourcepool1", config.get_api_url(),
+ ocloud1.oCloudId)
+
+ # Query return None
+ resource_pool_res = ocloud_view.resource_pool_one(
+ resource_pool_id1, sqlite_uow)
+ assert resource_pool_res is None
+
+ with sqlite_uow as uow:
+ uow.oclouds.add(resource_pool1)
+ uow.commit()
+ resource_pool_res = ocloud_view.resource_pool_one(resource_pool_id1, uow)
+ assert str(resource_pool_res.get("resourcePoolId")) == resource_pool_id1
+
+
+def test_view_resources(sqlite_uow):
+ resource_id1 = str(uuid.uuid4())
+ resource_type_id1 = str(uuid.uuid4())
+ resource_pool_id1 = str(uuid.uuid4())
+ resource1 = ocloud.Resource(
+ resource_id1, resource_type_id1, resource_pool_id1)
+ with sqlite_uow as uow:
+ uow.oclouds.add(resource1)
+ uow.commit()
+
+ resource_list = ocloud_view.resources(resource_pool_id1, uow)
+ assert str(resource_list[0].get("resourceId")) == resource_id1
+
+
+def test_view_resource_one(sqlite_uow):
+ resource_id1 = str(uuid.uuid4())
+ resource_type_id1 = str(uuid.uuid4())
+ resource_pool_id1 = str(uuid.uuid4())
+ resource1 = ocloud.Resource(
+ resource_id1, resource_type_id1, resource_pool_id1)
+
+ # Query return None
+ resource_res = ocloud_view.resource_one(resource_id1, sqlite_uow)
+ assert resource_res is None
+
+ with sqlite_uow as uow:
+ uow.oclouds.add(resource1)
+ uow.commit()
+
+ resource_res = ocloud_view.resource_one(resource_id1, uow)
+ assert str(resource_res.get("resourceId")) == resource_id1
# See the License for the specific language governing permissions and
# limitations under the License.
+import uuid
+
from o2ims.domain import ocloud
+from o2ims.domain import resource_type as rt
+from o2ims.views import ocloud_view
from o2ims import config
-import uuid
def setup_ocloud():
ocloudid1 = str(uuid.uuid4())
- ocloud1 = ocloud.Ocloud(ocloudid1, "ocloud1", config.get_api_url(), "ocloud for unit test", 1)
+ ocloud1 = ocloud.Ocloud(ocloudid1, "ocloud1",
+ config.get_api_url(), "ocloud for unit test", 1)
return ocloud1
+
def test_new_ocloud():
ocloudid1 = str(uuid.uuid4())
- ocloud1 = ocloud.Ocloud(ocloudid1, "ocloud1", config.get_api_url(), "ocloud for unit test", 1)
+ ocloud1 = ocloud.Ocloud(ocloudid1, "ocloud1",
+ config.get_api_url(), "ocloud for unit test", 1)
assert ocloudid1 is not None and ocloud1.oCloudId == ocloudid1
+
def test_add_ocloud_with_dms():
ocloud1 = setup_ocloud()
dmsid = str(uuid.uuid4())
ocloud1.addDeploymentManager(dms)
ocloud1.addDeploymentManager(dms)
assert len(ocloud1.deploymentManagers) == 1
- # repo.update(ocloud1.oCloudId, {"deploymentManagers": ocloud1.deploymentManagers})
+ # repo.update(ocloud1.oCloudId, {
+ # "deploymentManagers": ocloud1.deploymentManagers})
+
+
+def test_new_resource_type():
+ ocloud1 = setup_ocloud()
+ resource_type_id1 = str(uuid.uuid4())
+ resource_type1 = ocloud.ResourceType(
+ resource_type_id1, "resourcetype1", rt.ResourceTypeEnum.PSERVER,
+ ocloud1.oCloudId)
+ assert resource_type_id1 is not None and \
+ resource_type1.resourceTypeId == resource_type_id1
+
+
+def test_new_resource_pool():
+ ocloud1 = setup_ocloud()
+ resource_pool_id1 = str(uuid.uuid4())
+ resource_pool1 = ocloud.ResourcePool(
+ resource_pool_id1, "resourcepool1", config.get_api_url(),
+ ocloud1.oCloudId)
+ assert resource_pool_id1 is not None and \
+ resource_pool1.resourcePoolId == resource_pool_id1
+
+
+def test_new_resource():
+ resource_id1 = str(uuid.uuid4())
+ resource_type_id1 = str(uuid.uuid4())
+ resource_pool_id1 = str(uuid.uuid4())
+ resource1 = ocloud.Resource(
+ resource_id1, resource_type_id1, resource_pool_id1)
+ assert resource_id1 is not None and resource1.resourceId == resource_id1
+
+
+def test_view_olcouds(mock_uow):
+ session, uow = mock_uow
+
+ ocloud1_UUID = str(uuid.uuid4)
+ session.return_value.execute.return_value = [
+ {"oCloudId": ocloud1_UUID}]
+
+ ocloud_list = ocloud_view.oclouds(uow)
+ assert str(ocloud_list[0].get("oCloudId")) == ocloud1_UUID
+
+
+def test_view_olcoud_one(mock_uow):
+ session, uow = mock_uow
+
+ ocloud1_UUID = str(uuid.uuid4)
+ session.return_value.execute.return_value.first.return_value = None
+
+ # Query return None
+ ocloud_res = ocloud_view.ocloud_one(ocloud1_UUID, uow)
+ assert ocloud_res is None
+
+ session.return_value.execute.return_value.first.return_value = {
+ "oCloudId": ocloud1_UUID}
+
+ ocloud_res = ocloud_view.ocloud_one(ocloud1_UUID, uow)
+ assert str(ocloud_res.get("oCloudId")) == ocloud1_UUID
+
+
+def test_view_resource_types(mock_uow):
+ session, uow = mock_uow
+
+ resource_type_id1 = str(uuid.uuid4())
+ session.return_value.execute.return_value = [
+ {"resourceTypeId": resource_type_id1}
+ ]
+
+ resource_type_list = ocloud_view.resource_types(uow)
+ assert str(resource_type_list[0].get(
+ "resourceTypeId")) == resource_type_id1
+
+
+def test_view_resource_type_one(mock_uow):
+ session, uow = mock_uow
+
+ resource_type_id1 = str(uuid.uuid4())
+ session.return_value.execute.return_value.first.return_value = None
+
+ # Query return None
+ resource_type_res = ocloud_view.resource_type_one(
+ resource_type_id1, uow)
+ assert resource_type_res is None
+
+ session.return_value.execute.return_value.first.return_value = {
+ "resourceTypeId": resource_type_id1}
+
+ resource_type_res = ocloud_view.resource_type_one(resource_type_id1, uow)
+ assert str(resource_type_res.get("resourceTypeId")) == resource_type_id1
+
+
+def test_view_resource_pools(mock_uow):
+ session, uow = mock_uow
+
+ resource_pool_id1 = str(uuid.uuid4())
+ session.return_value.execute.return_value = [
+ {"resourcePoolId": resource_pool_id1}
+ ]
+
+ resource_pool_list = ocloud_view.resource_pools(uow)
+ assert str(resource_pool_list[0].get(
+ "resourcePoolId")) == resource_pool_id1
+
+
+def test_view_resource_pool_one(mock_uow):
+ session, uow = mock_uow
+
+ resource_pool_id1 = str(uuid.uuid4())
+ session.return_value.execute.return_value.first.return_value = None
+
+ # Query return None
+ resource_pool_res = ocloud_view.resource_pool_one(
+ resource_pool_id1, uow)
+ assert resource_pool_res is None
+
+ session.return_value.execute.return_value.first.return_value = {
+ "resourcePoolId": resource_pool_id1
+ }
+
+ resource_pool_res = ocloud_view.resource_pool_one(resource_pool_id1, uow)
+ assert str(resource_pool_res.get("resourcePoolId")) == resource_pool_id1
+
+
+def test_view_resources(mock_uow):
+ session, uow = mock_uow
+
+ resource_id1 = str(uuid.uuid4())
+ resource_pool_id1 = str(uuid.uuid4())
+ session.return_value.execute.return_value = [{
+ "resourceId": resource_id1,
+ "resourcePoolId": resource_pool_id1
+ }]
+
+ resource_list = ocloud_view.resources(resource_pool_id1, uow)
+ assert str(resource_list[0].get("resourceId")) == resource_id1
+ assert str(resource_list[0].get("resourcePoolId")) == resource_pool_id1
+
+
+def test_view_resource_one(mock_uow):
+ session, uow = mock_uow
+
+ resource_id1 = str(uuid.uuid4())
+ resource_pool_id1 = str(uuid.uuid4())
+ session.return_value.execute.return_value.first.return_value = None
+
+ # Query return None
+ resource_res = ocloud_view.resource_one(resource_id1, uow)
+ assert resource_res is None
+
+ session.return_value.execute.return_value.first.return_value = {
+ "resourceId": resource_id1,
+ "resourcePoolId": resource_pool_id1
+ }
+
+ resource_res = ocloud_view.resource_one(resource_id1, uow)
+ assert str(resource_res.get("resourceId")) == resource_id1
+
+
+def test_flask_get_list(mock_flask_uow):
+ session, client = mock_flask_uow
+ session.return_value.execute.return_value = []
+ apibase = config.get_o2ims_api_base()
+
+ # Get list and return empty list
+ ##########################
+ resp = client.get(apibase)
+ assert resp.get_data() == b'[]\n'
+
+ resp = client.get(apibase+"/resourceTypes")
+ assert resp.get_data() == b'[]\n'
+
+ resp = client.get(apibase+"/resourcePools")
+ assert resp.get_data() == b'[]\n'
+
+ resource_pool_id1 = str(uuid.uuid4())
+ resp = client.get(apibase+"/resourcePools/"+resource_pool_id1+"/resources")
+ assert resp.get_data() == b'[]\n'
+
+ resp = client.get(apibase+"/deploymentManagers")
+ assert resp.get_data() == b'[]\n'
+
+
+def test_flask_get_one(mock_flask_uow):
+ session, client = mock_flask_uow
+ session.return_value.execute.return_value.first.return_value = None
+ apibase = config.get_o2ims_api_base()
+
+ # Get one and return nothing
+ ###########################
+ resource_type_id1 = str(uuid.uuid4())
+ resp = client.get(apibase+"/resourceTypes/"+resource_type_id1)
+ assert resp.get_data() == b''
+
+ resource_pool_id1 = str(uuid.uuid4())
+ resp = client.get(apibase+"/resourcePools/"+resource_pool_id1)
+ assert resp.get_data() == b''
+
+ resource_id1 = str(uuid.uuid4())
+ resp = client.get(apibase+"/resourcePools/" +
+ resource_pool_id1+"/resources/"+resource_id1)
+ assert resp.get_data() == b''
+
+ deployment_manager_id1 = str(uuid.uuid4())
+ resp = client.get(apibase+"/deploymentManagers/"+deployment_manager_id1)
+ assert resp.get_data() == b''
+
+
+def test_flask_not_allowed(mock_flask_uow):
+ _, client = mock_flask_uow
+ apibase = config.get_o2ims_api_base()
+
+ # Testing resource type not support method
+ ##########################
+ uri = apibase + "/resourceTypes"
+ resp = client.post(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.put(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.patch(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.delete(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+
+ resource_type_id1 = str(uuid.uuid4())
+ uri = apibase + "/resourceTypes/" + resource_type_id1
+ resp = client.post(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.put(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.patch(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.delete(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+
+ # Testing resource pool not support method
+ ##########################
+ uri = apibase + "/resourcePools"
+ resp = client.post(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.put(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.patch(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.delete(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+
+ resource_pool_id1 = str(uuid.uuid4())
+ uri = apibase + "/resourcePools/" + resource_pool_id1
+ resp = client.post(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.put(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.patch(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.delete(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+
+ # Testing resource not support method
+ ##########################
+ uri = apibase + "/resourcePools/" + resource_pool_id1 + "/resources"
+ resp = client.post(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.put(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.patch(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.delete(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+
+ resource_id1 = str(uuid.uuid4())
+ uri = apibase + "/resourcePools/" + \
+ resource_pool_id1 + "/resources/" + resource_id1
+ resp = client.post(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.put(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.patch(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.delete(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+
+ # Testing deployment managers not support method
+ ##########################
+ uri = apibase + "/deploymentManagers"
+ resp = client.post(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.put(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.patch(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.delete(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+
+ deployment_manager_id1 = str(uuid.uuid4())
+ uri = apibase + "/deploymentManagers/" + deployment_manager_id1
+ resp = client.post(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.put(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.patch(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.delete(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'