def _update(self, subscription: ocloud.Subscription):
self.session.add(subscription)
+
+ def _delete(self, subscription_id):
+ self.session.query(ocloud.Subscription).filter_by(
+ subscriptionId=subscription_id).delete()
.ResourceSqlAlchemyRepository(self.session)\r
self.deployment_managers = ocloud_repository\\r
.DeploymentManagerSqlAlchemyRepository(self.session)\r
+ self.subscriptions = ocloud_repository\\r
+ .SubscriptionSqlAlchemyRepository(self.session)\r
self.stxobjects = StxObjectSqlAlchemyRepository(self.session)\r
return super().__enter__()\r
\r
self.resourceTypeId = resourceTypeId\r
self.resourcePoolId = resourcePoolId\r
self.parentId = parentId\r
+ self.path = str()\r
self.elements = elements\r
self.description = description\r
self.extensions = []\r
def update(self, subscription: ocloud.Subscription):\r
self._update(subscription)\r
\r
+ def delete(self, subscription_id):\r
+ self._delete(subscription_id)\r
+\r
@abc.abstractmethod\r
def _add(self, subscription: ocloud.Subscription):\r
raise NotImplementedError\r
@abc.abstractmethod\r
def _update(self, subscription: ocloud.Subscription):\r
raise NotImplementedError\r
+\r
+ @abc.abstractmethod\r
+ def _delete(self, subscription_id):\r
+ raise NotImplementedError\r
\r
from o2ims import bootstrap\r
# from o2ims import config\r
-# from o2ims.views.ocloud_route import configure_routes\r
from o2ims.views.ocloud_route import configure_namespace\r
\r
\r
O-Cloud Infrastructure Management Services',\r
)\r
bus = bootstrap.bootstrap()\r
-# configure_routes(app, bus)\r
configure_namespace(api, bus)\r
api = Namespace("Ocloud", description='Ocloud related operations.')
- ocloud_list = api.model(
- "List Ocloud object",
+ ocloud = api.model(
+ "OcloudList",
{
'oCloudId': fields.String(required=True),
'globalCloudId': fields.String,
"ResourceType", description='Resource type related operations.')
resource_type_get = api.model(
- "Get ResourceType object",
+ "ResourceTypeGet",
{
'resourceTypeId': fields.String(required=True,
description='Resource type ID'),
"ResourcePool", description='Resource pool related operations.')
resource_pool_get = api.model(
- "Get ResourcePool object",
+ "ResourcePoolGet",
{
'resourcePoolId': fields.String(required=True,
description='Resource pool ID'),
api = Namespace("Resource", description='Resource related operations.')
resource_list = api.model(
- "List Resource object",
+ "ResourceList",
{
'resourceId': fields.String(required=True,
description='Resource ID'),
)
resource_get = api.model(
- "Get Resource object",
+ "ResourceGet",
{
'resourceId': fields.String(required=True,
description='Resource ID'),
description='Deployment manager related operations.')
deployment_manager_get = api.model(
- "Get DeploymentManager object",
+ "DeploymentManagerGet",
{
'deploymentManagerId': fields.String(
required=True,
"Subscription", description='Subscription related operations.')
subscription_get = api.model(
- "Get Subscription object",
+ "SubscriptionGet",
{
'subscriptionId': fields.String(required=True,
description='Subscription ID'),
'filter': fields.String,
}
)
+
+ subscription = api.model(
+ "SubscriptionCreate",
+ {
+ 'callback': fields.String(
+ required=True, description='Subscription callback address'),
+ 'consumerSubscriptionId': fields.String,
+ 'filter': fields.String,
+ }
+ )
+
+ subscription_post_resp = api.model(
+ "SubscriptionCreatedResp",
+ {
+ 'subscriptionId': fields.String(required=True,
+ description='Subscription ID'),
+ }
+ )
# See the License for the specific language governing permissions and
# limitations under the License.
-from flask import jsonify
+# from operator import sub
+import uuid
+# from re import sub
from flask_restx import Resource
from o2ims import config
from o2ims.views import ocloud_view
+from o2ims.domain.ocloud import Subscription
from o2ims.views.ocloud_dto import OcloudDTO, ResourceTypeDTO,\
ResourcePoolDTO, ResourceDTO, DeploymentManagerDTO, SubscriptionDTO
@api_ocloud.route("/")
+@api_ocloud.response(404, 'oCloud not found')
class OcloudsListRouter(Resource):
"""Ocloud get endpoint
O2 interface ocloud endpoint
"""
- ocloud_get = OcloudDTO.ocloud_list
+ ocloud_get = OcloudDTO.ocloud
- @api_ocloud.marshal_list_with(ocloud_get)
+ @api_ocloud.marshal_with(ocloud_get)
def get(self):
- return ocloud_view.oclouds(uow)
+ res = ocloud_view.oclouds(uow)
+ if len(res) > 0:
+ return res[0]
+ api_rt.abort(
+ 404, "oCloud doesn't exist")
# ---------- ResourceTypes ---------- #
model = ResourcePoolDTO.resource_pool_get
@api_rp.doc('Get resource pool')
+ @api_rp.marshal_with(model)
def get(self, resourcePoolID):
result = ocloud_view.resource_pool_one(resourcePoolID, uow)
if result is not None:
model = ResourceDTO.resource_get
@api_res.doc('Get resource')
+ @api_res.marshal_with(model)
def get(self, resourcePoolID, resourceID):
result = ocloud_view.resource_one(resourceID, uow)
if result is not None:
model = DeploymentManagerDTO.deployment_manager_get
@api_dm.doc('Get deployment manager')
+ @api_dm.marshal_with(model)
def get(self, deploymentManagerID):
result = ocloud_view.deployment_manager_one(
deploymentManagerID, uow)
class SubscriptionsListRouter(Resource):
model = SubscriptionDTO.subscription_get
+ expect = SubscriptionDTO.subscription
+ post_resp = SubscriptionDTO.subscription_post_resp
+ @api_sub.doc('List subscriptions')
@api_sub.marshal_list_with(model)
def get(self):
return ocloud_view.subscriptions(uow)
+ @api_sub.doc('Create a subscription')
+ @api_sub.expect(expect)
+ @api_sub.marshal_with(post_resp, code=201)
+ def post(self):
+ data = api_sub.payload
+ sub_uuid = str(uuid.uuid4())
+ subscription = Subscription(
+ sub_uuid, data['callback'], data['consumerSubscriptionId'],
+ data['filter'])
+ ocloud_view.subscription_create(subscription, uow)
+ return {"subscriptionId": sub_uuid}, 201
+
@api_sub.route("/subscriptions/<subscriptionID>")
@api_sub.param('subscriptionID', 'ID of the subscription')
@api_sub.response(404, 'Subscription not found')
-class SubscriptionGetRouter(Resource):
+class SubscriptionGetDelRouter(Resource):
model = DeploymentManagerDTO.deployment_manager_get
@api_sub.doc('Get subscription by ID')
+ @api_sub.marshal_with(model)
def get(self, subscriptionID):
result = ocloud_view.subscription_one(
subscriptionID, uow)
api_sub.abort(404, "Subscription {} doesn't exist".format(
subscriptionID))
+ @api_sub.doc('Delete subscription by ID')
+ @api_sub.response(204, 'Subscription deleted')
+ def delete(self, subscriptionID):
+ with uow:
+ uow.subscriptions.delete(subscriptionID)
+ uow.commit()
+ return '', 204
+
def configure_namespace(app, bus):
# Set global uow
global uow
uow = bus.uow
-
-
-def configure_routes(app, bus):
-
- # ---------- OClouds ---------- #
- @app.route(apibase, methods=["GET"])
- def oclouds():
- result = ocloud_view.oclouds(bus.uow)
- return jsonify(result), 200
-
- # ---------- ResourceTypes ---------- #
-
- @app.route(apibase + "/resourceTypes", methods=["GET"])
- def resource_types():
- result = ocloud_view.resource_types(bus.uow)
- return jsonify(result), 200
-
- @app.route(apibase + "/resourceTypes", methods=["POST", "PUT", "PATCH",
- "DELETE"])
- def resource_types_not_allow():
- return "Method Not Allowed", 405
-
- @app.route(apibase + "/resourceTypes/<resourceTypeID>", methods=["GET"])
- def resource_types_one(resourceTypeID):
- result = ocloud_view.resource_type_one(resourceTypeID, bus.uow)
- if result is None:
- return "", 200
- return jsonify(result), 200
-
- @app.route(apibase + "/resourceTypes/<resourceTypeID>",
- methods=["POST", "PUT", "PATCH", "DELETE"])
- def resource_types_one_not_allow(resourceTypeID):
- return "Method Not Allowed", 405
-
- # ---------- ResourcePools ---------- #
-
- @app.route(apibase + "/resourcePools", methods=["GET"])
- def resource_pools():
- result = ocloud_view.resource_pools(bus.uow)
- return jsonify(result), 200
-
- @app.route(apibase + "/resourcePools", methods=["POST", "PUT", "PATCH",
- "DELETE"])
- def resource_pools_not_allow():
- return "Method Not Allowed", 405
-
- @app.route(apibase + "/resourcePools/<resourcePoolID>", methods=["GET"])
- def resource_pools_one(resourcePoolID):
- result = ocloud_view.resource_pool_one(resourcePoolID, bus.uow)
- if result is None:
- return "", 200
- return jsonify(result), 200
-
- @app.route(apibase + "/resourcePools/<resourcePoolID>",
- methods=["POST", "PUT", "PATCH", "DELETE"])
- def resource_pools_one_not_allow(resourcePoolID):
- return "Method Not Allowed", 405
-
- # ---------- Resources ---------- #
-
- @app.route(apibase + "/resourcePools/<resourcePoolID>/resources",
- methods=["GET"])
- def resources(resourcePoolID):
- result = ocloud_view.resources(resourcePoolID, bus.uow)
- return jsonify(result), 200
-
- @app.route(apibase + "/resourcePools/<resourcePoolID>/resources",
- methods=["POST", "PUT", "PATCH", "DELETE"])
- def resource_not_allow(resourcePoolID):
- return "Method Not Allowed", 405
-
- @app.route(apibase +
- "/resourcePools/<resourcePoolID>/resources/<resourceID>",
- methods=["GET"])
- def resources_one(resourcePoolID, resourceID):
- result = ocloud_view.resource_one(resourceID, bus.uow)
- if result is None:
- return "", 200
- return jsonify(result), 200
-
- @app.route(apibase +
- "/resourcePools/<resourcePoolID>/resources/<resourceID>",
- methods=["POST", "PUT", "PATCH", "DELETE"])
- def resource_one_not_allow(resourcePoolID, resourceID):
- return "Method Not Allowed", 405
-
- # ---------- DeploymentManagers ---------- #
-
- @app.route(apibase + "/deploymentManagers", methods=["GET"])
- def deployment_managers():
- result = ocloud_view.deployment_managers(bus.uow)
- return jsonify(result), 200
-
- @app.route(apibase + "/deploymentManagers",
- methods=["POST", "PUT", "PATCH", "DELETE"])
- def deployment_managers_not_allow():
- return "Method Not Allowed", 405
-
- @app.route(apibase + "/deploymentManagers/<deploymentManagerID>",
- methods=["GET"])
- def deployment_manager_one(deploymentManagerID):
- result = ocloud_view.deployment_manager_one(
- deploymentManagerID, bus.uow)
- if result is None:
- return "", 200
- return jsonify(result), 200
-
- @app.route(apibase + "/deploymentManagers/<deploymentManagerID>",
- methods=["POST", "PUT", "PATCH", "DELETE"])
- def deployment_manager_one_not_allow(deploymentManagerID):
- return "Method Not Allowed", 405
from o2ims.adapter.orm import ocloud, resource, resourcetype, \\r
resourcepool, deploymentmanager, subscription\r
from o2ims.adapter import unit_of_work\r
-# from o2ims.domain.ocloud import Ocloud\r
+from o2ims.domain.ocloud import Subscription\r
\r
\r
def oclouds(uow: unit_of_work.SqlAlchemyUnitOfWork):\r
with uow:\r
- # res = uow.session.execute(\r
- # """\r
- # SELECT "oCloudId", "name" FROM ocloud\r
- # """,\r
- # )\r
-\r
res = uow.session.execute(select(ocloud))\r
return [dict(r) for r in res]\r
\r
\r
def ocloud_one(ocloudid: str, uow: unit_of_work.AbstractUnitOfWork):\r
with uow:\r
- # res = uow.session.execute(\r
- # """\r
- # SELECT "oCloudId", "name" FROM ocloud\r
- # WHERE "oCloudId" = :oCloudId\r
- # """,\r
- # dict(oCloudId=ocloudid),\r
- # )\r
res = uow.session.execute(\r
select(ocloud).where(ocloud.c.oCloudId == ocloudid))\r
first = res.first()\r
\r
def resource_types(uow: unit_of_work.SqlAlchemyUnitOfWork):\r
with uow:\r
- # res = uow.session.execute(\r
- # """\r
- # SELECT "resourceTypeId", "oCloudId", "name" FROM resourcetype\r
- # """,\r
- # )\r
res = uow.session.execute(select(resourcetype))\r
return [dict(r) for r in res]\r
\r
def resource_type_one(resourceTypeId: str,\r
uow: unit_of_work.SqlAlchemyUnitOfWork):\r
with uow:\r
- # res = uow.session.execute(\r
- # """\r
- # SELECT "resourceTypeId", "oCloudId", "name"\r
- # FROM resourcetype WHERE "resourceTypeId" = :resourceTypeId\r
- # """,\r
- # dict(resourceTypeId=resourceTypeId),\r
- # )\r
res = uow.session.execute(select(resourcetype).where(\r
resourcetype.c.resourceTypeId == resourceTypeId))\r
first = res.first()\r
\r
def resource_pools(uow: unit_of_work.SqlAlchemyUnitOfWork):\r
with uow:\r
- # res = uow.session.execute(\r
- # """\r
- # SELECT "resourcePoolId", "oCloudId", "location", "name"\r
- # FROM resourcepool\r
- # """,\r
- # )\r
res = uow.session.execute(select(resourcepool))\r
return [dict(r) for r in res]\r
\r
def resource_pool_one(resourcePoolId: str,\r
uow: unit_of_work.SqlAlchemyUnitOfWork):\r
with uow:\r
- # res = uow.session.execute(\r
- # """\r
- # SELECT "resourcePoolId", "oCloudId", "location", "name"\r
- # FROM resourcepool\r
- # WHERE "resourcePoolId" = :resourcePoolId\r
- # """,\r
- # dict(resourcePoolId=resourcePoolId),\r
- # )\r
res = uow.session.execute(select(resourcepool).where(\r
resourcepool.c.resourcePoolId == resourcePoolId))\r
first = res.first()\r
\r
def resources(resourcePoolId: str, uow: unit_of_work.SqlAlchemyUnitOfWork):\r
with uow:\r
- # res = uow.session.execute(\r
- # """\r
- # SELECT "resourceId", "parentId", "resourceTypeId",\r
- # "resourcePoolId", "oCloudId"\r
- # FROM resource\r
- # WHERE "resourcePoolId" = :resourcePoolId\r
- # """,\r
- # dict(resourcePoolId=resourcePoolId),\r
- # )\r
res = uow.session.execute(select(resource).where(\r
resource.c.resourcePoolId == resourcePoolId))\r
return [dict(r) for r in res]\r
\r
def resource_one(resourceId: str, uow: unit_of_work.SqlAlchemyUnitOfWork):\r
with uow:\r
- # res = uow.session.execute(\r
- # """\r
- # SELECT "resourceId", "parentId", "resourceTypeId",\r
- # "resourcePoolId", "oCloudId"\r
- # FROM resource\r
- # WHERE "resourceId" = :resourceId\r
- # """,\r
- # # AND "resourcePoolId" = :resourcePoolId\r
- # # dict(resourcePoolId=resourcePoolId,\r
- # dict(resourceId=resourceId),\r
- # )\r
+ # topq = uow.session.query(resource).filter(\r
+ # resource.c.resourceId == resourceId).cte('cte', recursive=True)\r
+ # bootomq = uow.session.query(resource).join(\r
+ # topq, resource.c.parentId == topq.c.resourceId)\r
+ # res = uow.session.query(topq.union(bootomq))\r
+ # print(res)\r
res = uow.session.execute(select(resource).where(\r
resource.c.resourceId == resourceId))\r
first = res.first()\r
\r
def deployment_managers(uow: unit_of_work.SqlAlchemyUnitOfWork):\r
with uow:\r
- # res = uow.session.execute(\r
- # """\r
- # SELECT "deploymentManagerId", "oCloudId",\r
- # "deploymentManagementServiceEndpoint", "name"\r
- # FROM deploymentmanager\r
- # """,\r
- # )\r
res = uow.session.execute(select(deploymentmanager))\r
return [dict(r) for r in res]\r
\r
def deployment_manager_one(deploymentManagerId: str,\r
uow: unit_of_work.SqlAlchemyUnitOfWork):\r
with uow:\r
- # res = uow.session.execute(\r
- # """\r
- # SELECT "deploymentManagerId", "oCloudId",\r
- # "deploymentManagementServiceEndpoint", "name"\r
- # FROM deploymentmanager\r
- # WHERE "deploymentManagerId" = :deploymentManagerId\r
- # """,\r
- # dict(deploymentManagerId=deploymentManagerId),\r
- # )\r
res = uow.session.execute(select(deploymentmanager).where(\r
deploymentmanager.c.deploymentManagerId == deploymentManagerId))\r
first = res.first()\r
subscription.c.subscriptionId == subscriptionId))\r
first = res.first()\r
return None if first is None else dict(first)\r
+\r
+\r
+def subscription_create(subscription: Subscription,\r
+ uow: unit_of_work.SqlAlchemyUnitOfWork):\r
+ with uow:\r
+ uow.subscriptions.add(subscription)\r
+ uow.commit()\r
+\r
+\r
+def subscription_delete(subscriptionId: str,\r
+ uow: unit_of_work.SqlAlchemyUnitOfWork):\r
+ with uow:\r
+ uow.subscriptions.delete(subscriptionId)\r
+ uow.commit()\r
+ return True\r
from o2ims.adapter.orm import metadata, start_o2ims_mappers\r
from o2ims.adapter.clients.orm_stx import start_o2ims_stx_mappers\r
from o2ims.adapter import unit_of_work\r
-# from o2ims.views.ocloud_route import configure_routes\r
from o2ims.views.ocloud_route import configure_namespace\r
from o2ims.bootstrap import bootstrap\r
\r
# app.config["TESTING"] = True\r
api = Api(app)\r
bus = bootstrap(False, uow)\r
- # configure_routes(app, bus)\r
configure_namespace(api, bus)\r
client = app.test_client()\r
return session, client\r
# app.config["TESTING"] = True\r
api = Api(app)\r
bus = bootstrap(False, sqlite_uow)\r
- # configure_routes(app, bus)\r
configure_namespace(api, bus)\r
yield app.test_client()\r
\r
app = Flask(__name__)\r
api = Api(app)\r
bus = bootstrap(False, postgres_uow)\r
- # configure_routes(app, bus)\r
configure_namespace(api, bus)\r
yield app.test_client()\r
\r
resource_type_id1, "resourcetype1", rt.ResourceTypeEnum.PSERVER,
ocloud1.oCloudId)
with sqlite_uow as uow:
- # uow.session.execute()
- uow.oclouds.add(resource_type1)
+ uow.resource_types.add(resource_type1)
uow.commit()
resource_type_list = ocloud_view.resource_types(uow)
assert resource_type_res is None
with sqlite_uow as uow:
- uow.oclouds.add(resource_type1)
+ uow.resource_types.add(resource_type1)
uow.commit()
resource_type_res = ocloud_view.resource_type_one(resource_type_id1, uow)
assert str(resource_type_res.get("resourceTypeId")) == resource_type_id1
resource_pool_id1, "resourcepool1", config.get_api_url(),
ocloud1.oCloudId)
with sqlite_uow as uow:
- uow.oclouds.add(resource_pool1)
+ uow.resource_pools.add(resource_pool1)
uow.commit()
resource_pool_list = ocloud_view.resource_pools(uow)
assert resource_pool_res is None
with sqlite_uow as uow:
- uow.oclouds.add(resource_pool1)
+ uow.resource_pools.add(resource_pool1)
uow.commit()
resource_pool_res = ocloud_view.resource_pool_one(resource_pool_id1, uow)
assert str(resource_pool_res.get("resourcePoolId")) == resource_pool_id1
resource1 = ocloud.Resource(
resource_id1, resource_type_id1, resource_pool_id1)
with sqlite_uow as uow:
- uow.oclouds.add(resource1)
+ uow.resources.add(resource1)
uow.commit()
resource_list = ocloud_view.resources(resource_pool_id1, uow)
assert resource_res is None
with sqlite_uow as uow:
- uow.oclouds.add(resource1)
+ uow.resources.add(resource1)
uow.commit()
resource_res = ocloud_view.resource_one(resource_id1, uow)
deployment_manager_id1, "k8s1", ocloud_id1,
config.get_api_url()+"/k8s1")
with sqlite_uow as uow:
- uow.oclouds.add(deployment_manager1)
+ uow.deployment_managers.add(deployment_manager1)
uow.commit()
deployment_manager_list = ocloud_view.deployment_managers(uow)
assert deployment_manager_res is None
with sqlite_uow as uow:
- uow.oclouds.add(deployment_manager1)
+ uow.deployment_managers.add(deployment_manager1)
uow.commit()
deployment_manager_res = ocloud_view.deployment_manager_one(
"deploymentManagerId")) == deployment_manager_id1
-def test_view_subscriptions(mock_uow):
- session, uow = mock_uow
+def test_view_subscriptions(sqlite_uow):
subscription_id1 = str(uuid.uuid4())
- session.return_value.execute.return_value = [{
- "subscriptionId": subscription_id1,
- }]
+ subscription1 = ocloud.Subscription(
+ subscription_id1, "https://callback/uri/write/here")
+ with sqlite_uow as uow:
+ uow.subscriptions.add(subscription1)
+ uow.commit()
- deployment_manager_list = ocloud_view.subscriptions(uow)
- assert str(deployment_manager_list[0].get(
+ subscription_list = ocloud_view.subscriptions(uow)
+ assert str(subscription_list[0].get(
"subscriptionId")) == subscription_id1
-def test_view_subscription_one(mock_uow):
- session, uow = mock_uow
+def test_view_subscription_one(sqlite_uow):
subscription_id1 = str(uuid.uuid4())
- session.return_value.execute.return_value.first.return_value = None
+ subscription1 = ocloud.Subscription(
+ subscription_id1, "https://callback/uri/write/here")
# Query return None
- deployment_manager_res = ocloud_view.subscription_one(
- subscription_id1, uow)
- assert deployment_manager_res is None
+ subscription_res = ocloud_view.subscription_one(
+ subscription_id1, sqlite_uow)
+ assert subscription_res is None
- session.return_value.execute.return_value.first.return_value = {
- "deploymentManagerId": subscription_id1,
- }
+ with sqlite_uow as uow:
+ uow.subscriptions.add(subscription1)
+ uow.commit()
- deployment_manager_res = ocloud_view.subscription_one(
- subscription_id1, uow)
- assert str(deployment_manager_res.get(
- "deploymentManagerId")) == subscription_id1
+ subscription_res = ocloud_view.subscription_one(
+ subscription_id1, sqlite_uow)
+ assert str(subscription_res.get(
+ "subscriptionId")) == subscription_id1
+
+
+def test_view_subscription_delete(sqlite_uow):
+
+ subscription_id1 = str(uuid.uuid4())
+ subscription1 = ocloud.Subscription(
+ subscription_id1, "https://callback/uri/write/here")
+
+ with sqlite_uow as uow:
+ uow.subscriptions.add(subscription1)
+ uow.commit()
+
+ subscription_res = ocloud_view.subscription_one(
+ subscription_id1, sqlite_uow)
+ assert str(subscription_res.get(
+ "subscriptionId")) == subscription_id1
+
+ with sqlite_uow as uow:
+ uow.subscriptions.delete(subscription_id1)
+ uow.commit()
+
+ subscription_res = ocloud_view.subscription_one(
+ subscription_id1, sqlite_uow)
+ assert subscription_res is None
"subscriptionId": subscription_id1,
}]
- deployment_manager_list = ocloud_view.subscriptions(uow)
- assert str(deployment_manager_list[0].get(
+ subscription_list = ocloud_view.subscriptions(uow)
+ assert str(subscription_list[0].get(
"subscriptionId")) == subscription_id1
session.return_value.execute.return_value.first.return_value = None
# Query return None
- deployment_manager_res = ocloud_view.subscription_one(
+ subscription_res = ocloud_view.subscription_one(
subscription_id1, uow)
- assert deployment_manager_res is None
+ assert subscription_res is None
session.return_value.execute.return_value.first.return_value = {
- "deploymentManagerId": subscription_id1,
+ "subscriptionId": subscription_id1,
}
- deployment_manager_res = ocloud_view.subscription_one(
+ subscription_res = ocloud_view.subscription_one(
subscription_id1, uow)
- assert str(deployment_manager_res.get(
- "deploymentManagerId")) == subscription_id1
+ assert str(subscription_res.get(
+ "subscriptionId")) == subscription_id1
def test_flask_get_list(mock_flask_uow):
# Get list and return empty list
##########################
- resp = client.get(apibase+"/")
- assert resp.get_data() == b'[]\n'
-
resp = client.get(apibase+"/resourceTypes")
assert resp.get_data() == b'[]\n'
# Get one and return 404
###########################
+ resp = client.get(apibase+"/")
+ assert resp.status_code == 404
+
resource_type_id1 = str(uuid.uuid4())
resp = client.get(apibase+"/resourceTypes/"+resource_type_id1)
assert resp.status_code == 404
# Testing subscriptions not support method
##########################
uri = apibase + "/subscriptions"
- resp = client.post(uri)
- assert resp.status == '405 METHOD NOT ALLOWED'
resp = client.put(uri)
assert resp.status == '405 METHOD NOT ALLOWED'
resp = client.patch(uri)
assert resp.status == '405 METHOD NOT ALLOWED'
resp = client.patch(uri)
assert resp.status == '405 METHOD NOT ALLOWED'
- resp = client.delete(uri)
- assert resp.status == '405 METHOD NOT ALLOWED'