1 # Copyright (C) 2022 Wind River Systems, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 # client talking to Stx standalone
17 from typing import List # Optional, Set
20 # from dcmanagerclient.api import client
21 # from cgtsclient.client import get_client as get_stx_client
22 # from cgtsclient.exc import EndpointException
23 # from dcmanagerclient.api.client import client as get_dc_client
24 from fmclient.client import get_client as get_fm_client
25 from fmclient.common.exceptions import HTTPNotFound
27 from o2common.service.client.base_client import BaseClient
28 from o2common.config import config
29 from o2ims.domain import alarm_obj as alarmModel
30 from o2ims.domain.resource_type import ResourceTypeEnum
31 from o2app.adapter import unit_of_work
33 from o2common.helper import o2logging
34 logger = o2logging.get_logger(__name__)
37 CGTSCLIENT_ENDPOINT_ERROR_MSG = \
38 'Must provide Keystone credentials or user-defined endpoint and token'
41 class StxAlarmClient(BaseClient):
42 def __init__(self, uow: unit_of_work.AbstractUnitOfWork, driver=None):
44 self.driver = driver if driver else StxFaultClientImp()
47 def _get(self, id) -> alarmModel.FaultGenericModel:
48 return self.driver.getAlarmInfo(id)
50 def _list(self, **filters) -> List[alarmModel.FaultGenericModel]:
51 # filters['resourcetypeid']
52 newmodels = self.driver.getAlarmList(**filters)
56 rs = uow.session.execute(
58 SELECT "alarmEventRecordId"
59 FROM "alarmEventRecord"
60 WHERE "perceivedSeverity" != :perceived_severity_enum
62 dict(perceived_severity_enum=alarmModel.PerceivedSeverityEnum.
67 # logger.debug('Exist alarm: ' + id)
68 exist_alarms[id] = False
73 if exist_alarms[m.id]:
75 exist_alarms[m.id] = True
77 logger.debug('alarm new: ' + m.id)
80 for alarm in exist_alarms:
81 logger.debug('exist alarm: ' + alarm)
82 if exist_alarms[alarm]:
83 # exist alarm is active
85 event = self._get(alarm)
90 def _set_stx_client(self):
94 class StxEventClient(BaseClient):
95 def __init__(self, driver=None):
97 self.driver = driver if driver else StxFaultClientImp()
99 def _get(self, id) -> alarmModel.FaultGenericModel:
100 return self.driver.getEventInfo(id)
102 def _list(self, **filters) -> List[alarmModel.FaultGenericModel]:
103 return self.driver.getEventList(**filters)
105 def _set_stx_client(self):
109 # internal driver which implement client call to Stx Fault Management instance
110 class StxFaultClientImp(object):
111 def __init__(self, fm_client=None):
113 self.fmclient = fm_client if fm_client else self.getFmClient()
114 # if subcloud_id is not None:
115 # self.stxclient = self.getSubcloudClient(subcloud_id)
117 def getFmClient(self):
118 os_client_args = config.get_fm_access_info()
119 config_client = get_fm_client(1, **os_client_args)
122 def getAlarmList(self, **filters) -> List[alarmModel.FaultGenericModel]:
123 alarms = self.fmclient.alarm.list(expand=True)
126 logger.debug('alarm 1:' + str(alarms[0].to_dict()))
127 # [print('alarm:' + str(alarm.to_dict())) for alarm in alarms if alarm]
128 return [alarmModel.FaultGenericModel(
129 ResourceTypeEnum.PSERVER, self._alarmconverter(alarm))
130 for alarm in alarms if alarm]
132 def getAlarmInfo(self, id) -> alarmModel.FaultGenericModel:
134 alarm = self.fmclient.alarm.get(id)
135 logger.debug('get alarm id ' + id + ':' + str(alarm.to_dict()))
136 # print(alarm.to_dict())
138 event = self.fmclient.event_log.get(id)
139 return alarmModel.FaultGenericModel(
140 ResourceTypeEnum.PSERVER, self._eventconverter(event, True))
141 return alarmModel.FaultGenericModel(
142 ResourceTypeEnum.PSERVER, self._alarmconverter(alarm))
144 def getEventList(self, **filters) -> List[alarmModel.FaultGenericModel]:
145 events = self.fmclient.event_log.list(alarms=True, expand=True)
146 logger.debug('event 1:' + str(events[0].to_dict()))
147 # [print('alarm:' + str(event.to_dict())) for event in events if event]
148 return [alarmModel.FaultGenericModel(
149 ResourceTypeEnum.PSERVER, self._eventconverter(event))
150 for event in events if event]
152 def getEventInfo(self, id) -> alarmModel.FaultGenericModel:
153 event = self.fmclient.event_log.get(id)
154 logger.debug('get event id ' + id + ':' + str(event.to_dict()))
155 # print(event.to_dict())
156 return alarmModel.FaultGenericModel(
157 ResourceTypeEnum.PSERVER, self._eventconverter(event))
160 def _alarmconverter(alarm):
161 # setattr(alarm, 'alarm_def_id', uuid.uuid3(
162 # uuid.NAMESPACE_URL, alarm.alarm_id))
163 setattr(alarm, 'state', alarm.alarm_state)
164 setattr(alarm, 'event_log_type', alarm.alarm_type)
165 setattr(alarm, 'event_log_id', alarm.alarm_id)
167 setattr(alarm, 'alarm_def_id', uuid.uuid3(
168 uuid.NAMESPACE_URL, alarm.alarm_id))
169 setattr(alarm, 'probable_cause_id', uuid.uuid3(
170 uuid.NAMESPACE_URL, alarm.probale_cause))
174 def _eventconverter(event, clear=False):
175 setattr(event, 'alarm_id', event.event_log_id)
176 setattr(event, 'alarm_type', event.event_log_type)
178 logger.debug('alarm is clear')
179 event.state = 'clear'
180 setattr(event, 'alarm_def_id', uuid.uuid3(
181 uuid.NAMESPACE_URL, event.alarm_id))
182 setattr(event, 'probable_cause_id', uuid.uuid3(
183 uuid.NAMESPACE_URL, event.probale_cause))
187 def _alarmeventhasher(event, state=''):
188 # The event model and the alarm model have different parameter name
189 # of the state. alarm model is alarm_state, event model is state.
190 status = event.alarm_state if state == '' else state
191 return str(hash((event.uuid, event.timestamp, status)))