# pylint: disable=unused-argument
from __future__ import annotations
import uuid
-# import json
-from o2ims.domain import commands, events
-from o2ims.domain.stx_object import StxGenericModel
from o2common.service.unit_of_work import AbstractUnitOfWork
-from o2ims.domain.resource_type import MismatchedModel
-from o2ims.domain.ocloud import Resource, ResourceType
+from o2ims.domain import commands, events
+from o2ims.domain.ocloud import ResourceType
from o2ims.domain.subscription_obj import NotificationEventEnum
+from o2ims.service.auditor.pserver_res_handler import is_outdated, \
+ create_by, update_by
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
-class InvalidResourceType(Exception):
- pass
-
-
def update_pserver_if(
cmd: commands.UpdatePserverIf,
uow: AbstractUnitOfWork
stxobj = cmd.data
with uow:
p_resource = uow.resources.get(cmd.parentid)
- resourcepool = uow.resource_pools.get(p_resource.resourcePoolId)
+ # resourcepool = uow.resource_pools.get(p_resource.resourcePoolId)
res = uow.session.execute(
'''
- SELECT "resourceTypeId", "oCloudId", "name"
+ SELECT "resourceTypeId", "name"
FROM "resourceType"
WHERE "resourceTypeEnum" = :resource_type_enum
''',
res_type_name = 'pserver_if'
resourcetype_id = str(uuid.uuid3(
uuid.NAMESPACE_URL, res_type_name))
- uow.resource_types.add(ResourceType(
+ res_type = ResourceType(
resourcetype_id,
res_type_name, stxobj.type,
- resourcepool.oCloudId,
- description='An Interface resource type of Physical Server'))
+ description='An Interface resource type of Physical Server')
+ dict_id = str(uuid.uuid3(
+ uuid.NAMESPACE_URL,
+ str(f"{res_type_name}_alarmdictionary")))
+ alarm_dictionary = uow.alarm_dictionaries.get(dict_id)
+ if alarm_dictionary:
+ res_type.alarmDictionary = alarm_dictionary
+ res_type.events.append(events.ResourceTypeChanged(
+ id=res_type.resourceTypeId,
+ notificationEventType=NotificationEventEnum.CREATE,
+ updatetime=stxobj.updatetime))
+ uow.resource_types.add(res_type)
else:
resourcetype_id = first['resourceTypeId']
logger.info("Update the interface of pserver: " + stxobj.id
+ ", name: " + stxobj.name)
uow.commit()
-
-
-def is_outdated(resource: Resource, stxobj: StxGenericModel):
- return True if resource.hash != stxobj.hash else False
-
-
-def create_by(stxobj: StxGenericModel, parent: Resource, resourcetype_id: str)\
- -> Resource:
- # content = json.loads(stxobj.content)
- resourcetype_id = resourcetype_id
- resourcepool_id = parent.resourcePoolId
- parent_id = parent.resourceId
- gAssetId = '' # TODO: global ID
- description = "%s : An interface resource of the physical server"\
- % stxobj.name
- resource = Resource(stxobj.id, resourcetype_id, resourcepool_id,
- stxobj.name, parent_id, gAssetId, stxobj.content,
- description)
- resource.createtime = stxobj.createtime
- resource.updatetime = stxobj.updatetime
- resource.hash = stxobj.hash
-
- return resource
-
-
-def update_by(target: Resource, stxobj: StxGenericModel,
- parentid: str) -> None:
- if target.resourceId != stxobj.id:
- raise MismatchedModel("Mismatched Id")
- target.createtime = stxobj.createtime
- target.updatetime = stxobj.updatetime
- target.hash = stxobj.hash
- target.version_number = target.version_number + 1
- target.events.append(events.ResourceChanged(
- id=stxobj.id,
- resourcePoolId=target.resourcePoolId,
- notificationEventType=NotificationEventEnum.MODIFY,
- updatetime=stxobj.updatetime
- ))