X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=o2ims%2Fadapter%2Fclients%2Falarm_dict_client.py;h=a249a4c31cc9e5cb69214c095f8e0f02cb9958f8;hb=5229119cf0fee426b2d14af3887b6fe4f78318a6;hp=e15531ab13f27839c7d24421f98434bf5f5b0c82;hpb=d2f6cc674bf3623caf114a8d7709e70d55ec9340;p=pti%2Fo2.git diff --git a/o2ims/adapter/clients/alarm_dict_client.py b/o2ims/adapter/clients/alarm_dict_client.py index e15531a..a249a4c 100644 --- a/o2ims/adapter/clients/alarm_dict_client.py +++ b/o2ims/adapter/clients/alarm_dict_client.py @@ -27,10 +27,9 @@ from o2common.helper import o2logging logger = o2logging.get_logger(__name__) -def load_alarm_dictionary_from_conf_file(conf_path: str, - uow: unit_of_work.AbstractUnitOfWork): - - logger.info("Converting alarm.yaml to dict: ") +def load_alarm_dictionary_from_conf_file(uow: unit_of_work.AbstractUnitOfWork): + conf_path = config.get_alarm_yaml_filename() + logger.info(f"Converting alarm.yaml to dictionary: {conf_path}") if not os.path.isfile(conf_path): logger.error("file %s doesn't exist. Ending execution" % @@ -42,22 +41,43 @@ def load_alarm_dictionary_from_conf_file(conf_path: str, try: with open(conf_path, 'r') as stream: alarm_yaml = yaml.load(stream, Loader=yaml.FullLoader) - dictionaries = alarm_yaml.get('dictionary') + dictionaries = alarm_yaml.get('alarmDictionary')['schema'] + schema_ver = alarm_yaml.get('alarmDictionary')['schemaVersion'] except Exception as exp: logger.error(exp) raise RuntimeError(exp) for dictionary in list(dictionaries.keys()): + # res_type = uow.resource_types.get_by_name(dictionary) + # logger.info('res_type: ' + res_type.resourceTypeName) + version = dictionaries[dictionary]['version'] + definitions = dictionaries[dictionary]['alarmDefinition'] + dict_id = str(uuid_gen.uuid3( + uuid_gen.NAMESPACE_URL, + str(f"{dictionary}_alarmdictionary"))) + with uow: - # res_type = uow.resource_types.get_by_name(dictionary) - # logger.info('res_type: ' + res_type.resourceTypeName) - alarm_dict = alarm.AlarmDictionary(dictionary) - alarm_dict.entityType = dictionary - alarm_dict.alarmDictionaryVersion = \ - dictionaries[dictionary]['version'] - alarm_dict.alarmDefinition = \ - dictionaries[dictionary]['alarmDefinition'] + alarm_dict = uow.alarm_dictionaries.get(dict_id) + if alarm_dict: + alarm_dict.alarmDictionaryVersion = version + alarm_dict.alarmDictionarySchemaVersion = schema_ver + else: + alarm_dict = alarm.AlarmDictionary(dict_id) + alarm_dict.entityType = dictionary + alarm_dict.alarmDictionaryVersion = version + alarm_dict.alarmDictionarySchemaVersion = schema_ver + + definition_list = list() + if definitions: + for definition in definitions: + def_uuid = str(uuid_gen.uuid3( + uuid_gen.NAMESPACE_URL, str(definition))) + def_obj = uow.alarm_definitions.get(def_uuid) + definition_list.append(def_obj) + alarm_dict.alarmDefinition = definition_list uow.alarm_dictionaries.add(alarm_dict) + uow.commit() + # conf.alarm_dictionaries.add(alarm_dict) def prettyDict(dict): @@ -66,8 +86,8 @@ def prettyDict(dict): def load_alarm_definition(uow: unit_of_work.AbstractUnitOfWork): - logger.info("Converting events.yaml to dict: ") EVENT_TYPES_FILE = config.get_events_yaml_filename() + logger.info(f"Converting events.yaml to dict: {EVENT_TYPES_FILE}") if not os.path.isfile(EVENT_TYPES_FILE): logger.error("file %s doesn't exist. Ending execution" % @@ -98,7 +118,7 @@ def load_alarm_definition(uow: unit_of_work.AbstractUnitOfWork): # Parse events.yaml dict, and add any new alarm to definition table: logger.info( - "Parsing events.yaml and adding any new alarm to definition table: ") + "Parsing events.yaml and adding any new alarm to definition table.") for event_type in event_types: if event_types.get(event_type).get('Type') == "Alarm": @@ -138,12 +158,12 @@ def load_alarm_definition(uow: unit_of_work.AbstractUnitOfWork): alarm_def = alarm.AlarmDefinition( id=event_uuid, name=str(event_type), - last_change=alarm.AlarmLastChangeEnum.ADDED, + change_type=alarm.AlarmChangeTypeEnum.ADDED, desc=event_description, prop_action=prop_action, clearing_type=alarm.ClearingTypeEnum.MANUAL, pk_noti_field="" ) - logger.info(str(event_type)) + # logger.debug(str(event_type)) uow.alarm_definitions.add(alarm_def) uow.commit()