1 # Copyright (C) 2022 Wind River Systems, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 # client talking to Stx standalone
17 from typing import List # Optional, Set
20 # from dcmanagerclient.api import client
21 # from cgtsclient.client import get_client as get_stx_client
22 # from cgtsclient.exc import EndpointException
23 # from dcmanagerclient.api.client import client as get_dc_client
24 from fmclient.client import get_client as get_fm_client
25 from fmclient.common.exceptions import HTTPNotFound
27 from o2common.service.client.base_client import BaseClient
28 from o2common.config import config
29 from o2ims.domain import alarm_obj as alarmModel
30 from o2app.adapter import unit_of_work
32 from o2common.helper import o2logging
33 logger = o2logging.get_logger(__name__)
36 CGTSCLIENT_ENDPOINT_ERROR_MSG = \
37 'Must provide Keystone credentials or user-defined endpoint and token'
40 class StxAlarmClient(BaseClient):
41 def __init__(self, uow: unit_of_work.AbstractUnitOfWork, driver=None):
43 self.driver = driver if driver else StxFaultClientImp()
46 def _get(self, id) -> alarmModel.FaultGenericModel:
47 return self.driver.getAlarmInfo(id)
49 def _list(self, **filters) -> List[alarmModel.FaultGenericModel]:
50 # filters['resourcetypeid']
51 newmodels = self.driver.getAlarmList(**filters)
55 rs = uow.session.execute(
57 SELECT "alarmEventRecordId"
58 FROM "alarmEventRecord"
59 WHERE "perceivedSeverity" != :perceived_severity_enum
61 dict(perceived_severity_enum=alarmModel.PerceivedSeverityEnum.
66 # logger.debug('Exist alarm: ' + id)
67 exist_alarms[id] = False
72 if exist_alarms[m.id]:
74 exist_alarms[m.id] = True
76 logger.debug('alarm new: ' + m.id)
79 for alarm in exist_alarms:
80 logger.debug('exist alarm: ' + alarm)
81 if exist_alarms[alarm]:
82 # exist alarm is active
84 event = self._get(alarm)
89 def _set_stx_client(self):
93 class StxEventClient(BaseClient):
94 def __init__(self, driver=None):
96 self.driver = driver if driver else StxFaultClientImp()
98 def _get(self, id) -> alarmModel.FaultGenericModel:
99 return self.driver.getEventInfo(id)
101 def _list(self, **filters) -> List[alarmModel.FaultGenericModel]:
102 return self.driver.getEventList(**filters)
104 def _set_stx_client(self):
108 # internal driver which implement client call to Stx Fault Management instance
109 class StxFaultClientImp(object):
110 def __init__(self, fm_client=None):
112 self.fmclient = fm_client if fm_client else self.getFmClient()
113 # if subcloud_id is not None:
114 # self.stxclient = self.getSubcloudClient(subcloud_id)
116 def getFmClient(self):
117 os_client_args = config.get_fm_access_info()
118 config_client = get_fm_client(1, **os_client_args)
121 def getAlarmList(self, **filters) -> List[alarmModel.FaultGenericModel]:
122 alarms = self.fmclient.alarm.list(expand=True)
125 logger.debug('alarm 1:' + str(alarms[0].to_dict()))
126 # [print('alarm:' + str(alarm.to_dict())) for alarm in alarms if alarm]
127 return [alarmModel.FaultGenericModel(
128 alarmModel.EventTypeEnum.ALARM, self._alarmconverter(alarm))
129 for alarm in alarms if alarm]
131 def getAlarmInfo(self, id) -> alarmModel.FaultGenericModel:
133 alarm = self.fmclient.alarm.get(id)
134 logger.debug('get alarm id ' + id + ':' + str(alarm.to_dict()))
135 # print(alarm.to_dict())
137 event = self.fmclient.event_log.get(id)
138 return alarmModel.FaultGenericModel(
139 alarmModel.EventTypeEnum.ALARM, self._eventconverter(event,
141 return alarmModel.FaultGenericModel(
142 alarmModel.EventTypeEnum.ALARM, self._alarmconverter(alarm))
144 def getEventList(self, **filters) -> List[alarmModel.FaultGenericModel]:
145 events = self.fmclient.event_log.list(alarms=True, expand=True)
146 logger.debug('event 1:' + str(events[0].to_dict()))
147 # [print('alarm:' + str(event.to_dict())) for event in events if event]
148 return [alarmModel.FaultGenericModel(
149 alarmModel.EventTypeEnum.EVENT, self._eventconverter(event))
150 for event in events if event]
152 def getEventInfo(self, id) -> alarmModel.FaultGenericModel:
153 event = self.fmclient.event_log.get(id)
154 logger.debug('get event id ' + id + ':' + str(event.to_dict()))
155 # print(event.to_dict())
156 return alarmModel.FaultGenericModel(
157 alarmModel.EventTypeEnum.EVENT, self._eventconverter(event))
160 def _alarmconverter(alarm):
161 # setattr(alarm, 'alarm_def_id', uuid.uuid3(
162 # uuid.NAMESPACE_URL, alarm.alarm_id))
163 setattr(alarm, 'state', alarm.alarm_state)
165 setattr(alarm, 'alarm_def_id', str(uuid.uuid3(
166 uuid.NAMESPACE_URL, alarm.alarm_id)))
167 setattr(alarm, 'probable_cause_id', str(uuid.uuid3(
168 uuid.NAMESPACE_URL, alarm.probable_cause)))
172 def _eventconverter(event, clear=False):
173 setattr(event, 'alarm_id', event.event_log_id)
174 setattr(event, 'alarm_type', event.event_log_type)
176 logger.debug('alarm is clear')
177 event.state = 'clear'
178 setattr(event, 'alarm_def_id', str(uuid.uuid3(
179 uuid.NAMESPACE_URL, event.alarm_id)))
180 setattr(event, 'probable_cause_id', str(uuid.uuid3(
181 uuid.NAMESPACE_URL, event.probable_cause)))
185 def _alarmeventhasher(event, state=''):
186 # The event model and the alarm model have different parameter name
187 # of the state. alarm model is alarm_state, event model is state.
188 status = event.alarm_state if state == '' else state
189 return str(hash((event.uuid, event.timestamp, status)))