logger = o2logging.get_logger(__name__)
-def load_alarm_dictionary_from_conf_file(conf_path: str,
- uow: unit_of_work.AbstractUnitOfWork):
-
- logger.info("Converting alarm.yaml to dict: ")
+def load_alarm_dictionary_from_conf_file(uow: unit_of_work.AbstractUnitOfWork):
+ conf_path = config.get_alarm_yaml_filename()
+ logger.info(f"Converting alarm.yaml to dictionary: {conf_path}")
if not os.path.isfile(conf_path):
logger.error("file %s doesn't exist. Ending execution" %
try:
with open(conf_path, 'r') as stream:
alarm_yaml = yaml.load(stream, Loader=yaml.FullLoader)
- dictionaries = alarm_yaml.get('dictionary')
+ dictionaries = alarm_yaml.get('alarmDictionary')['schema']
+ schema_ver = alarm_yaml.get('alarmDictionary')['schemaVersion']
except Exception as exp:
logger.error(exp)
raise RuntimeError(exp)
for dictionary in list(dictionaries.keys()):
+ # res_type = uow.resource_types.get_by_name(dictionary)
+ # logger.info('res_type: ' + res_type.resourceTypeName)
+ version = dictionaries[dictionary]['version']
+ definitions = dictionaries[dictionary]['alarmDefinition']
+ dict_id = str(uuid_gen.uuid3(
+ uuid_gen.NAMESPACE_URL,
+ str(f"{dictionary}_alarmdictionary")))
+
with uow:
- # res_type = uow.resource_types.get_by_name(dictionary)
- # logger.info('res_type: ' + res_type.resourceTypeName)
- alarm_dict = alarm.AlarmDictionary(dictionary)
- alarm_dict.entityType = dictionary
- alarm_dict.alarmDictionaryVersion = \
- dictionaries[dictionary]['version']
- alarm_dict.alarmDefinition = \
- dictionaries[dictionary]['alarmDefinition']
+ alarm_dict = uow.alarm_dictionaries.get(dict_id)
+ if alarm_dict:
+ alarm_dict.alarmDictionaryVersion = version
+ alarm_dict.alarmDictionarySchemaVersion = schema_ver
+ else:
+ alarm_dict = alarm.AlarmDictionary(dict_id)
+ alarm_dict.entityType = dictionary
+ alarm_dict.alarmDictionaryVersion = version
+ alarm_dict.alarmDictionarySchemaVersion = schema_ver
+
+ definition_list = list()
+ if definitions:
+ for definition in definitions:
+ def_uuid = str(uuid_gen.uuid3(
+ uuid_gen.NAMESPACE_URL, str(definition)))
+ def_obj = uow.alarm_definitions.get(def_uuid)
+ definition_list.append(def_obj)
+ alarm_dict.alarmDefinition = definition_list
uow.alarm_dictionaries.add(alarm_dict)
+ uow.commit()
+ # conf.alarm_dictionaries.add(alarm_dict)
def prettyDict(dict):
def load_alarm_definition(uow: unit_of_work.AbstractUnitOfWork):
- logger.info("Converting events.yaml to dict: ")
EVENT_TYPES_FILE = config.get_events_yaml_filename()
+ logger.info(f"Converting events.yaml to dict: {EVENT_TYPES_FILE}")
if not os.path.isfile(EVENT_TYPES_FILE):
logger.error("file %s doesn't exist. Ending execution" %
# Parse events.yaml dict, and add any new alarm to definition table:
logger.info(
- "Parsing events.yaml and adding any new alarm to definition table: ")
+ "Parsing events.yaml and adding any new alarm to definition table.")
for event_type in event_types:
if event_types.get(event_type).get('Type') == "Alarm":
alarm_def = alarm.AlarmDefinition(
id=event_uuid,
name=str(event_type),
- last_change=alarm.AlarmLastChangeEnum.ADDED,
+ change_type=alarm.AlarmChangeTypeEnum.ADDED,
desc=event_description, prop_action=prop_action,
clearing_type=alarm.ClearingTypeEnum.MANUAL,
pk_noti_field=""
)
- logger.info(str(event_type))
+ # logger.debug(str(event_type))
uow.alarm_definitions.add(alarm_def)
uow.commit()