1 # Copyright (C) 2022 Wind River Systems, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
import collections
import errno
import json
import os
import uuid as uuid_gen

import yaml

from o2common.service import unit_of_work
from o2common.config import config
from o2ims.domain import alarm_obj as alarm

from o2common.helper import o2logging
27 logger = o2logging.get_logger(__name__)
def load_alarm_dictionary_from_conf_file(uow: unit_of_work.AbstractUnitOfWork):
    """Load the alarm dictionaries described in alarm.yaml into the repository.

    The YAML file named by ``config.get_alarm_yaml_filename()`` is parsed.
    For every entry under ``alarmDictionary.schema`` the matching
    ``AlarmDictionary`` is fetched from ``uow.alarm_dictionaries`` (or created
    when none exists), stamped with the entry's ``version`` and the file's
    ``schemaVersion``, linked to the alarm definitions the entry names, and
    added to ``uow.alarm_dictionaries``.

    Dictionary ids are UUIDv3 values derived from
    ``"<entry name>_alarmdictionary"``; definition ids are UUIDv3 values
    derived from the definition name, both in ``NAMESPACE_URL``.

    Args:
        uow: Unit of work exposing the ``alarm_dictionaries`` and
            ``alarm_definitions`` repositories.

    Raises:
        FileNotFoundError: The configured YAML file does not exist.
        RuntimeError: The file could not be read or parsed, or it lacks the
            ``alarmDictionary`` section or its ``schema`` / ``schemaVersion``
            keys. The underlying error is attached as ``__cause__``.
        KeyError: A schema entry lacks ``version`` or ``alarmDefinition``.
    """
    conf_path = config.get_alarm_yaml_filename()
    logger.info(f"Converting alarm.yaml to dictionary: {conf_path}")

    if not os.path.isfile(conf_path):
        logger.error("file %s doesn't exist. Ending execution" % conf_path)
        raise FileNotFoundError(
            errno.ENOENT, os.strerror(errno.ENOENT), conf_path)

    try:
        with open(conf_path, 'r') as stream:
            # NOTE(review): FullLoader accepts more than plain data; if this
            # file only ever holds scalars/maps/lists, yaml.safe_load would be
            # the more conservative choice.
            alarm_yaml = yaml.load(stream, Loader=yaml.FullLoader)
        alarm_section = alarm_yaml.get('alarmDictionary')
        dictionaries = alarm_section['schema']
        schema_ver = alarm_section['schemaVersion']
    except Exception as exp:
        logger.error(exp)
        # Chain the cause so the original traceback is not lost.
        raise RuntimeError(exp) from exp

    for entity_type, spec in dictionaries.items():
        version = spec['version']
        definitions = spec['alarmDefinition']
        dict_id = str(uuid_gen.uuid3(
            uuid_gen.NAMESPACE_URL, f"{entity_type}_alarmdictionary"))

        # NOTE(review): no transaction is opened or committed in this block;
        # confirm the caller enters and commits the unit of work.
        alarm_dict = uow.alarm_dictionaries.get(dict_id)
        if alarm_dict is None:
            alarm_dict = alarm.AlarmDictionary(dict_id)
            alarm_dict.entityType = entity_type
        # Version fields are refreshed for new and existing dictionaries alike.
        alarm_dict.alarmDictionaryVersion = version
        alarm_dict.alarmDictionarySchemaVersion = schema_ver

        # An ``alarmDefinition:`` key left empty in YAML parses to None;
        # treat that as "no definitions" instead of failing on iteration.
        alarm_dict.alarmDefinition = [
            uow.alarm_definitions.get(
                str(uuid_gen.uuid3(uuid_gen.NAMESPACE_URL, str(definition))))
            for definition in (definitions or ())
        ]
        uow.alarm_dictionaries.add(alarm_dict)
84 output = json.dumps(dict, sort_keys=True, indent=4)
88 def load_alarm_definition(uow: unit_of_work.AbstractUnitOfWork):
89 EVENT_TYPES_FILE = config.get_events_yaml_filename()
90 logger.info(f"Converting events.yaml to dict: {EVENT_TYPES_FILE}")
92 if not os.path.isfile(EVENT_TYPES_FILE):
93 logger.error("file %s doesn't exist. Ending execution" %
96 errno.ENOENT, os.strerror(errno.ENOENT), EVENT_TYPES_FILE
100 with open(EVENT_TYPES_FILE, 'r') as stream:
101 event_types = yaml.load(stream, Loader=yaml.FullLoader)
102 except Exception as exp:
104 raise RuntimeError(exp)
106 for alarm_id in list(event_types.keys()):
107 if isinstance(alarm_id, float):
108 # force 3 digits after the decimal point,
109 # to include trailing zero's (ex.: 200.010)
110 formatted_alarm_id = "{:.3f}".format(alarm_id)
111 event_types[formatted_alarm_id] = event_types.pop(alarm_id)
113 event_types = collections.OrderedDict(sorted(event_types.items()))
116 uneditable_descriptions = {'100.114', '200.007',
117 '200.02', '200.021', '200.022', '800.002'}
119 # Parse events.yaml dict, and add any new alarm to definition table:
121 "Parsing events.yaml and adding any new alarm to definition table.")
122 for event_type in event_types:
124 if event_types.get(event_type).get('Type') == "Alarm":
125 event_uuid = str(uuid_gen.uuid3(
126 uuid_gen.NAMESPACE_URL, str(event_type)))
128 string_event_type = str(event_type)
130 yaml_event_list.append(string_event_type)
132 if str(event_type) not in uneditable_descriptions:
133 event_description = (event_types.get(event_type)
136 event_description = event_types.get(
137 event_type).get('Description')
139 event_description = str(event_description)
140 event_description = (event_description[:250] + ' ...') \
141 if len(event_description) > 250 else event_description
142 prop_action = event_types.get(
143 event_type).get("Proposed_Repair_Action")
146 alarm_def = uow.alarm_definitions.get(event_uuid)
147 event_mgmt_affecting = str(event_types.get(event_type).get(
148 'Management_Affecting_Severity', 'warning'))
150 event_degrade_affecting = str(event_types.get(event_type).get(
151 'Degrade_Affecting_Severity', 'none'))
154 alarm_def.description = event_description
155 alarm_def.mgmt_affecting = event_mgmt_affecting
156 alarm_def.degrade_affecting = event_degrade_affecting
158 alarm_def = alarm.AlarmDefinition(
160 name=str(event_type),
161 change_type=alarm.AlarmChangeTypeEnum.ADDED,
162 desc=event_description, prop_action=prop_action,
163 clearing_type=alarm.ClearingTypeEnum.MANUAL,
166 # logger.debug(str(event_type))
167 uow.alarm_definitions.add(alarm_def)
171 prob_cause = event_types.get(event_type).get("Probable_Cause")
172 prob_cause_uuid = str(uuid_gen.uuid3(
173 uuid_gen.NAMESPACE_URL, prob_cause))
176 probable_cause = uow.alarm_probable_causes.get(prob_cause_uuid)
177 if probable_cause is None:
178 pc = alarm.ProbableCause(
179 prob_cause_uuid, prob_cause, prob_cause)
180 uow.alarm_probable_causes.add(pc)