from o2common.config import config
from o2common.views.view import gen_filter, check_filter
from o2common.views.pagination_view import Pagination
-from o2common.views.route_exception import BadRequestException
+from o2common.views.route_exception import BadRequestException, \
+ NotFoundException
from o2ims.domain import ocloud
from o2ims.views.ocloud_dto import SubscriptionDTO
uow: unit_of_work.AbstractUnitOfWork):
with uow:
first = uow.resource_pools.get(resourcePoolId)
- return first.serialize() if first is not None else None
+ return first.serialize() if first else None
def resources(resourcePoolId: str, uow: unit_of_work.AbstractUnitOfWork,
**kwargs):
+ with uow:
+ first = uow.resource_pools.get(resourcePoolId)
+ if first is None:
+ raise NotFoundException("ResourcePool {} doesn't exist".format(
+ resourcePoolId))
pagination = Pagination(**kwargs)
# filter key should be the same with database name
query_kwargs = pagination.get_pagination()
return pagination.get_result(ret)
-def resource_one(resourceId: str, uow: unit_of_work.AbstractUnitOfWork):
+def resource_one(resourceId: str,
+ uow: unit_of_work.AbstractUnitOfWork, resourcePoolId: str):
with uow:
- first = uow.resources.get(resourceId)
- return first.serialize() if first is not None else None
+ resoucePool = uow.resource_pools.get(resourcePoolId)
+ if resoucePool is None:
+ raise NotFoundException("ResourcePool {} doesn't exist".format(
+ resourcePoolId))
+
+ first = uow.resources.get(resourceId)
+ if first is None:
+ raise NotFoundException("Resource {} doesn't exist".format(
+ resourceId))
+ return first.serialize()
def deployment_managers(uow: unit_of_work.AbstractUnitOfWork, **kwargs):
args.append(getattr(Subscription, 'filter') == filter)
args.append(getattr(Subscription,
'consumerSubscriptionId') == consumer_subs_id)
- count, _ = uow.alarm_subscriptions.list_with_count(*args)
+ count, _ = uow.subscriptions.list_with_count(*args)
if count > 0:
raise BadRequestException("The value of parameters is duplicated")
uow.subscriptions.add(subscription)
def subscription_delete(subscriptionId: str,
uow: unit_of_work.AbstractUnitOfWork):
with uow:
+ first = uow.subscriptions.get(subscriptionId)
+ if not first:
+ raise NotFoundException(
+ "Subscription {} not found.".format(subscriptionId))
uow.subscriptions.delete(subscriptionId)
uow.commit()
return True