exc,
)
- from sqlalchemy.orm import mapper, relationship
+ from sqlalchemy.orm import mapper, relationship, backref
# from sqlalchemy.sql.sqltypes import Integer
from o2ims.domain import ocloud as ocloudModel
Column("resourceClass", Enum(ResourceTypeEnum)),
# Column("extensions", String(1024))
- Column("oCloudId", ForeignKey("ocloud.oCloudId")),
+ Column("alarmDictionaryId", ForeignKey("alarmDictionary.id"))
)
resourcepool = Table(
Column("name", String(255)),
Column("globalAssetId", String(255)),
Column("parentId", String(255)),
- Column("description", String(255)),
- Column("elements", Text())
- # Column("extensions", String(1024))
+ Column("description", String()),
+ Column("elements", Text()),
+ Column("extensions", String())
)
deploymentmanager = Table(
Column("alarmDefinitionId", String(255), primary_key=True),
Column("alarmName", String(255), unique=True),
Column("alarmLastChange", String(255)),
+ Column("alarmChangeType", String(255)),
Column("alarmDescription", String(255)),
- Column("proposeRepairActions", String(255)),
+ Column("proposedRepairActions", String(1024)),
Column("clearingType", String(255)),
Column("managementInterfaceId", String(255)),
Column("pkNotificationField", String(255))
)
+ alarm_dictionary = Table(
+ "alarmDictionary",
+ metadata,
+ Column("updatetime", DateTime),
+ Column("createtime", DateTime),
+
+ Column("id", String(255), primary_key=True),
+ Column("entityType", String(255), unique=True),
+ Column("alarmDictionaryVersion", String(255)),
+ Column("alarmDictionarySchemaVersion", String(255)),
+ Column("vendor", String(255)),
+ Column("managementInterfaceId", String(255)),
+ Column("pkNotificationField", String(255))
+
+ # Column("resourceTypeId", ForeignKey("resourceType.resourceTypeId"))
+ )
+
+ association_table1 = Table(
+ 'associationAlarmDictAndAlarmDef',
+ metadata,
+ Column("alarmDictionaryId", ForeignKey(
+ 'alarmDictionary.id', ondelete='cascade')),
+ Column("alarmDefinitionId", ForeignKey(
+ 'alarmDefinition.alarmDefinitionId'))
+ )
+
alarm_event_record = Table(
"alarmEventRecord",
metadata,
def start_o2ims_mappers(engine=None):
logger.info("Starting O2 IMS mappers")
+ # IMS Infrastruture Monitoring Mappering
+ mapper(alarmModel.AlarmEventRecord, alarm_event_record)
+ alarmdefinition_mapper = mapper(
+ alarmModel.AlarmDefinition, alarm_definition)
+ mapper(alarmModel.ProbableCause, alarm_probable_cause)
+ mapper(alarmModel.AlarmSubscription, alarm_subscription)
+ alarm_dictionary_mapper = mapper(
+ alarmModel.AlarmDictionary, alarm_dictionary,
+ properties={
+ "alarmDefinition": relationship(alarmdefinition_mapper,
+ cascade='all,delete-orphan',
+ secondary=association_table1,
+ single_parent=True,
+ backref='alarmDictionaries')
+ }
+ )
+
# IMS Infrastructure Inventory Mappering
dm_mapper = mapper(ocloudModel.DeploymentManager, deploymentmanager)
resourcepool_mapper = mapper(ocloudModel.ResourcePool, resourcepool)
- resourcetype_mapper = mapper(ocloudModel.ResourceType, resourcetype)
+ resourcetype_mapper = mapper(
+ ocloudModel.ResourceType, resourcetype,
+ properties={
+ # "alarmDictionary": relationship(alarmModel.AlarmDictionary,
+ # uselist=False)
+ "alarmDictionary": relationship(alarm_dictionary_mapper,
+ backref=backref(
+ 'resourceType', uselist=False))
+
+ }
+ )
mapper(
ocloudModel.Ocloud,
ocloud,
properties={
"deploymentManagers": relationship(dm_mapper),
- "resourceTypes": relationship(resourcetype_mapper),
+ # "resourceTypes": relationship(resourcetype_mapper),
"resourcePools": relationship(resourcepool_mapper)
})
mapper(
)
mapper(subModel.Subscription, subscription)
- # IMS Infrastruture Monitoring Mappering
- mapper(alarmModel.AlarmEventRecord, alarm_event_record)
- mapper(alarmModel.AlarmDefinition, alarm_definition)
- mapper(alarmModel.ProbableCause, alarm_probable_cause)
- mapper(alarmModel.AlarmSubscription, alarm_subscription)
-
if engine is not None:
wait_for_metadata_ready(engine)
from __future__ import annotations
import json
+ from o2common.config import config
from o2common.domain.base import AgRoot, Serializer
- from o2common.config import config, conf as CONF
# from dataclasses import dataclass
# from datetime import date
# from typing import Optional, List, Set
from .resource_type import ResourceKindEnum, ResourceTypeEnum
+ from .alarm_obj import AlarmDictionary
DeploymentManagerProfileDefault = 'native_k8sapi'
class ResourceType(AgRoot, Serializer):
def __init__(self, typeid: str, name: str, typeEnum: ResourceTypeEnum,
- ocloudid: str, vendor: str = '', model: str = '',
+ vendor: str = '', model: str = '',
version: str = '',
description: str = '') -> None:
super().__init__()
self.vendor = vendor
self.model = model
self.version = version
- self.alarmDictionary = {}
+ self.alarmDictionary = None
self.resourceKind = ResourceKindEnum.UNDEFINED
self.resourceClass = ResourceTypeEnum.UNDEFINED
self.extensions = []
def serialize(self):
d = Serializer.serialize(self)
-
- if CONF.alarm_dictionaries.get(d['name']) is not None:
- d["alarmDictionary"] = CONF.alarm_dictionaries.get(
- d['name']).serialize()
-
+ if 'alarmDictionary' in d and \
+ type(d['alarmDictionary']) is AlarmDictionary:
+ d['alarmDictionary'] = d['alarmDictionary'].serialize()
return d
def __init__(self, resourceId: str, resourceTypeId: str,
resourcePoolId: str, name: str, parentId: str = '',
gAssetId: str = '', elements: str = '',
- description: str = '') -> None:
+ description: str = '', extensions: str = '') -> None:
super().__init__()
self.resourceId = resourceId
self.description = description
self.globalAssetId = gAssetId
self.resourcePoolId = resourcePoolId
self.elements = elements
- self.extensions = []
+ self.extensions = extensions
self.name = name
self.parentId = parentId
# pylint: disable=unused-argument
from __future__ import annotations
import uuid
-# import json
+import json
from o2ims.domain import commands, events
from o2ims.domain.stx_object import StxGenericModel
stxobj = cmd.data
with uow:
p_resource = uow.resources.get(cmd.parentid)
- resourcepool = uow.resource_pools.get(p_resource.resourcePoolId)
+ # resourcepool = uow.resource_pools.get(p_resource.resourcePoolId)
res = uow.session.execute(
'''
- SELECT "resourceTypeId", "oCloudId", "name"
+ SELECT "resourceTypeId", "name"
FROM "resourceType"
WHERE "resourceTypeEnum" = :resource_type_enum
''',
res_type_name = 'pserver_acc'
resourcetype_id = str(uuid.uuid3(
uuid.NAMESPACE_URL, res_type_name))
- uow.resource_types.add(ResourceType(
+ res_type = ResourceType(
resourcetype_id,
res_type_name, stxobj.type,
- resourcepool.oCloudId,
- description='An Accelerator resource type of Physical Server'))
+ description='An Accelerator resource type of Physical Server')
+ dict_id = str(uuid.uuid3(
+ uuid.NAMESPACE_URL,
+ str(f"{res_type_name}_alarmdictionary")))
+ alarm_dictionary = uow.alarm_dictionaries.get(dict_id)
+ if alarm_dictionary:
+ res_type.alarmDictionary = alarm_dictionary
+ uow.resource_types.add(res_type)
else:
resourcetype_id = first['resourceTypeId']
resourcepool_id = parent.resourcePoolId
parent_id = parent.resourceId
gAssetId = '' # TODO: global ID
- description = "%s : An Accelerator resource of the physical server"\
- % stxobj.name
+ # description = "%s : An Accelerator resource of the physical server"\
+ # % stxobj.name
+ content = json.loads(stxobj.content)
+ selected_keys = [
+ "name", "pdevice", "pciaddr", "pvendor_id", "pvendor",
+ "pclass_id", "pclass", "psvendor", "psdevice",
+ "sriov_totalvfs", "sriov_numvfs", "numa_node"
+ ]
+ filtered = dict(
+ filter(lambda item: item[0] in selected_keys, content.items()))
+ extensions = json.dumps(filtered)
+ description = ";".join([f"{k}:{v}" for k, v in filtered.items()])
resource = Resource(stxobj.id, resourcetype_id, resourcepool_id,
stxobj.name, parent_id, gAssetId, stxobj.content,
- description)
+ description, extensions)
resource.createtime = stxobj.createtime
resource.updatetime = stxobj.updatetime
resource.hash = stxobj.hash
# pylint: disable=unused-argument
from __future__ import annotations
import uuid
-# import json
+import json
from o2ims.domain import commands, events
from o2ims.domain.stx_object import StxGenericModel
stxobj = cmd.data
with uow:
p_resource = uow.resources.get(cmd.parentid)
- resourcepool = uow.resource_pools.get(p_resource.resourcePoolId)
+ # resourcepool = uow.resource_pools.get(p_resource.resourcePoolId)
res = uow.session.execute(
'''
- SELECT "resourceTypeId", "oCloudId", "name"
+ SELECT "resourceTypeId", "name"
FROM "resourceType"
WHERE "resourceTypeEnum" = :resource_type_enum
''',
res_type_name = 'pserver_cpu'
resourcetype_id = str(uuid.uuid3(
uuid.NAMESPACE_URL, res_type_name))
- uow.resource_types.add(ResourceType(
+ res_type = ResourceType(
resourcetype_id,
res_type_name, stxobj.type,
- resourcepool.oCloudId,
- description='A CPU resource type of the Physical Server'))
+ description='A CPU resource type of the Physical Server')
+ dict_id = str(uuid.uuid3(
+ uuid.NAMESPACE_URL,
+ str(f"{res_type_name}_alarmdictionary")))
+ alarm_dictionary = uow.alarm_dictionaries.get(dict_id)
+ if alarm_dictionary:
+ res_type.alarmDictionary = alarm_dictionary
+ uow.resource_types.add(res_type)
else:
resourcetype_id = first['resourceTypeId']
resourcepool_id = parent.resourcePoolId
parent_id = parent.resourceId
gAssetId = '' # TODO: global ID
- description = "%s : A CPU resource of the physical server" % stxobj.name
+ # description = "%s : A CPU resource of the physical server" % stxobj.name
+ content = json.loads(stxobj.content)
+ selected_keys = [
+ "cpu", "core", "thread", "allocated_function", "numa_node",
+ "cpu_model", "cpu_family"
+ ]
+ filtered = dict(
+ filter(lambda item: item[0] in selected_keys, content.items()))
+ extensions = json.dumps(filtered)
+ description = ";".join([f"{k}:{v}" for k, v in filtered.items()])
resource = Resource(stxobj.id, resourcetype_id, resourcepool_id,
stxobj.name, parent_id, gAssetId, stxobj.content,
- description)
+ description, extensions)
resource.createtime = stxobj.createtime
resource.updatetime = stxobj.updatetime
resource.hash = stxobj.hash
# pylint: disable=unused-argument
from __future__ import annotations
import uuid
-# import json
+import json
from o2ims.domain import commands, events
from o2ims.domain.stx_object import StxGenericModel
stxobj = cmd.data
with uow:
p_resource = uow.resources.get(cmd.parentid)
- resourcepool = uow.resource_pools.get(p_resource.resourcePoolId)
+ # resourcepool = uow.resource_pools.get(p_resource.resourcePoolId)
res = uow.session.execute(
'''
- SELECT "resourceTypeId", "oCloudId", "name"
+ SELECT "resourceTypeId", "name"
FROM "resourceType"
WHERE "resourceTypeEnum" = :resource_type_enum
''',
res_type_name = 'pserver_ethernet'
resourcetype_id = str(uuid.uuid3(
uuid.NAMESPACE_URL, res_type_name))
- uow.resource_types.add(ResourceType(
+ res_type = ResourceType(
resourcetype_id,
res_type_name, stxobj.type,
- resourcepool.oCloudId,
- description='An Ethernet resource type of Physical Server'))
+ description='An Ethernet resource type of Physical Server')
+ dict_id = str(uuid.uuid3(
+ uuid.NAMESPACE_URL,
+ str(f"{res_type_name}_alarmdictionary")))
+ alarm_dictionary = uow.alarm_dictionaries.get(dict_id)
+ if alarm_dictionary:
+ res_type.alarmDictionary = alarm_dictionary
+ uow.resource_types.add(res_type)
else:
resourcetype_id = first['resourceTypeId']
resourcepool_id = parent.resourcePoolId
parent_id = parent.resourceId
gAssetId = '' # TODO: global ID
- description = "%s : An ethernet resource of the physical server"\
- % stxobj.name
+ # description = "%s : An ethernet resource of the physical server"\
+ # % stxobj.name
+ content = json.loads(stxobj.content)
+ selected_keys = [
+ "name", "namedisplay", "dev_id", "pdevice", "capabilities",
+ "type", "driver", "mac", "numa_node",
+ "pciaddr", "pclass", "psvendor", "psdevice",
+ "sriov_totalvfs", "sriov_numvfs", "dpdksupport",
+ "sriov_vf_driver", "sriov_vf_pdevice_id", "interface_uuid"
+ ]
+ filtered = dict(
+ filter(lambda item: item[0] in selected_keys, content.items()))
+ extensions = json.dumps(filtered)
+ description = ";".join([f"{k}:{v}" for k, v in filtered.items()])
resource = Resource(stxobj.id, resourcetype_id, resourcepool_id,
stxobj.name, parent_id, gAssetId, stxobj.content,
- description)
+ description, extensions)
resource.createtime = stxobj.createtime
resource.updatetime = stxobj.updatetime
resource.hash = stxobj.hash
# pylint: disable=unused-argument
from __future__ import annotations
import uuid
-# import json
+import json
from typing import Callable
from o2ims.domain import commands, events
):
stxobj = cmd.data
with uow:
- resourcepool = uow.resource_pools.get(cmd.parentid)
+ # resourcepool = uow.resource_pools.get(cmd.parentid)
# res = uow.session.execute(select(resourcetype).where(
# resourcetype.c.resourceTypeEnum == stxobj.type))
res = uow.session.execute(
'''
- SELECT "resourceTypeId", "oCloudId", "name"
+ SELECT "resourceTypeId", "name"
FROM "resourceType"
WHERE "resourceTypeEnum" = :resource_type_enum
''',
res_type_name = 'pserver'
resourcetype_id = str(uuid.uuid3(
uuid.NAMESPACE_URL, res_type_name))
- uow.resource_types.add(ResourceType(
+ res_type = ResourceType(
resourcetype_id,
res_type_name, stxobj.type,
- resourcepool.oCloudId,
- description='The Physical Server resource type'))
+ description='The Physical Server resource type')
+ dict_id = str(uuid.uuid3(
+ uuid.NAMESPACE_URL,
+ str(f"{res_type_name}_alarmdictionary")))
+ alarm_dictionary = uow.alarm_dictionaries.get(dict_id)
+ if alarm_dictionary:
+ res_type.alarmDictionary = alarm_dictionary
+ uow.resource_types.add(res_type)
else:
resourcetype_id = first['resourceTypeId']
resourcepool_id = parentid
parent_id = None # the root of the resource has no parent id
gAssetId = '' # TODO: global ID
- description = "%s : A physical server resource" % stxobj.name
+ # description = "%s : A physical server resource" % stxobj.name
+ content = json.loads(stxobj.content)
+ selected_keys = [
+ "hostname", "personality", "id", "mgmt_ip", "mgmt_mac",
+ "software_load", "capabilities",
+ "operational", "availability", "administrative",
+ "boot_device", "rootfs_device", "install_state", "subfunctions",
+ "clock_synchronization", "max_cpu_mhz_allowed"
+ ]
+ filtered = dict(
+ filter(lambda item: item[0] in selected_keys, content.items()))
+ extensions = json.dumps(filtered)
+ description = ";".join([f"{k}:{v}" for k, v in filtered.items()])
resource = Resource(stxobj.id, resourcetype_id, resourcepool_id,
stxobj.name, parent_id, gAssetId, stxobj.content,
- description)
+ description, extensions)
resource.createtime = stxobj.createtime
resource.updatetime = stxobj.updatetime
resource.hash = stxobj.hash
# pylint: disable=unused-argument
from __future__ import annotations
import uuid
-# import json
+import json
from o2ims.domain import commands, events
from o2ims.domain.stx_object import StxGenericModel
stxobj = cmd.data
with uow:
p_resource = uow.resources.get(cmd.parentid)
- resourcepool = uow.resource_pools.get(p_resource.resourcePoolId)
+ # resourcepool = uow.resource_pools.get(p_resource.resourcePoolId)
res = uow.session.execute(
'''
- SELECT "resourceTypeId", "oCloudId", "name"
+ SELECT "resourceTypeId", "name"
FROM "resourceType"
WHERE "resourceTypeEnum" = :resource_type_enum
''',
res_type_name = 'pserver_if'
resourcetype_id = str(uuid.uuid3(
uuid.NAMESPACE_URL, res_type_name))
- uow.resource_types.add(ResourceType(
+ res_type = ResourceType(
resourcetype_id,
res_type_name, stxobj.type,
- resourcepool.oCloudId,
- description='An Interface resource type of Physical Server'))
+ description='An Interface resource type of Physical Server')
+ dict_id = str(uuid.uuid3(
+ uuid.NAMESPACE_URL,
+ str(f"{res_type_name}_alarmdictionary")))
+ alarm_dictionary = uow.alarm_dictionaries.get(dict_id)
+ if alarm_dictionary:
+ res_type.alarmDictionary = alarm_dictionary
+ uow.resource_types.add(res_type)
else:
resourcetype_id = first['resourceTypeId']
resourcepool_id = parent.resourcePoolId
parent_id = parent.resourceId
gAssetId = '' # TODO: global ID
- description = "%s : An interface resource of the physical server"\
- % stxobj.name
+ # description = "%s : An interface resource of the physical server"\
+ # % stxobj.name
+ content = json.loads(stxobj.content)
+ selected_keys = [
+ "ifname", "iftype", "imac", "vlan_id", "imtu",
+ "ifclass", "uses", "max_tx_rate",
+ "sriov_vf_driver", "sriov_numvfs", "ptp_role"
+ ]
+ filtered = dict(
+ filter(lambda item: item[0] in selected_keys, content.items()))
+ extensions = json.dumps(filtered)
+ description = ";".join([f"{k}:{v}" for k, v in filtered.items()])
resource = Resource(stxobj.id, resourcetype_id, resourcepool_id,
stxobj.name, parent_id, gAssetId, stxobj.content,
- description)
+ description, extensions)
resource.createtime = stxobj.createtime
resource.updatetime = stxobj.updatetime
resource.hash = stxobj.hash
# pylint: disable=unused-argument
from __future__ import annotations
import uuid
-# import json
+import json
from o2ims.domain import commands, events
from o2ims.domain.stx_object import StxGenericModel
stxobj = cmd.data
with uow:
p_resource = uow.resources.get(cmd.parentid)
- resourcepool = uow.resource_pools.get(p_resource.resourcePoolId)
+ # resourcepool = uow.resource_pools.get(p_resource.resourcePoolId)
res = uow.session.execute(
'''
- SELECT "resourceTypeId", "oCloudId", "name"
+ SELECT "resourceTypeId", "name"
FROM "resourceType"
WHERE "resourceTypeEnum" = :resource_type_enum
''',
res_type_name = 'pserver_mem'
resourcetype_id = str(uuid.uuid3(
uuid.NAMESPACE_URL, res_type_name))
- uow.resource_types.add(ResourceType(
+ res_type = ResourceType(
resourcetype_id,
res_type_name, stxobj.type,
- resourcepool.oCloudId,
- description='A Memory resource type of Physical Server'))
+ description='A Memory resource type of Physical Server')
+ dict_id = str(uuid.uuid3(
+ uuid.NAMESPACE_URL,
+ str(f"{res_type_name}_alarmdictionary")))
+ alarm_dictionary = uow.alarm_dictionaries.get(dict_id)
+ if alarm_dictionary:
+ res_type.alarmDictionary = alarm_dictionary
+ uow.resource_types.add(res_type)
else:
resourcetype_id = first['resourceTypeId']
resourcepool_id = parent.resourcePoolId
parent_id = parent.resourceId
gAssetId = '' # TODO: global ID
- description = "%s : A memory resource of the physical server"\
- % stxobj.name
+ # description = "%s : A memory resource of the physical server"\
+ # % stxobj.name
+ content = json.loads(stxobj.content)
+ selected_keys = [
+ "memtotal_mib", "memavail_mib", "vm_hugepages_use_1G",
+ "vm_hugepages_possible_1G", "hugepages_configured",
+ "vm_hugepages_avail_1G", "vm_hugepages_nr_1G",
+ "vm_hugepages_nr_4K", "vm_hugepages_nr_2M",
+ "vm_hugepages_possible_2M", "vm_hugepages_avail_2M",
+ "platform_reserved_mib", "numa_node"
+ ]
+ filtered = dict(
+ filter(lambda item: item[0] in selected_keys, content.items()))
+ extensions = json.dumps(filtered)
+ description = ";".join([f"{k}:{v}" for k, v in filtered.items()])
resource = Resource(stxobj.id, resourcetype_id, resourcepool_id,
stxobj.name, parent_id, gAssetId, stxobj.content,
- description)
+ description, extensions)
resource.createtime = stxobj.createtime
resource.updatetime = stxobj.updatetime
resource.hash = stxobj.hash
from flask_restx import fields
from o2ims.views.api_ns import api_ims_inventory as api_ims_inventory_v1
+from o2common.views.flask_restx_fields import Json2Dict
class OcloudDTO:
class ResourceTypeDTO:
+ alarm_definition = api_ims_inventory_v1.model(
+ "AlarmDefinitionDto",
+ {
+ 'alarmDefinitionId': fields.String,
+ 'alarmName': fields.String,
+ 'alarmLastChange': fields.String,
+ 'alarmChangeType': fields.String,
+ 'alarmDescription': fields.String,
+ 'proposedRepairActions': fields.String,
+ 'clearingType': fields.String,
+ 'managementInterfaceId': fields.String,
+ 'pkNotificationField': fields.String,
+ 'alarmAdditionalFields': fields.String,
+ }
+
+ )
alarm_dictionary = api_ims_inventory_v1.model(
"AlarmDictionaryDto",
{
'id': fields.String,
'alarmDictionaryVersion': fields.String,
- 'alarmDictionarySchemVersion': fields.String,
+ 'alarmDictionarySchemaVersion': fields.String,
'entityType': fields.String,
'vendor': fields.String,
'managementInterfaceId': fields.String,
'pkNotificationField': fields.String,
- 'alarmDefinition': fields.String,
+ # 'alarmDefinition': fields.String,
+ 'alarmDefinition': fields.List(fields.Nested(alarm_definition),
+ attribute='alarmDefinition'),
}
)
'parentId': fields.String,
'description': fields.String,
# 'elements': fields.String,
- 'extensions': fields.String
+ # 'extensions': fields.String
+ 'extensions': Json2Dict(attribute='extensions')
+ # 'extensions': fields.Raw(attribute='extensions')
},
mask='{resourceId,resourcePoolId,resourceTypeId,description,parentId}'
)
'parentId': fields.String,
'description': fields.String,
# 'elements': fields.String,
- 'extensions': fields.String
+ # 'extensions': fields.String
+ 'extensions': Json2Dict(attribute='extensions')
+ # 'extensions': fields.Raw(attribute='extensions')
}
if iteration_number:
resource_json_mapping['elements'] = fields.List(
@api_ims_inventory_v1.marshal_with(model)
def get(self, resourceTypeID):
result = ocloud_view.resource_type_one(resourceTypeID, bus.uow)
- if result is not None:
- return result
- raise NotFoundException("Resource type {} doesn't exist".format(
- resourceTypeID))
+ if not result:
+ raise NotFoundException("Resource type {} doesn't exist".format(
+ resourceTypeID))
+ return result
# ---------- ResourcePools ---------- #
# ---------- Resources ---------- #
@api_ims_inventory_v1.route("/v1/resourcePools/<resourcePoolID>/resources")
@api_ims_inventory_v1.param('resourcePoolID', 'ID of the resource pool')
+@api_ims_inventory_v1.response(404, 'Resource pool not found')
# @api_ims_inventory_v1.param('sort', 'sort by column name',
# _in='query')
# @api_ims_inventory_v1.param('per_page', 'The number of results per page ' +
if args.nextpage_opaque_marker is not None:
kwargs['page'] = args.nextpage_opaque_marker
kwargs['filter'] = args.filter if args.filter is not None else ''
-
ret = ocloud_view.resources(resourcePoolID, bus.uow, **kwargs)
+ if ret is None:
+ raise NotFoundException("Resources under {} doesn't exist".format(
+ resourcePoolID))
return link_header(request.full_path, ret)
@api_ims_inventory_v1.doc('Get resource')
@api_ims_inventory_v1.marshal_with(model)
def get(self, resourcePoolID, resourceID):
- result = ocloud_view.resource_one(resourceID, bus.uow)
- if result is not None:
- return result
- raise NotFoundException("Resource {} doesn't exist".format(
- resourceID))
+ result = ocloud_view.resource_one(resourceID, bus.uow, resourcePoolID)
+ if result is None:
+ raise NotFoundException("Resource {} doesn't exist".format(
+ resourceID))
+ return result
# ---------- DeploymentManagers ---------- #
uow: unit_of_work.AbstractUnitOfWork):
with uow:
first = uow.resource_pools.get(resourcePoolId)
- return first.serialize() if first is not None else None
+ return first.serialize() if first else None
def resources(resourcePoolId: str, uow: unit_of_work.AbstractUnitOfWork,
**kwargs):
+ with uow:
+ first = uow.resource_pools.get(resourcePoolId)
+ if first is None:
+ raise NotFoundException("ResourcePool {} doesn't exist".format(
+ resourcePoolId))
pagination = Pagination(**kwargs)
# filter key should be the same with database name
query_kwargs = pagination.get_pagination()
return pagination.get_result(ret)
-def resource_one(resourceId: str, uow: unit_of_work.AbstractUnitOfWork):
+def resource_one(resourceId: str,
+ uow: unit_of_work.AbstractUnitOfWork, resourcePoolId: str):
with uow:
- first = uow.resources.get(resourceId)
- return first.serialize() if first is not None else None
+ resoucePool = uow.resource_pools.get(resourcePoolId)
+ if resoucePool is None:
+ raise NotFoundException("ResourcePool {} doesn't exist".format(
+ resourcePoolId))
+
+ first = uow.resources.get(resourceId)
+ if first is None:
+ raise NotFoundException("Resource {} doesn't exist".format(
+ resourceId))
+ return first.serialize()
def deployment_managers(uow: unit_of_work.AbstractUnitOfWork, **kwargs):