def _get(self, id) -> ocloudModel.StxGenericModel:
return self.driver.getInstanceInfo()
- def _list(self, **filters):
+ def _list(self, **filters) -> List[ocloudModel.StxGenericModel]:
return [self.driver.getInstanceInfo()]
def _set_stx_client(self):
def _get(self, id) -> ocloudModel.StxGenericModel:
return self.driver.getResourcePoolDetail(id)
- def _list(self, **filters):
+ def _list(self, **filters) -> List[ocloudModel.StxGenericModel]:
return self.driver.getResourcePoolList(**filters)
def _set_stx_client(self):
def _get(self, name) -> ocloudModel.StxGenericModel:
return self.driver.getK8sDetail(name)
- def _list(self, **filters):
+ def _list(self, **filters) -> List[ocloudModel.StxGenericModel]:
return self.driver.getK8sList(**filters)
def _set_stx_client(self):
@ staticmethod
def _hostconverter(host):
+ selected_keys = [
+ "hostname", "personality", "id", "mgmt_ip", "mgmt_mac",
+ "software_load", "capabilities",
+ "operational", "availability", "administrative",
+ "boot_device", "rootfs_device", "install_state", "subfunctions",
+ "clock_synchronization", "max_cpu_mhz_allowed"
+ ]
+ content = host.to_dict()
+ filtered = dict(
+ filter(lambda item: item[0] in selected_keys, content.items()))
+ setattr(host, 'filtered', filtered)
setattr(host, 'name', host.hostname)
return host
@ staticmethod
def _cpuconverter(cpu):
+ selected_keys = [
+ "cpu", "core", "thread", "allocated_function", "numa_node",
+ "cpu_model", "cpu_family"
+ ]
+ content = cpu.to_dict()
+ filtered = dict(
+ filter(lambda item: item[0] in selected_keys, content.items()))
+ setattr(cpu, 'filtered', filtered)
setattr(cpu, 'name', cpu.ihost_uuid.split(
'-', 1)[0] + '-cpu-'+str(cpu.cpu))
return cpu
@ staticmethod
def _memconverter(mem):
+ selected_keys = [
+ "memtotal_mib", "memavail_mib", "vm_hugepages_use_1G",
+ "vm_hugepages_possible_1G", "hugepages_configured",
+ "vm_hugepages_avail_1G", "vm_hugepages_nr_1G",
+ "vm_hugepages_nr_4K", "vm_hugepages_nr_2M",
+ "vm_hugepages_possible_2M", "vm_hugepages_avail_2M",
+ "platform_reserved_mib", "numa_node"
+ ]
+ content = mem.to_dict()
+ filtered = dict(
+ filter(lambda item: item[0] in selected_keys, content.items()))
+ setattr(mem, 'filtered', filtered)
setattr(mem, 'name', mem.ihost_uuid.split('-', 1)[0] +
'-mem-node-'+str(mem.numa_node))
return mem
@ staticmethod
def _ethconverter(eth):
+ selected_keys = [
+ "name", "namedisplay", "dev_id", "pdevice", "capabilities",
+ "type", "driver", "mac", "numa_node",
+ "pciaddr", "pclass", "psvendor", "psdevice",
+ "sriov_totalvfs", "sriov_numvfs", "dpdksupport",
+ "sriov_vf_driver", "sriov_vf_pdevice_id", "interface_uuid"
+ ]
+ content = eth.to_dict()
+ filtered = dict(
+ filter(lambda item: item[0] in selected_keys, content.items()))
+ setattr(eth, 'filtered', filtered)
setattr(eth, 'name', eth.host_uuid.split('-', 1)[0] + '-'+eth.name)
setattr(eth, 'updated_at', None)
setattr(eth, 'created_at', None)
@ staticmethod
def _ifconverter(ifs):
+ selected_keys = [
+ "ifname", "iftype", "imac", "vlan_id", "imtu",
+ "ifclass", "uses", "max_tx_rate",
+ "sriov_vf_driver", "sriov_numvfs", "ptp_role"
+ ]
+ content = ifs.to_dict()
+ filtered = dict(
+ filter(lambda item: item[0] in selected_keys, content.items()))
+ setattr(ifs, 'filtered', filtered)
setattr(ifs, 'name', ifs.ihost_uuid.split('-', 1)[0] + '-'+ifs.ifname)
setattr(ifs, 'updated_at', None)
setattr(ifs, 'created_at', None)
@ staticmethod
def _devconverter(dev):
+ selected_keys = [
+ "name", "pdevice", "pciaddr", "pvendor_id", "pvendor",
+ "pclass_id", "pclass", "psvendor", "psdevice",
+ "sriov_totalvfs", "sriov_numvfs", "numa_node"
+ ]
+ content = dev.to_dict()
+ filtered = dict(
+ filter(lambda item: item[0] in selected_keys, content.items()))
+ setattr(dev, 'filtered', filtered)
setattr(dev, 'name', dev.host_uuid.split('-', 1)[0] + '-'+dev.name)
return dev
def update_by(target: AlarmEventRecord, fmobj: FaultGenericModel
) -> None:
# content = json.loads(fmobj.content)
- target.hash = fmobj.hash
if fmobj.status == 'clear':
target.perceivedSeverity = alarm_obj.PerceivedSeverityEnum.CLEARED
+
+ target.hash = fmobj.hash
target.events.append(events.AlarmEventChanged(
id=fmobj.id,
notificationEventType=AlarmNotificationEventEnum.CLEAR,
ocloud.createtime = stxobj.createtime
ocloud.updatetime = stxobj.updatetime
# ocloud.content = stxobj.content
+
ocloud.hash = stxobj.hash
ocloud.version_number = ocloud.version_number + 1
ocloud.events.append(events.OcloudChanged(
# pylint: disable=unused-argument
from __future__ import annotations
import uuid
-import json
+from o2common.service.unit_of_work import AbstractUnitOfWork
from o2ims.domain import commands, events
-from o2ims.domain.stx_object import StxGenericModel
+from o2ims.domain.ocloud import ResourceType
from o2ims.domain.subscription_obj import NotificationEventEnum
-from o2common.service.unit_of_work import AbstractUnitOfWork
-from o2ims.domain.resource_type import MismatchedModel
-from o2ims.domain.ocloud import Resource, ResourceType
+from o2ims.service.auditor.pserver_res_handler import is_outdated, \
+ create_by, update_by
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
-class InvalidResourceType(Exception):
- pass
-
-
def update_pserver_acc(
cmd: commands.UpdatePserverAcc,
uow: AbstractUnitOfWork
logger.info("Update the accelerator of pserver: " + stxobj.id
+ ", name: " + stxobj.name)
uow.commit()
-
-
-def is_outdated(resource: Resource, stxobj: StxGenericModel):
- return True if resource.hash != stxobj.hash else False
-
-
-def create_by(stxobj: StxGenericModel, parent: Resource, resourcetype_id: str)\
- -> Resource:
- # content = json.loads(stxobj.content)
- resourcetype_id = resourcetype_id
- resourcepool_id = parent.resourcePoolId
- parent_id = parent.resourceId
- gAssetId = '' # TODO: global ID
- # description = "%s : An Accelerator resource of the physical server"\
- # % stxobj.name
- content = json.loads(stxobj.content)
- selected_keys = [
- "name", "pdevice", "pciaddr", "pvendor_id", "pvendor",
- "pclass_id", "pclass", "psvendor", "psdevice",
- "sriov_totalvfs", "sriov_numvfs", "numa_node"
- ]
- filtered = dict(
- filter(lambda item: item[0] in selected_keys, content.items()))
- extensions = json.dumps(filtered)
- description = ";".join([f"{k}:{v}" for k, v in filtered.items()])
- resource = Resource(stxobj.id, resourcetype_id, resourcepool_id,
- parent_id, gAssetId, stxobj.content, description,
- extensions)
- resource.createtime = stxobj.createtime
- resource.updatetime = stxobj.updatetime
- resource.hash = stxobj.hash
-
- return resource
-
-
-def update_by(target: Resource, stxobj: StxGenericModel,
- parentid: str) -> None:
- if target.resourceId != stxobj.id:
- raise MismatchedModel("Mismatched Id")
- target.createtime = stxobj.createtime
- target.updatetime = stxobj.updatetime
- target.hash = stxobj.hash
- target.version_number = target.version_number + 1
- target.events.append(events.ResourceChanged(
- id=stxobj.id,
- resourcePoolId=target.resourcePoolId,
- notificationEventType=NotificationEventEnum.MODIFY,
- updatetime=stxobj.updatetime
- ))
# pylint: disable=unused-argument
from __future__ import annotations
import uuid
-import json
-from o2ims.domain import commands, events
-from o2ims.domain.stx_object import StxGenericModel
from o2common.service.unit_of_work import AbstractUnitOfWork
-from o2ims.domain.resource_type import MismatchedModel
-from o2ims.domain.ocloud import Resource, ResourceType
+from o2ims.domain import commands, events
+from o2ims.domain.ocloud import ResourceType
from o2ims.domain.subscription_obj import NotificationEventEnum
+from o2ims.service.auditor.pserver_res_handler import is_outdated, \
+ create_by, update_by
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
-class InvalidResourceType(Exception):
- pass
-
-
def update_pserver_cpu(
cmd: commands.UpdatePserverCpu,
uow: AbstractUnitOfWork
logger.info("Update the cpu of pserver: " + stxobj.id
+ ", name: " + stxobj.name)
uow.commit()
-
-
-def is_outdated(resource: Resource, stxobj: StxGenericModel):
- return True if resource.hash != stxobj.hash else False
-
-
-def create_by(stxobj: StxGenericModel, parent: Resource, resourcetype_id: str)\
- -> Resource:
- # content = json.loads(stxobj.content)
- resourcetype_id = resourcetype_id
- resourcepool_id = parent.resourcePoolId
- parent_id = parent.resourceId
- gAssetId = '' # TODO: global ID
- # description = "%s : A CPU resource of the physical server" % stxobj.name
- content = json.loads(stxobj.content)
- selected_keys = [
- "cpu", "core", "thread", "allocated_function", "numa_node",
- "cpu_model", "cpu_family"
- ]
- filtered = dict(
- filter(lambda item: item[0] in selected_keys, content.items()))
- extensions = json.dumps(filtered)
- description = ";".join([f"{k}:{v}" for k, v in filtered.items()])
- resource = Resource(stxobj.id, resourcetype_id, resourcepool_id,
- parent_id, gAssetId, stxobj.content, description,
- extensions)
- resource.createtime = stxobj.createtime
- resource.updatetime = stxobj.updatetime
- resource.hash = stxobj.hash
-
- return resource
-
-
-def update_by(target: Resource, stxobj: StxGenericModel,
- parentid: str) -> None:
- if target.resourceId != stxobj.id:
- raise MismatchedModel("Mismatched Id")
- target.createtime = stxobj.createtime
- target.updatetime = stxobj.updatetime
- target.hash = stxobj.hash
- target.version_number = target.version_number + 1
- target.events.append(events.ResourceChanged(
- id=stxobj.id,
- resourcePoolId=target.resourcePoolId,
- notificationEventType=NotificationEventEnum.MODIFY,
- updatetime=stxobj.updatetime
- ))
# pylint: disable=unused-argument
from __future__ import annotations
import uuid
-# import json
+from o2common.service.unit_of_work import AbstractUnitOfWork
from o2ims.domain import commands, events
-from o2ims.domain.stx_object import StxGenericModel
+from o2ims.domain.ocloud import ResourceType
from o2ims.domain.subscription_obj import NotificationEventEnum
-from o2common.service.unit_of_work import AbstractUnitOfWork
-from o2ims.domain.resource_type import MismatchedModel
-from o2ims.domain.ocloud import Resource, ResourceType
+from o2ims.service.auditor.pserver_res_handler import is_outdated, \
+ create_by, update_by
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
-class InvalidResourceType(Exception):
- pass
-
-
def update_pserver_dev(
cmd: commands.UpdatePserverDev,
uow: AbstractUnitOfWork
logger.info("Update the device of pserver: " + stxobj.id
+ ", name: " + stxobj.name)
uow.commit()
-
-
-def is_outdated(resource: Resource, stxobj: StxGenericModel):
- return True if resource.hash != stxobj.hash else False
-
-
-def create_by(stxobj: StxGenericModel, parent: Resource, resourcetype_id: str)\
- -> Resource:
- # content = json.loads(stxobj.content)
- resourcetype_id = resourcetype_id
- resourcepool_id = parent.resourcePoolId
- parent_id = parent.resourceId
- gAssetId = '' # TODO: global ID
- description = "%s : A device resource of the physical server"\
- % stxobj.name
- resource = Resource(stxobj.id, resourcetype_id, resourcepool_id,
- parent_id, gAssetId, stxobj.content, description)
- resource.createtime = stxobj.createtime
- resource.updatetime = stxobj.updatetime
- resource.hash = stxobj.hash
-
- return resource
-
-
-def update_by(target: Resource, stxobj: StxGenericModel,
- parentid: str) -> None:
- if target.resourceId != stxobj.id:
- raise MismatchedModel("Mismatched Id")
- target.createtime = stxobj.createtime
- target.updatetime = stxobj.updatetime
- target.hash = stxobj.hash
- target.version_number = target.version_number + 1
- target.events.append(events.ResourceChanged(
- id=stxobj.id,
- resourcePoolId=target.resourcePoolId,
- notificationEventType=NotificationEventEnum.MODIFY,
- updatetime=stxobj.updatetime
- ))
# pylint: disable=unused-argument
from __future__ import annotations
import uuid
-import json
-from o2ims.domain import commands, events
-from o2ims.domain.stx_object import StxGenericModel
from o2common.service.unit_of_work import AbstractUnitOfWork
-from o2ims.domain.resource_type import MismatchedModel
-from o2ims.domain.ocloud import Resource, ResourceType
+from o2ims.domain import commands, events
+from o2ims.domain.ocloud import ResourceType
from o2ims.domain.subscription_obj import NotificationEventEnum
+from o2ims.service.auditor.pserver_res_handler import is_outdated, \
+ create_by, update_by
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
-class InvalidResourceType(Exception):
- pass
-
-
def update_pserver_eth(
cmd: commands.UpdatePserverEth,
uow: AbstractUnitOfWork
logger.info("Update the ethernet of pserver: " + stxobj.id
+ ", name: " + stxobj.name)
uow.commit()
-
-
-def is_outdated(resource: Resource, stxobj: StxGenericModel):
- return True if resource.hash != stxobj.hash else False
-
-
-def create_by(stxobj: StxGenericModel, parent: Resource, resourcetype_id: str)\
- -> Resource:
- # content = json.loads(stxobj.content)
- resourcetype_id = resourcetype_id
- resourcepool_id = parent.resourcePoolId
- parent_id = parent.resourceId
- gAssetId = '' # TODO: global ID
- # description = "%s : An ethernet resource of the physical server"\
- # % stxobj.name
- content = json.loads(stxobj.content)
- selected_keys = [
- "name", "namedisplay", "dev_id", "pdevice", "capabilities",
- "type", "driver", "mac", "numa_node",
- "pciaddr", "pclass", "psvendor", "psdevice",
- "sriov_totalvfs", "sriov_numvfs", "dpdksupport",
- "sriov_vf_driver", "sriov_vf_pdevice_id", "interface_uuid"
- ]
- filtered = dict(
- filter(lambda item: item[0] in selected_keys, content.items()))
- extensions = json.dumps(filtered)
- description = ";".join([f"{k}:{v}" for k, v in filtered.items()])
- resource = Resource(stxobj.id, resourcetype_id, resourcepool_id,
- parent_id, gAssetId, stxobj.content, description,
- extensions)
- resource.createtime = stxobj.createtime
- resource.updatetime = stxobj.updatetime
- resource.hash = stxobj.hash
-
- return resource
-
-
-def update_by(target: Resource, stxobj: StxGenericModel,
- parentid: str) -> None:
- if target.resourceId != stxobj.id:
- raise MismatchedModel("Mismatched Id")
- target.createtime = stxobj.createtime
- target.updatetime = stxobj.updatetime
- target.hash = stxobj.hash
- target.version_number = target.version_number + 1
- target.events.append(events.ResourceChanged(
- id=stxobj.id,
- resourcePoolId=target.resourcePoolId,
- notificationEventType=NotificationEventEnum.MODIFY,
- updatetime=stxobj.updatetime
- ))
def create_by(stxobj: StxGenericModel, parentid: str, resourcetype_id: str) \
-> Resource:
- # content = json.loads(stxobj.content)
resourcetype_id = resourcetype_id
resourcepool_id = parentid
parent_id = None # the root of the resource has no parent id
gAssetId = '' # TODO: global ID
- # description = "%s : A physical server resource" % stxobj.name
- content = json.loads(stxobj.content)
- selected_keys = [
- "hostname", "personality", "id", "mgmt_ip", "mgmt_mac",
- "software_load", "capabilities",
- "operational", "availability", "administrative",
- "boot_device", "rootfs_device", "install_state", "subfunctions",
- "clock_synchronization", "max_cpu_mhz_allowed"
- ]
- filtered = dict(
- filter(lambda item: item[0] in selected_keys, content.items()))
- extensions = json.dumps(filtered)
- description = ";".join([f"{k}:{v}" for k, v in filtered.items()])
+ extensions = json.dumps(stxobj.filtered)
+ description = ";".join([f"{k}:{v}" for k, v in stxobj.filtered.items()])
resource = Resource(stxobj.id, resourcetype_id, resourcepool_id,
parent_id, gAssetId, stxobj.content, description,
extensions)
parentid: str) -> None:
if target.resourceId != stxobj.id:
raise MismatchedModel("Mismatched Id")
- target.createtime = stxobj.createtime
target.updatetime = stxobj.updatetime
target.hash = stxobj.hash
+ target.elements = stxobj.content
+ target.extensions = json.dumps(stxobj.filtered)
+ target.description = ";".join(
+ [f"{k}:{v}" for k, v in stxobj.filtered.items()])
target.version_number = target.version_number + 1
target.events.append(events.ResourceChanged(
id=stxobj.id,
# pylint: disable=unused-argument
from __future__ import annotations
import uuid
-import json
-from o2ims.domain import commands, events
-from o2ims.domain.stx_object import StxGenericModel
from o2common.service.unit_of_work import AbstractUnitOfWork
-from o2ims.domain.resource_type import MismatchedModel
-from o2ims.domain.ocloud import Resource, ResourceType
+from o2ims.domain import commands, events
+from o2ims.domain.ocloud import ResourceType
from o2ims.domain.subscription_obj import NotificationEventEnum
+from o2ims.service.auditor.pserver_res_handler import is_outdated, \
+ create_by, update_by
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
-class InvalidResourceType(Exception):
- pass
-
-
def update_pserver_if(
cmd: commands.UpdatePserverIf,
uow: AbstractUnitOfWork
logger.info("Update the interface of pserver: " + stxobj.id
+ ", name: " + stxobj.name)
uow.commit()
-
-
-def is_outdated(resource: Resource, stxobj: StxGenericModel):
- return True if resource.hash != stxobj.hash else False
-
-
-def create_by(stxobj: StxGenericModel, parent: Resource, resourcetype_id: str)\
- -> Resource:
- # content = json.loads(stxobj.content)
- resourcetype_id = resourcetype_id
- resourcepool_id = parent.resourcePoolId
- parent_id = parent.resourceId
- gAssetId = '' # TODO: global ID
- # description = "%s : An interface resource of the physical server"\
- # % stxobj.name
- content = json.loads(stxobj.content)
- selected_keys = [
- "ifname", "iftype", "imac", "vlan_id", "imtu",
- "ifclass", "uses", "max_tx_rate",
- "sriov_vf_driver", "sriov_numvfs", "ptp_role"
- ]
- filtered = dict(
- filter(lambda item: item[0] in selected_keys, content.items()))
- extensions = json.dumps(filtered)
- description = ";".join([f"{k}:{v}" for k, v in filtered.items()])
- resource = Resource(stxobj.id, resourcetype_id, resourcepool_id,
- parent_id, gAssetId, stxobj.content, description,
- extensions)
- resource.createtime = stxobj.createtime
- resource.updatetime = stxobj.updatetime
- resource.hash = stxobj.hash
-
- return resource
-
-
-def update_by(target: Resource, stxobj: StxGenericModel,
- parentid: str) -> None:
- if target.resourceId != stxobj.id:
- raise MismatchedModel("Mismatched Id")
- target.createtime = stxobj.createtime
- target.updatetime = stxobj.updatetime
- target.hash = stxobj.hash
- target.version_number = target.version_number + 1
- target.events.append(events.ResourceChanged(
- id=stxobj.id,
- resourcePoolId=target.resourcePoolId,
- notificationEventType=NotificationEventEnum.MODIFY,
- updatetime=stxobj.updatetime
- ))
# pylint: disable=unused-argument
from __future__ import annotations
import uuid
-import json
+from o2common.service.unit_of_work import AbstractUnitOfWork
from o2ims.domain import commands, events
-from o2ims.domain.stx_object import StxGenericModel
+from o2ims.domain.ocloud import ResourceType
from o2ims.domain.subscription_obj import NotificationEventEnum
-from o2common.service.unit_of_work import AbstractUnitOfWork
-from o2ims.domain.resource_type import MismatchedModel
-from o2ims.domain.ocloud import Resource, ResourceType
+from o2ims.service.auditor.pserver_res_handler import is_outdated, \
+ create_by, update_by
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
-class InvalidResourceType(Exception):
- pass
-
-
def update_pserver_mem(
cmd: commands.UpdatePserverMem,
uow: AbstractUnitOfWork
logger.info("Update the memory of pserver: " + stxobj.id
+ ", name: " + stxobj.name)
uow.commit()
-
-
-def is_outdated(resource: Resource, stxobj: StxGenericModel):
- return True if resource.hash != stxobj.hash else False
-
-
-def create_by(stxobj: StxGenericModel, parent: Resource, resourcetype_id: str)\
- -> Resource:
- # content = json.loads(stxobj.content)
- resourcetype_id = resourcetype_id
- resourcepool_id = parent.resourcePoolId
- parent_id = parent.resourceId
- gAssetId = '' # TODO: global ID
- # description = "%s : A memory resource of the physical server"\
- # % stxobj.name
- content = json.loads(stxobj.content)
- selected_keys = [
- "memtotal_mib", "memavail_mib", "vm_hugepages_use_1G",
- "vm_hugepages_possible_1G", "hugepages_configured",
- "vm_hugepages_avail_1G", "vm_hugepages_nr_1G",
- "vm_hugepages_nr_4K", "vm_hugepages_nr_2M",
- "vm_hugepages_possible_2M", "vm_hugepages_avail_2M",
- "platform_reserved_mib", "numa_node"
- ]
- filtered = dict(
- filter(lambda item: item[0] in selected_keys, content.items()))
- extensions = json.dumps(filtered)
- description = ";".join([f"{k}:{v}" for k, v in filtered.items()])
- resource = Resource(stxobj.id, resourcetype_id, resourcepool_id,
- parent_id, gAssetId, stxobj.content, description,
- extensions)
- resource.createtime = stxobj.createtime
- resource.updatetime = stxobj.updatetime
- resource.hash = stxobj.hash
-
- return resource
-
-
-def update_by(target: Resource, stxobj: StxGenericModel,
- parentid: str) -> None:
- if target.resourceId != stxobj.id:
- raise MismatchedModel("Mismatched Id")
- target.createtime = stxobj.createtime
- target.updatetime = stxobj.updatetime
- target.hash = stxobj.hash
- target.version_number = target.version_number + 1
- target.events.append(events.ResourceChanged(
- id=stxobj.id,
- resourcePoolId=target.resourcePoolId,
- notificationEventType=NotificationEventEnum.MODIFY,
- updatetime=stxobj.updatetime
- ))
# pylint: disable=unused-argument
from __future__ import annotations
import uuid
-# import json
-from o2ims.domain import commands, events
-from o2ims.domain.stx_object import StxGenericModel
from o2common.service.unit_of_work import AbstractUnitOfWork
-from o2ims.domain.resource_type import MismatchedModel
-from o2ims.domain.ocloud import Resource, ResourceType
+from o2ims.domain import commands, events
+from o2ims.domain.ocloud import ResourceType
from o2ims.domain.subscription_obj import NotificationEventEnum
+from o2ims.service.auditor.pserver_res_handler import is_outdated, \
+ create_by, update_by
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
-class InvalidResourceType(Exception):
- pass
-
-
def update_pserver_port(
cmd: commands.UpdatePserverIfPort,
uow: AbstractUnitOfWork
logger.info("Update the port of pserver interface: " + stxobj.id
+ ", name: " + stxobj.name)
uow.commit()
-
-
-def is_outdated(resource: Resource, stxobj: StxGenericModel):
- return True if resource.hash != stxobj.hash else False
-
-
-def create_by(stxobj: StxGenericModel, parent: Resource, resourcetype_id: str)\
- -> Resource:
- # content = json.loads(stxobj.content)
- resourcetype_id = resourcetype_id
- resourcepool_id = parent.resourcePoolId
- parent_id = parent.resourceId
- gAssetId = '' # TODO: global ID
- description = "%s : A port resource of the interface"\
- % stxobj.name
- resource = Resource(stxobj.id, resourcetype_id, resourcepool_id,
- parent_id, gAssetId, stxobj.content, description)
- resource.createtime = stxobj.createtime
- resource.updatetime = stxobj.updatetime
- resource.hash = stxobj.hash
-
- return resource
-
-
-def update_by(target: Resource, stxobj: StxGenericModel,
- parentid: str) -> None:
- if target.resourceId != stxobj.id:
- raise MismatchedModel("Mismatched Id")
- target.createtime = stxobj.createtime
- target.updatetime = stxobj.updatetime
- target.hash = stxobj.hash
- target.version_number = target.version_number + 1
- target.events.append(events.ResourceChanged(
- id=stxobj.id,
- resourcePoolId=target.resourcePoolId,
- notificationEventType=NotificationEventEnum.MODIFY,
- updatetime=stxobj.updatetime
- ))
--- /dev/null
+# Copyright (C) 2022 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import json
+
+from o2ims.domain import events
+from o2ims.domain.stx_object import StxGenericModel
+from o2ims.domain.subscription_obj import NotificationEventEnum
+from o2ims.domain.resource_type import MismatchedModel
+from o2ims.domain.ocloud import Resource
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+class InvalidResourceType(Exception):
+ pass
+
+
+def is_outdated(resource: Resource, stxobj: StxGenericModel):
+ return True if resource.hash != stxobj.hash else False
+
+
+def create_by(stxobj: StxGenericModel, parent: Resource, resourcetype_id: str)\
+ -> Resource:
+ resourcetype_id = resourcetype_id
+ resourcepool_id = parent.resourcePoolId
+ parent_id = parent.resourceId
+ gAssetId = '' # TODO: global ID
+ extensions = json.dumps(stxobj.filtered)
+ description = ";".join([f"{k}:{v}" for k, v in stxobj.filtered.items()])
+ resource = Resource(stxobj.id, resourcetype_id, resourcepool_id,
+ parent_id, gAssetId, stxobj.content, description,
+ extensions)
+ resource.createtime = stxobj.createtime
+ resource.updatetime = stxobj.updatetime
+ resource.hash = stxobj.hash
+
+ resource.events.append(events.ResourceChanged(
+ id=stxobj.id,
+ resourcePoolId=resource.resourcePoolId,
+ notificationEventType=NotificationEventEnum.CREATE,
+ updatetime=stxobj.updatetime
+ ))
+
+ return resource
+
+
+def update_by(target: Resource, stxobj: StxGenericModel, parent: Resource)\
+ -> None:
+ if target.resourceId != stxobj.id:
+ raise MismatchedModel("Mismatched Id")
+ target.updatetime = stxobj.updatetime
+ target.hash = stxobj.hash
+ target.elements = stxobj.content
+ target.extensions = json.dumps(stxobj.filtered)
+ target.description = ";".join(
+ [f"{k}:{v}" for k, v in stxobj.filtered.items()])
+ target.version_number = target.version_number + 1
+ target.events.append(events.ResourceChanged(
+ id=stxobj.id,
+ resourcePoolId=target.resourcePoolId,
+ notificationEventType=NotificationEventEnum.MODIFY,
+ updatetime=stxobj.updatetime
+ ))
target.name = stxobj.name
target.location = content['location'] if content['location'] is not None \
else ''
- target.createtime = stxobj.createtime
target.updatetime = stxobj.updatetime
- target.hash = stxobj.hash
target.oCloudId = parentid
+
+ target.hash = stxobj.hash
target.version_number = target.version_number + 1
target.events.append(events.ResourcePoolChanged(
id=stxobj.id,
sub_data['subscriptionId'],
sub_data['filter']))
continue
+ if len(args) == 0:
+ continue
args.append(ocloud.ResourceType.resourceTypeId == data.id)
ret = uow.resource_types.list_with_count(*args)
if ret[0] > 0:
sub_data['subscriptionId'],
sub_data['filter']))
continue
+ if len(args) == 0:
+ continue
args.append(ocloud.ResourcePool.resourcePoolId == data.id)
ret = uow.resource_pools.list_with_count(*args)
if ret[0] > 0:
sub_data['subscriptionId'],
sub_data['filter']))
continue
+ if len(args) == 0:
+ continue
args.append(
ocloud.DeploymentManager.deploymentManagerId == data.id)
ret = uow.deployment_managers.list_with_count(*args)
callback_smo(sub, data, dms_dict)
-class FilterNotEffect(Exception):
- pass
-
-
-class FilterEffect(Exception):
- pass
-
-
def _notify_resource(uow, data):
with uow:
resource = uow.resources.get(data.id)
sub_data['subscriptionId'],
sub_data['filter']))
continue
+ if len(args) == 0:
+ continue
args.append(ocloud.Resource.resourceId == data.id)
ret = uow.resources.list_with_count(res_pool_id, *args)
if ret[0] > 0: