# See the License for the specific language governing permissions and
# limitations under the License.
-dictionary:
- pserver:
- version: 0.1
- alarmDefinition: [
- "100.104", "100.105"
- ]
- pserver_cpu:
- version: 0.1
- alarmDefinition: [
- "100.101"
- ]
- pserver_mem:
- version: 0.1
- alarmDefinition: [
- "100.103"
- ]
- pserver_ethernet:
- version: 0.1
- alarmDefinition: [
- "100.102"
- ]
- pserver_if:
- version: 0.1
- alarmDefinition: [
-
- ]
+alarmDictionary:
+ schemaVersion: 0.1
+ schema:
+ pserver:
+ version: 0.1
+ alarmDefinition:
+ - "100.104"
+ - "100.105"
+ pserver_cpu:
+ version: 0.1
+ alarmDefinition:
+ - "100.101"
+ pserver_mem:
+ version: 0.1
+ alarmDefinition:
+ - "100.103"
+ pserver_ethernet:
+ version: 0.1
+ alarmDefinition:
+ - "100.102"
+ pserver_if:
+ version: 0.1
+ alarmDefinition:
+ compute_aggregate:
+ version: 0.1
+ alarmDefinition:
+ network_aggregate:
+ version: 0.1
+ alarmDefinition:
+ storage_aggregate:
+ version: 0.1
+ alarmDefinition:
+ undefined_aggregate:
+ version: 0.1
+ alarmDefinition:
.AlarmEventRecordSqlAlchemyRepository(self.session)
self.alarm_definitions = alarm_repository\
.AlarmDefinitionSqlAlchemyRepository(self.session)
+ self.alarm_dictionaries = alarm_repository\
+ .AlarmDictionarySqlAlchemyRepository(self.session)
self.alarm_subscriptions = alarm_repository\
.AlarmSubscriptionSqlAlchemyRepository(self.session)
self.alarm_probable_causes = alarm_repository\
from o2ims.views import configure_namespace as ims_route_configure_namespace
from o2common.views.route_exception import configure_exception
-from o2ims.adapter.clients.alarm_dict_client import load_alarm_definition
from o2common.authmw import authmiddleware
from o2common.authmw import authprov
from o2common.config.config import get_review_url
configure_exception(api)
ims_route_configure_namespace(api)
-
-load_alarm_definition(bus.uow)
from o2ims.service.watcher.pserver_acc_watcher import PServerAccWatcher
from o2ims.adapter.clients.ocloud_client import StxAccClient
+from o2ims.adapter.clients.alarm_dict_client import load_alarm_definition,\
+ load_alarm_dictionary_from_conf_file
from o2ims.service.watcher.agg_compute_watcher import ComputeAggWatcher
from o2ims.service.watcher.agg_network_watcher import NetworkAggWatcher
from o2ims.service.watcher.agg_storage_watcher import StorageAggWatcher
self.args = args
self.bus = bootstrap.bootstrap()
self.worker = PollWorker(bus=self.bus)
+ load_alarm_definition(self.bus.uow)
+ load_alarm_dictionary_from_conf_file(self.bus.uow)
def run(self):
try:
# See the License for the specific language governing permissions and
# limitations under the License.
-from o2common.config import config, conf
-from o2ims.adapter import alarm_loader
-from o2ims.adapter.clients.alarm_dict_client import \
- load_alarm_dictionary_from_conf_file
+# from o2common.config import conf
+# from o2ims.adapter import alarm_loader
+# from o2ims.adapter.clients.alarm_dict_client import \
+# load_alarm_dictionary_from_conf_file
# config file
-conf.alarm_dictionaries = alarm_loader\
- .AlarmDictionaryConfigFileRepository()
-load_alarm_dictionary_from_conf_file(config.get_alarm_yaml_filename())
+# conf.alarm_dictionaries = alarm_loader\
+# .AlarmDictionaryConfigFileRepository()
+# load_alarm_dictionary_from_conf_file(config.get_alarm_yaml_filename())
from o2ims.domain import alarm_obj
from o2ims.domain.alarm_repo import AlarmDefinitionRepository, \
AlarmEventRecordRepository, AlarmSubscriptionRepository, \
- AlarmProbableCauseRepository
+ AlarmProbableCauseRepository, AlarmDictionaryRepository
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
alarmDefinitionId=alarm_definition_id).delete()
+class AlarmDictionarySqlAlchemyRepository(AlarmDictionaryRepository):
+ def __init__(self, session):
+ super().__init__()
+ self.session = session
+
+ def _add(self, alarm_dict: alarm_obj.AlarmDictionary):
+ self.session.add(alarm_dict)
+
+ def _get(self, dictionary_id) -> alarm_obj.AlarmDictionary:
+ return self.session.query(alarm_obj.AlarmDictionary).filter_by(
+ id=dictionary_id).first()
+
+ def _list(self) -> List[alarm_obj.AlarmDictionary]:
+ return self.session.query(alarm_obj.AlarmDictionary)
+
+ def _delete(self, dictionary_id):
+ self.session.query(alarm_obj.AlarmDictionary).filter_by(
+ id=dictionary_id).delete()
+
+
class AlarmSubscriptionSqlAlchemyRepository(AlarmSubscriptionRepository):
def __init__(self, session):
super().__init__()
import uuid as uuid_gen
from o2common.service import unit_of_work
-from o2common.config import config, conf
+from o2common.config import config
from o2ims.domain import alarm_obj as alarm
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
-def load_alarm_dictionary_from_conf_file(conf_path: str):
-
- logger.info("Converting alarm.yaml to dict: ")
+def load_alarm_dictionary_from_conf_file(uow: unit_of_work.AbstractUnitOfWork):
+ conf_path = config.get_alarm_yaml_filename()
+ logger.info(f"Converting alarm.yaml to dictionary: {conf_path}")
if not os.path.isfile(conf_path):
logger.error("file %s doesn't exist. Ending execution" %
try:
with open(conf_path, 'r') as stream:
alarm_yaml = yaml.load(stream, Loader=yaml.FullLoader)
- dictionaries = alarm_yaml.get('dictionary')
+ dictionaries = alarm_yaml.get('alarmDictionary')['schema']
+ schema_ver = alarm_yaml.get('alarmDictionary')['schemaVersion']
except Exception as exp:
logger.error(exp)
raise RuntimeError(exp)
for dictionary in list(dictionaries.keys()):
# res_type = uow.resource_types.get_by_name(dictionary)
# logger.info('res_type: ' + res_type.resourceTypeName)
- alarm_dict = alarm.AlarmDictionary(dictionary)
- alarm_dict.entityType = dictionary
- alarm_dict.alarmDictionaryVersion = \
- dictionaries[dictionary]['version']
- alarm_dict.alarmDefinition = \
- dictionaries[dictionary]['alarmDefinition']
- conf.alarm_dictionaries.add(alarm_dict)
+ version = dictionaries[dictionary]['version']
+ definitions = dictionaries[dictionary]['alarmDefinition']
+ dict_id = str(uuid_gen.uuid3(
+ uuid_gen.NAMESPACE_URL,
+ str(f"{dictionary}_alarmdictionary")))
+
+ with uow:
+ alarm_dict = uow.alarm_dictionaries.get(dict_id)
+ if alarm_dict:
+ alarm_dict.alarmDictionaryVersion = version
+ alarm_dict.alarmDictionarySchemaVersion = schema_ver
+ else:
+ alarm_dict = alarm.AlarmDictionary(dict_id)
+ alarm_dict.entityType = dictionary
+ alarm_dict.alarmDictionaryVersion = version
+ alarm_dict.alarmDictionarySchemaVersion = schema_ver
+
+ definition_list = list()
+ if definitions:
+ for definition in definitions:
+ def_uuid = str(uuid_gen.uuid3(
+ uuid_gen.NAMESPACE_URL, str(definition)))
+ def_obj = uow.alarm_definitions.get(def_uuid)
+ definition_list.append(def_obj)
+ alarm_dict.alarmDefinition = definition_list
+ uow.alarm_dictionaries.add(alarm_dict)
+ uow.commit()
+ # conf.alarm_dictionaries.add(alarm_dict)
def prettyDict(dict):
def load_alarm_definition(uow: unit_of_work.AbstractUnitOfWork):
- logger.info("Converting events.yaml to dict: ")
EVENT_TYPES_FILE = config.get_events_yaml_filename()
+ logger.info(f"Converting events.yaml to dict: {EVENT_TYPES_FILE}")
if not os.path.isfile(EVENT_TYPES_FILE):
logger.error("file %s doesn't exist. Ending execution" %
# Parse events.yaml dict, and add any new alarm to definition table:
logger.info(
- "Parsing events.yaml and adding any new alarm to definition table: ")
+ "Parsing events.yaml and adding any new alarm to definition table.")
for event_type in event_types:
if event_types.get(event_type).get('Type') == "Alarm":
alarm_def = alarm.AlarmDefinition(
id=event_uuid,
name=str(event_type),
- last_change=alarm.AlarmLastChangeEnum.ADDED,
+ change_type=alarm.AlarmChangeTypeEnum.ADDED,
desc=event_description, prop_action=prop_action,
clearing_type=alarm.ClearingTypeEnum.MANUAL,
pk_noti_field=""
)
- logger.info(str(event_type))
+ # logger.debug(str(event_type))
uow.alarm_definitions.add(alarm_def)
uow.commit()
return config_client
def getSubcloudList(self):
+ self.dcclient = self.getDcmanagerClient()
subs = self.dcclient.subcloud_manager.list_subclouds()
known_subs = [sub for sub in subs if sub.sync_status != 'unknown']
return known_subs
exc,
)
-from sqlalchemy.orm import mapper, relationship
+from sqlalchemy.orm import mapper, relationship, backref
# from sqlalchemy.sql.sqltypes import Integer
from o2ims.domain import ocloud as ocloudModel
Column("resourceClass", Enum(ResourceTypeEnum)),
# Column("extensions", String(1024))
- Column("oCloudId", ForeignKey("ocloud.oCloudId")),
+ Column("alarmDictionaryId", ForeignKey("alarmDictionary.id"))
)
resourcepool = Table(
Column("alarmDefinitionId", String(255), primary_key=True),
Column("alarmName", String(255), unique=True),
Column("alarmLastChange", String(255)),
+ Column("alarmChangeType", String(255)),
Column("alarmDescription", String(255)),
- Column("proposeRepairActions", String(255)),
+ Column("proposedRepairActions", String(1024)),
Column("clearingType", String(255)),
Column("managementInterfaceId", String(255)),
Column("pkNotificationField", String(255))
)
+alarm_dictionary = Table(
+ "alarmDictionary",
+ metadata,
+ Column("updatetime", DateTime),
+ Column("createtime", DateTime),
+
+ Column("id", String(255), primary_key=True),
+ Column("entityType", String(255), unique=True),
+ Column("alarmDictionaryVersion", String(255)),
+ Column("alarmDictionarySchemaVersion", String(255)),
+ Column("vendor", String(255)),
+ Column("managementInterfaceId", String(255)),
+ Column("pkNotificationField", String(255))
+
+ # Column("resourceTypeId", ForeignKey("resourceType.resourceTypeId"))
+)
+
+association_table1 = Table(
+ 'associationAlarmDictAndAlarmDef',
+ metadata,
+ Column("alarmDictionaryId", ForeignKey(
+ 'alarmDictionary.id', ondelete='cascade')),
+ Column("alarmDefinitionId", ForeignKey(
+ 'alarmDefinition.alarmDefinitionId'))
+)
+
alarm_event_record = Table(
"alarmEventRecord",
metadata,
def start_o2ims_mappers(engine=None):
logger.info("Starting O2 IMS mappers")
+ # IMS Infrastruture Monitoring Mappering
+ mapper(alarmModel.AlarmEventRecord, alarm_event_record)
+ alarmdefinition_mapper = mapper(
+ alarmModel.AlarmDefinition, alarm_definition)
+ mapper(alarmModel.ProbableCause, alarm_probable_cause)
+ mapper(alarmModel.AlarmSubscription, alarm_subscription)
+ alarm_dictionary_mapper = mapper(
+ alarmModel.AlarmDictionary, alarm_dictionary,
+ properties={
+ "alarmDefinition": relationship(alarmdefinition_mapper,
+ cascade='all,delete-orphan',
+ secondary=association_table1,
+ single_parent=True,
+ backref='alarmDictionaries')
+ }
+ )
+
# IMS Infrastructure Inventory Mappering
dm_mapper = mapper(ocloudModel.DeploymentManager, deploymentmanager)
resourcepool_mapper = mapper(ocloudModel.ResourcePool, resourcepool)
- resourcetype_mapper = mapper(ocloudModel.ResourceType, resourcetype)
+ resourcetype_mapper = mapper(
+ ocloudModel.ResourceType, resourcetype,
+ properties={
+ # "alarmDictionary": relationship(alarmModel.AlarmDictionary,
+ # uselist=False)
+ "alarmDictionary": relationship(alarm_dictionary_mapper,
+ backref=backref(
+ 'resourceType', uselist=False))
+
+ }
+ )
mapper(
ocloudModel.Ocloud,
ocloud,
properties={
"deploymentManagers": relationship(dm_mapper),
- "resourceTypes": relationship(resourcetype_mapper),
+ # "resourceTypes": relationship(resourcetype_mapper),
"resourcePools": relationship(resourcepool_mapper)
})
mapper(
)
mapper(subModel.Subscription, subscription)
- # IMS Infrastruture Monitoring Mappering
- mapper(alarmModel.AlarmEventRecord, alarm_event_record)
- mapper(alarmModel.AlarmDefinition, alarm_definition)
- mapper(alarmModel.ProbableCause, alarm_probable_cause)
- mapper(alarmModel.AlarmSubscription, alarm_subscription)
-
if engine is not None:
wait_for_metadata_ready(engine)
self.description = desc
-class AlarmLastChangeEnum(str, Enum):
+class AlarmChangeTypeEnum(str, Enum):
ADDED = 'ADDED'
DELETED = 'DELETED'
MODIFYED = 'MODIFYED'
class AlarmDefinition(AgRoot, Serializer):
- def __init__(self, id: str, name: str, last_change: AlarmLastChangeEnum,
+ def __init__(self, id: str, name: str, change_type: AlarmChangeTypeEnum,
desc: str, prop_action: str, clearing_type: ClearingTypeEnum,
pk_noti_field: str) -> None:
super().__init__()
self.alarmDefinitionId = id
self.alarmName = name
- self.alarmLastChange = last_change
+ self.alarmLastChange = '0.1'
+ self.alarmChangeType = change_type
self.alarmDescription = desc
self.proposedRepairActions = prop_action
self.clearingType = clearing_type
self.vendor = ""
self.managementInterfaceId = "O2IMS"
self.pkNotificationField = ""
- self.alarmDefinition = ""
+ self.alarmDefinition = []
+
+ def serialize(self):
+ d = Serializer.serialize(self)
+ if 'alarmDefinition' in d and len(d['alarmDefinition']) > 0:
+ d['alarmDefinition'] = self.serialize_list(d['alarmDefinition'])
+ return d
class AlarmNotificationEventEnum(str, Enum):
def _get(self, definition_id) -> obj.AlarmDefinition:
raise NotImplementedError
+ @abc.abstractmethod
+ def _list(self, **kwargs) -> List[obj.AlarmDefinition]:
+ raise NotImplementedError
+
@abc.abstractmethod
def _update(self, definition: obj.AlarmDefinition):
raise NotImplementedError
raise NotImplementedError
@abc.abstractmethod
- def _update(self, dictionary: obj.AlarmDictionary):
+ def _list(self, **kwargs) -> List[obj.AlarmDictionary]:
raise NotImplementedError
@abc.abstractmethod
def _get(self, probable_cause_id) -> obj.ProbableCause:
raise NotImplementedError
+ @abc.abstractmethod
+ def _list(self, **kwargs) -> List[obj.ProbableCause]:
+ raise NotImplementedError
+
@abc.abstractmethod
def _update(self, probable_cause: obj.ProbableCause):
raise NotImplementedError
from __future__ import annotations
import json
+from o2common.config import config
from o2common.domain.base import AgRoot, Serializer
-from o2common.config import config, conf as CONF
# from dataclasses import dataclass
# from datetime import date
# from typing import Optional, List, Set
from .resource_type import ResourceKindEnum, ResourceTypeEnum
+from .alarm_obj import AlarmDictionary
DeploymentManagerProfileDefault = 'native_k8sapi'
class ResourceType(AgRoot, Serializer):
def __init__(self, typeid: str, name: str, typeEnum: ResourceTypeEnum,
- ocloudid: str, vendor: str = '', model: str = '',
+ vendor: str = '', model: str = '',
version: str = '',
description: str = '') -> None:
super().__init__()
self.vendor = vendor
self.model = model
self.version = version
- self.alarmDictionary = {}
+ self.alarmDictionary = None
self.resourceKind = ResourceKindEnum.UNDEFINED
self.resourceClass = ResourceTypeEnum.UNDEFINED
self.extensions = []
def serialize(self):
d = Serializer.serialize(self)
-
- if CONF.alarm_dictionaries.get(d['name']) is not None:
- d["alarmDictionary"] = CONF.alarm_dictionaries.get(
- d['name']).serialize()
-
+ if 'alarmDictionary' in d and \
+ type(d['alarmDictionary']) is AlarmDictionary:
+ d['alarmDictionary'] = d['alarmDictionary'].serialize()
return d
with uow:
res = uow.session.execute(
'''
- SELECT "resourceTypeId", "oCloudId", "name"
+ SELECT "resourceTypeId", "name"
FROM "resourceType"
WHERE "resourceTypeEnum" = :resource_type_enum
''',
)
first = res.first()
if first is None:
- resourcepool = uow.resource_pools.get(cmd.parentid)
+ # resourcepool = uow.resource_pools.get(cmd.parentid)
res_type_name = 'compute_aggregate'
resourcetype_id = str(uuid.uuid3(
uuid.NAMESPACE_URL, res_type_name))
- uow.resource_types.add(ResourceType(
+ res_type = ResourceType(
resourcetype_id,
res_type_name, stxobj.type,
- resourcepool.oCloudId,
- description='The compute Aggregate resource type'))
+ description='The compute Aggregate resource type')
+ dict_id = str(uuid.uuid3(
+ uuid.NAMESPACE_URL,
+ str(f"{res_type_name}_alarmdictionary")))
+ alarm_dictionary = uow.alarm_dictionaries.get(dict_id)
+ if alarm_dictionary:
+ res_type.alarmDictionary = alarm_dictionary
+ uow.resource_types.add(res_type)
else:
resourcetype_id = first['resourceTypeId']
with uow:
res = uow.session.execute(
'''
- SELECT "resourceTypeId", "oCloudId", "name"
+ SELECT "resourceTypeId", "name"
FROM "resourceType"
WHERE "resourceTypeEnum" = :resource_type_enum
''',
)
first = res.first()
if first is None:
- resourcepool = uow.resource_pools.get(cmd.parentid)
+ # resourcepool = uow.resource_pools.get(cmd.parentid)
res_type_name = 'network_aggregate'
resourcetype_id = str(uuid.uuid3(
uuid.NAMESPACE_URL, res_type_name))
- uow.resource_types.add(ResourceType(
+ res_type = ResourceType(
resourcetype_id,
res_type_name, stxobj.type,
- resourcepool.oCloudId,
- description='The network Aggregate resource type'))
+ description='The network Aggregate resource type')
+ dict_id = str(uuid.uuid3(
+ uuid.NAMESPACE_URL,
+ str(f"{res_type_name}_alarmdictionary")))
+ alarm_dictionary = uow.alarm_dictionaries.get(dict_id)
+ if alarm_dictionary:
+ res_type.alarmDictionary = alarm_dictionary
+ uow.resource_types.add(res_type)
else:
resourcetype_id = first['resourceTypeId']
with uow:
res = uow.session.execute(
'''
- SELECT "resourceTypeId", "oCloudId", "name"
+ SELECT "resourceTypeId", "name"
FROM "resourceType"
WHERE "resourceTypeEnum" = :resource_type_enum
''',
)
first = res.first()
if first is None:
- resourcepool = uow.resource_pools.get(cmd.parentid)
+ # resourcepool = uow.resource_pools.get(cmd.parentid)
res_type_name = 'storage_aggregate'
resourcetype_id = str(uuid.uuid3(
uuid.NAMESPACE_URL, res_type_name))
- uow.resource_types.add(ResourceType(
+ res_type = ResourceType(
resourcetype_id,
res_type_name, stxobj.type,
- resourcepool.oCloudId,
- description='The storage Aggregate resource type'))
+ description='The storage Aggregate resource type')
+ dict_id = str(uuid.uuid3(
+ uuid.NAMESPACE_URL,
+ str(f"{res_type_name}_alarmdictionary")))
+ alarm_dictionary = uow.alarm_dictionaries.get(dict_id)
+ if alarm_dictionary:
+ res_type.alarmDictionary = alarm_dictionary
+ uow.resource_types.add(res_type)
else:
resourcetype_id = first['resourceTypeId']
with uow:
res = uow.session.execute(
'''
- SELECT "resourceTypeId", "oCloudId", "name"
+ SELECT "resourceTypeId", "name"
FROM "resourceType"
WHERE "resourceTypeEnum" = :resource_type_enum
''',
)
first = res.first()
if first is None:
- resourcepool = uow.resource_pools.get(cmd.parentid)
+ # resourcepool = uow.resource_pools.get(cmd.parentid)
res_type_name = 'undefined_aggregate'
resourcetype_id = str(uuid.uuid3(
uuid.NAMESPACE_URL, res_type_name))
- uow.resource_types.add(ResourceType(
+ res_type = ResourceType(
resourcetype_id,
res_type_name, stxobj.type,
- resourcepool.oCloudId,
- description='The undefined Aggregate resource type'))
+ description='The undefined Aggregate resource type')
+ dict_id = str(uuid.uuid3(
+ uuid.NAMESPACE_URL,
+ str(f"{res_type_name}_alarmdictionary")))
+ alarm_dictionary = uow.alarm_dictionaries.get(dict_id)
+ if alarm_dictionary:
+ res_type.alarmDictionary = alarm_dictionary
+ uow.resource_types.add(res_type)
else:
resourcetype_id = first['resourceTypeId']
stxobj = cmd.data
with uow:
p_resource = uow.resources.get(cmd.parentid)
- resourcepool = uow.resource_pools.get(p_resource.resourcePoolId)
+ # resourcepool = uow.resource_pools.get(p_resource.resourcePoolId)
res = uow.session.execute(
'''
- SELECT "resourceTypeId", "oCloudId", "name"
+ SELECT "resourceTypeId", "name"
FROM "resourceType"
WHERE "resourceTypeEnum" = :resource_type_enum
''',
res_type_name = 'pserver_acc'
resourcetype_id = str(uuid.uuid3(
uuid.NAMESPACE_URL, res_type_name))
- uow.resource_types.add(ResourceType(
+ res_type = ResourceType(
resourcetype_id,
res_type_name, stxobj.type,
- resourcepool.oCloudId,
- description='An Accelerator resource type of Physical Server'))
+ description='An Accelerator resource type of Physical Server')
+ dict_id = str(uuid.uuid3(
+ uuid.NAMESPACE_URL,
+ str(f"{res_type_name}_alarmdictionary")))
+ alarm_dictionary = uow.alarm_dictionaries.get(dict_id)
+ if alarm_dictionary:
+ res_type.alarmDictionary = alarm_dictionary
+ uow.resource_types.add(res_type)
else:
resourcetype_id = first['resourceTypeId']
stxobj = cmd.data
with uow:
p_resource = uow.resources.get(cmd.parentid)
- resourcepool = uow.resource_pools.get(p_resource.resourcePoolId)
+ # resourcepool = uow.resource_pools.get(p_resource.resourcePoolId)
res = uow.session.execute(
'''
- SELECT "resourceTypeId", "oCloudId", "name"
+ SELECT "resourceTypeId", "name"
FROM "resourceType"
WHERE "resourceTypeEnum" = :resource_type_enum
''',
res_type_name = 'pserver_cpu'
resourcetype_id = str(uuid.uuid3(
uuid.NAMESPACE_URL, res_type_name))
- uow.resource_types.add(ResourceType(
+ res_type = ResourceType(
resourcetype_id,
res_type_name, stxobj.type,
- resourcepool.oCloudId,
- description='A CPU resource type of the Physical Server'))
+ description='A CPU resource type of the Physical Server')
+ dict_id = str(uuid.uuid3(
+ uuid.NAMESPACE_URL,
+ str(f"{res_type_name}_alarmdictionary")))
+ alarm_dictionary = uow.alarm_dictionaries.get(dict_id)
+ if alarm_dictionary:
+ res_type.alarmDictionary = alarm_dictionary
+ uow.resource_types.add(res_type)
else:
resourcetype_id = first['resourceTypeId']
stxobj = cmd.data
with uow:
p_resource = uow.resources.get(cmd.parentid)
- resourcepool = uow.resource_pools.get(p_resource.resourcePoolId)
+ # resourcepool = uow.resource_pools.get(p_resource.resourcePoolId)
res = uow.session.execute(
'''
- SELECT "resourceTypeId", "oCloudId", "name"
+ SELECT "resourceTypeId", "name"
FROM "resourceType"
WHERE "resourceTypeEnum" = :resource_type_enum
''',
res_type_name = 'pserver_dev'
resourcetype_id = str(uuid.uuid3(
uuid.NAMESPACE_URL, res_type_name))
- uow.resource_types.add(ResourceType(
+ res_type = ResourceType(
resourcetype_id,
res_type_name, stxobj.type,
- resourcepool.oCloudId,
- description='A Device resource type of Physical Server'))
+ description='A Device resource type of Physical Server')
+ dict_id = str(uuid.uuid3(
+ uuid.NAMESPACE_URL,
+ str(f"{res_type_name}_alarmdictionary")))
+ alarm_dictionary = uow.alarm_dictionaries.get(dict_id)
+ if alarm_dictionary:
+ res_type.alarmDictionary = alarm_dictionary
+ uow.resource_types.add(res_type)
else:
resourcetype_id = first['resourceTypeId']
stxobj = cmd.data
with uow:
p_resource = uow.resources.get(cmd.parentid)
- resourcepool = uow.resource_pools.get(p_resource.resourcePoolId)
+ # resourcepool = uow.resource_pools.get(p_resource.resourcePoolId)
res = uow.session.execute(
'''
- SELECT "resourceTypeId", "oCloudId", "name"
+ SELECT "resourceTypeId", "name"
FROM "resourceType"
WHERE "resourceTypeEnum" = :resource_type_enum
''',
res_type_name = 'pserver_ethernet'
resourcetype_id = str(uuid.uuid3(
uuid.NAMESPACE_URL, res_type_name))
- uow.resource_types.add(ResourceType(
+ res_type = ResourceType(
resourcetype_id,
res_type_name, stxobj.type,
- resourcepool.oCloudId,
- description='An Ethernet resource type of Physical Server'))
+ description='An Ethernet resource type of Physical Server')
+ dict_id = str(uuid.uuid3(
+ uuid.NAMESPACE_URL,
+ str(f"{res_type_name}_alarmdictionary")))
+ alarm_dictionary = uow.alarm_dictionaries.get(dict_id)
+ if alarm_dictionary:
+ res_type.alarmDictionary = alarm_dictionary
+ uow.resource_types.add(res_type)
else:
resourcetype_id = first['resourceTypeId']
):
stxobj = cmd.data
with uow:
- resourcepool = uow.resource_pools.get(cmd.parentid)
+ # resourcepool = uow.resource_pools.get(cmd.parentid)
# res = uow.session.execute(select(resourcetype).where(
# resourcetype.c.resourceTypeEnum == stxobj.type))
res = uow.session.execute(
'''
- SELECT "resourceTypeId", "oCloudId", "name"
+ SELECT "resourceTypeId", "name"
FROM "resourceType"
WHERE "resourceTypeEnum" = :resource_type_enum
''',
res_type_name = 'pserver'
resourcetype_id = str(uuid.uuid3(
uuid.NAMESPACE_URL, res_type_name))
- uow.resource_types.add(ResourceType(
+ res_type = ResourceType(
resourcetype_id,
res_type_name, stxobj.type,
- resourcepool.oCloudId,
- description='The Physical Server resource type'))
+ description='The Physical Server resource type')
+ dict_id = str(uuid.uuid3(
+ uuid.NAMESPACE_URL,
+ str(f"{res_type_name}_alarmdictionary")))
+ alarm_dictionary = uow.alarm_dictionaries.get(dict_id)
+ if alarm_dictionary:
+ res_type.alarmDictionary = alarm_dictionary
+ uow.resource_types.add(res_type)
else:
resourcetype_id = first['resourceTypeId']
stxobj = cmd.data
with uow:
p_resource = uow.resources.get(cmd.parentid)
- resourcepool = uow.resource_pools.get(p_resource.resourcePoolId)
+ # resourcepool = uow.resource_pools.get(p_resource.resourcePoolId)
res = uow.session.execute(
'''
- SELECT "resourceTypeId", "oCloudId", "name"
+ SELECT "resourceTypeId", "name"
FROM "resourceType"
WHERE "resourceTypeEnum" = :resource_type_enum
''',
res_type_name = 'pserver_if'
resourcetype_id = str(uuid.uuid3(
uuid.NAMESPACE_URL, res_type_name))
- uow.resource_types.add(ResourceType(
+ res_type = ResourceType(
resourcetype_id,
res_type_name, stxobj.type,
- resourcepool.oCloudId,
- description='An Interface resource type of Physical Server'))
+ description='An Interface resource type of Physical Server')
+ dict_id = str(uuid.uuid3(
+ uuid.NAMESPACE_URL,
+ str(f"{res_type_name}_alarmdictionary")))
+ alarm_dictionary = uow.alarm_dictionaries.get(dict_id)
+ if alarm_dictionary:
+ res_type.alarmDictionary = alarm_dictionary
+ uow.resource_types.add(res_type)
else:
resourcetype_id = first['resourceTypeId']
stxobj = cmd.data
with uow:
p_resource = uow.resources.get(cmd.parentid)
- resourcepool = uow.resource_pools.get(p_resource.resourcePoolId)
+ # resourcepool = uow.resource_pools.get(p_resource.resourcePoolId)
res = uow.session.execute(
'''
- SELECT "resourceTypeId", "oCloudId", "name"
+ SELECT "resourceTypeId", "name"
FROM "resourceType"
WHERE "resourceTypeEnum" = :resource_type_enum
''',
res_type_name = 'pserver_mem'
resourcetype_id = str(uuid.uuid3(
uuid.NAMESPACE_URL, res_type_name))
- uow.resource_types.add(ResourceType(
+ res_type = ResourceType(
resourcetype_id,
res_type_name, stxobj.type,
- resourcepool.oCloudId,
- description='A Memory resource type of Physical Server'))
+ description='A Memory resource type of Physical Server')
+ dict_id = str(uuid.uuid3(
+ uuid.NAMESPACE_URL,
+ str(f"{res_type_name}_alarmdictionary")))
+ alarm_dictionary = uow.alarm_dictionaries.get(dict_id)
+ if alarm_dictionary:
+ res_type.alarmDictionary = alarm_dictionary
+ uow.resource_types.add(res_type)
else:
resourcetype_id = first['resourceTypeId']
stxobj = cmd.data
with uow:
p_resource = uow.resources.get(cmd.parentid)
- resourcepool = uow.resource_pools.get(p_resource.resourcePoolId)
+ # resourcepool = uow.resource_pools.get(p_resource.resourcePoolId)
res = uow.session.execute(
'''
- SELECT "resourceTypeId", "oCloudId", "name"
+ SELECT "resourceTypeId", "name"
FROM "resourceType"
WHERE "resourceTypeEnum" = :resource_type_enum
''',
)
first = res.first()
if first is None:
- resourcetype_id = str(uuid.uuid4())
- uow.resource_types.add(ResourceType(
- resourcetype_id,
- 'pserver_if_port', stxobj.type,
- resourcepool.oCloudId))
res_type_name = 'pserver_if_port'
resourcetype_id = str(uuid.uuid3(
uuid.NAMESPACE_URL, res_type_name))
- uow.resource_types.add(ResourceType(
+ res_type = ResourceType(
resourcetype_id,
res_type_name, stxobj.type,
- resourcepool.oCloudId,
- description='A Port resource type of Physical Server'))
+ description='A Port resource type of Physical Server')
+ dict_id = str(uuid.uuid3(
+ uuid.NAMESPACE_URL,
+ str(f"{res_type_name}_alarmdictionary")))
+ alarm_dictionary = uow.alarm_dictionaries.get(dict_id)
+ if alarm_dictionary:
+ res_type.alarmDictionary = alarm_dictionary
+ uow.resource_types.add(res_type)
else:
resourcetype_id = first['resourceTypeId']
class ResourceTypeDTO:
+ alarm_definition = api_ims_inventory_v1.model(
+ "AlarmDefinitionDto",
+ {
+ 'alarmDefinitionId': fields.String,
+ 'alarmName': fields.String,
+ 'alarmLastChange': fields.String,
+ 'alarmChangeType': fields.String,
+ 'alarmDescription': fields.String,
+ 'proposedRepairActions': fields.String,
+ 'clearingType': fields.String,
+ 'managementInterfaceId': fields.String,
+ 'pkNotificationField': fields.String,
+ 'alarmAdditionalFields': fields.String,
+ }
+
+ )
alarm_dictionary = api_ims_inventory_v1.model(
"AlarmDictionaryDto",
{
'id': fields.String,
'alarmDictionaryVersion': fields.String,
- 'alarmDictionarySchemVersion': fields.String,
+ 'alarmDictionarySchemaVersion': fields.String,
'entityType': fields.String,
'vendor': fields.String,
'managementInterfaceId': fields.String,
'pkNotificationField': fields.String,
- 'alarmDefinition': fields.String,
+ # 'alarmDefinition': fields.String,
+ 'alarmDefinition': fields.List(fields.Nested(alarm_definition),
+ attribute='alarmDefinition'),
}
)
@api_ims_inventory_v1.marshal_with(model)
def get(self, resourceTypeID):
result = ocloud_view.resource_type_one(resourceTypeID, bus.uow)
- if result is not None:
- return result
- raise NotFoundException("Resource type {} doesn't exist".format(
- resourceTypeID))
+ if not result:
+ raise NotFoundException("Resource type {} doesn't exist".format(
+ resourceTypeID))
+ return result
# ---------- ResourcePools ---------- #
uow: unit_of_work.AbstractUnitOfWork):
with uow:
first = uow.resource_pools.get(resourcePoolId)
- return first.serialize() if first is not None else None
+ return first.serialize() if first else None
def resources(resourcePoolId: str, uow: unit_of_work.AbstractUnitOfWork,