pubsub.subscribe('DmsChanged')
pubsub.subscribe('ResourceChanged')
pubsub.subscribe('AlarmEventChanged')
+ pubsub.subscribe('AlarmEventCleared')
pubsub.subscribe('AlarmEventPurged')
for m in pubsub.listen():
eventtype=data['notificationEventType'],
updatetime=data['updatetime']))
bus.handle(cmd)
+ elif channel == 'AlarmEventCleared':
+ datastr = m['data']
+ data = json.loads(datastr)
+ logger.info('AlarmEventCleared with cmd:{}'.format(data))
+ cmd = imscmd.ClearAlarmEvent(data=AlarmEvent2SMO(
+ id=data['id'], ref="", eventtype=data['notificationEventType'],
+ updatetime=data['updatetime']))
+ bus.handle(cmd)
elif channel == 'AlarmEventPurged':
datastr = m['data']
data = json.loads(datastr)
pserver_dev_handler, agg_compute_handler, agg_network_handler,\
agg_storage_handler, agg_undefined_handler
from o2ims.service.command import notify_handler, registration_handler,\
- notify_alarm_handler, purge_alarm_handler
+ notify_alarm_handler, clear_alarm_handler, purge_alarm_handler
from o2ims.service.event import ocloud_event, resource_event, \
resource_pool_event, alarm_event, dms_event, resource_type_event
notify_resourcepool_change],
events.AlarmEventChanged: [alarm_event.\
notify_alarm_event_change],
+ events.AlarmEventCleared: [alarm_event.\
+ notify_alarm_event_clear],
events.AlarmEventPurged: [alarm_event.\
- notify_alarm_event_purge],
+ notify_alarm_event_purge],
} # type: Dict[Type[events.Event], Callable]
commands.PubMessage2SMO: notify_handler.notify_change_to_smo,
commands.PubAlarm2SMO: notify_alarm_handler.notify_alarm_to_smo,
commands.Register2SMO: registration_handler.registry_to_smo,
+ commands.ClearAlarmEvent: clear_alarm_handler.clear_alarm_event,
commands.PurgeAlarmEvent: purge_alarm_handler.purge_alarm_event,
} # type: Dict[Type[commands.Command], Callable]
def _update(self, alarm_event_record: alarm_obj.AlarmEventRecord):
self.session.merge(alarm_event_record)
- def _delete(self, alarm_event_record_id):
- self.session.query(alarm_obj.AlarmEventRecord).filter_by(
- alarmEventRecordId=alarm_event_record_id).delete()
+ def _delete(self, alarm_event_record):
+ self.session.delete(alarm_event_record)
class AlarmDefinitionSqlAlchemyRepository(AlarmDefinitionRepository):
def deleteAlarm(self, id) -> alarmModel.FaultGenericModel:
alarm = self.fmclient.alarm.delete(id)
- logger.debug('delete alarm id ' + id + ':' + str(alarm.to_dict()))
+ logger.debug('delete alarm id: ' + id)
return alarmModel.FaultGenericModel(
alarmModel.EventTypeEnum.ALARM, self._alarmconverter(alarm))
Column("perceivedSeverity", Integer),
Column("alarmRaisedTime", String(255)),
Column("alarmChangedTime", String(255)),
+ Column("alarmClearedTime", String(255)),
Column("alarmAcknowledgeTime", String(255)),
Column("alarmAcknowledged", String(255)),
Column("extensions", String())
self.perceivedSeverity = perc_severity
self.alarmRaisedTime = raised_time
self.alarmChangedTime = ''
+ self.alarmClearedTime = ''
self.alarmAcknowledgeTime = ''
self.alarmAcknowledged = False
self.extensions = ''
self.perceivedSeverity = alarm.perceivedSeverity
self.alarmRaisedTime = alarm.alarmRaisedTime
self.alarmChangedTime = alarm.alarmChangedTime
+ self.alarmClearedTime = alarm.alarmClearedTime
self.alarmAcknowledgeTime = alarm.alarmAcknowledgeTime
self.alarmAcknowledged = alarm.alarmAcknowledged
self.extensions = alarm.extensions
parentid: str
+@dataclass
+class ClearAlarmEvent(UpdateFaultObject):
+ data: AlarmEvent2SMO
+
+
@dataclass
class PurgeAlarmEvent(UpdateFaultObject):
data: AlarmEvent2SMO
# limitations under the License.
# pylint: disable=too-few-public-methods
-from dataclasses import dataclass
+from dataclasses import dataclass, field
from datetime import datetime
from o2common.domain.events import Event
updatetime: datetime.now()
+@dataclass
+class AlarmEventCleared(Event):
+ id: str
+ notificationEventType: AlarmNotificationEventEnum
+ updatetime: datetime = field(default_factory=datetime.now)
+
+
@dataclass
class AlarmEventPurged(Event):
id: str
notificationEventType: AlarmNotificationEventEnum
- updatetime: datetime.now()
+ updatetime: datetime = field(default_factory=datetime.now)
--- /dev/null
+# Copyright (C) 2024 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from datetime import datetime
+import json
+
+from fmclient.common.exceptions import HTTPNotFound
+from o2common.adapter.notifications import AbstractNotifications
+from o2common.config import conf
+from o2common.domain.filter import gen_orm_filter
+from o2common.helper import o2logging
+from o2common.service.unit_of_work import AbstractUnitOfWork
+from o2ims.adapter.clients.fault_client import StxAlarmClient
+from o2ims.domain import commands, alarm_obj
+logger = o2logging.get_logger(__name__)
+
+
+def clear_alarm_event(
+ cmd: commands.PubAlarm2SMO,
+ uow: AbstractUnitOfWork,
+ notifications: AbstractNotifications,
+):
+ """
+ Clears an alarm event and notifies relevant subscribers.
+
+ This method performs the following steps:
+ 1. Retrieves data from the command object and initializes the fault client.
+ 2. Uses the Unit of Work pattern to find and delete the corresponding
+ alarm event record.
+ 3. Updates the alarm event record's hash, extensions, changed time,
+ and perceived severity.
+ 4. Commits the changes to the database.
+ 5. Finds and processes all alarm subscriptions, deciding whether to
+ send notifications based on subscription filters.
+
+ Parameters:
+ - cmd (commands.PubAlarm2SMO): Command object containing the alarm
+ event data.
+ - uow (AbstractUnitOfWork): Unit of Work object for managing
+ database transactions.
+ - notifications (AbstractNotifications): Abstract notifications
+ object for sending notifications.
+
+ Exceptions:
+ - Any exceptions that might occur during database operations or
+ notification sending.
+ """
+ fault_client = StxAlarmClient(uow)
+ data = cmd.data
+ with uow:
+ alarm_event_record = uow.alarm_event_records.get(data.id)
+ try:
+ alarm = fault_client.delete(alarm_event_record.alarmEventRecordId)
+ except HTTPNotFound:
+ logger.info(
+ f'Alarm {alarm_event_record.alarmEventRecordId} '
+ 'already deleted from fault management system'
+ )
+ except Exception as e:
+ logger.warning(
+ f'Failed to delete alarm '
+ f'{alarm_event_record.alarmEventRecordId} '
+ f'from fault management system: {str(e)}. '
+ 'Continuing with database purge.'
+ )
+ alarm_event_record.alarmClearedTime = datetime.now().\
+ strftime("%Y-%m-%dT%H:%M:%S")
+ alarm_event_record.perceivedSeverity = \
+ alarm_obj.PerceivedSeverityEnum.CLEARED
+
+ uow.alarm_event_records.update(alarm_event_record)
+
+ uow.commit()
+
+ alarm = uow.alarm_event_records.get(data.id)
+ subs = uow.alarm_subscriptions.list()
+ for sub in subs:
+ sub_data = sub.serialize()
+ logger.debug('Alarm Subscription: {}'.format(
+ sub_data['alarmSubscriptionId']))
+
+ if not sub_data.get('filter', None):
+ callback_smo(notifications, sub, data, alarm)
+ continue
+ try:
+ args = gen_orm_filter(alarm_obj.AlarmEventRecord,
+ sub_data['filter'])
+ except KeyError:
+ logger.warning(
+ 'Alarm Subscription {} filter {} has wrong attribute '
+ 'name or value. Ignore the filter'.format(
+ sub_data['alarmSubscriptionId'],
+ sub_data['filter']))
+ callback_smo(notifications, sub, data, alarm)
+ continue
+ args.append(alarm_obj.AlarmEventRecord.
+ alarmEventRecordId == data.id)
+ count, _ = uow.alarm_event_records.list_with_count(*args)
+ if count != 0:
+ logger.debug(
+ 'Alarm Event {} skip for subscription {} because of '
+ 'the filter.'
+ .format(data.id, sub_data['alarmSubscriptionId']))
+ continue
+ callback_smo(notifications, sub, data, alarm)
+
+
+def callback_smo(notifications: AbstractNotifications,
+ sub: alarm_obj.AlarmSubscription,
+ msg: alarm_obj.AlarmEvent2SMO,
+ alarm: alarm_obj.AlarmEventRecord):
+ sub_data = sub.serialize()
+ alarm_data = alarm.serialize()
+ callback = {
+ 'globalCloudID': conf.DEFAULT.ocloud_global_id,
+ 'consumerSubscriptionId': sub_data['consumerSubscriptionId'],
+ 'notificationEventType': msg.notificationEventType,
+ 'objectRef': msg.objectRef,
+ 'alarmEventRecordId': alarm_data['alarmEventRecordId'],
+ 'resourceTypeID': alarm_data['resourceTypeId'],
+ 'resourceID': alarm_data['resourceId'],
+ 'alarmDefinitionID': alarm_data['alarmDefinitionId'],
+ 'probableCauseID': alarm_data['probableCauseId'],
+ 'alarmRaisedTime': alarm_data['alarmRaisedTime'],
+ 'alarmChangedTime': alarm_data['alarmChangedTime'],
+ 'alarmAcknowledgeTime': alarm_data['alarmAcknowledgeTime'],
+ 'alarmAcknowledged': alarm_data['alarmAcknowledged'],
+ 'perceivedSeverity': alarm_data['perceivedSeverity'],
+ 'extensions': json.loads(alarm_data['extensions'])
+ }
+ logger.info('callback URL: {}'.format(sub_data['callback']))
+ logger.debug('callback data: {}'.format(json.dumps(callback)))
+
+ return notifications.send(sub_data['callback'], callback)
import json
+from fmclient.common.exceptions import HTTPNotFound
from o2common.adapter.notifications import AbstractNotifications
-from o2common.config import conf
-from o2common.domain.filter import gen_orm_filter
from o2common.helper import o2logging
from o2common.service.unit_of_work import AbstractUnitOfWork
from o2ims.adapter.clients.fault_client import StxAlarmClient
notifications: AbstractNotifications,
):
"""
- Purges an alarm event and notifies relevant subscribers.
+ Purge an alarm event.
This method performs the following steps:
1. Retrieves data from the command object and initializes the fault client.
2. Uses the Unit of Work pattern to find and delete the corresponding
alarm event record.
- 3. Updates the alarm event record's hash, extensions, changed time,
- and perceived severity.
- 4. Commits the changes to the database.
- 5. Finds and processes all alarm subscriptions, deciding whether to
- send notifications based on subscription filters.
+ 3. Commits the changes to the database.
Parameters:
- cmd (commands.PubAlarm2SMO): Command object containing the alarm
data = cmd.data
with uow:
alarm_event_record = uow.alarm_event_records.get(data.id)
- alarm = fault_client.delete(alarm_event_record.alarmEventRecordId)
- alarm_event_record.hash = alarm.hash
- alarm_event_record.extensions = json.dumps(alarm.filtered)
- alarm_event_record.alarmChangedTime = alarm.updatetime.\
- strftime("%Y-%m-%dT%H:%M:%S")
- alarm_event_record.perceivedSeverity = \
- alarm_obj.PerceivedSeverityEnum.CLEARED
-
- uow.alarm_event_records.update(alarm_event_record)
-
- uow.commit()
-
- alarm = uow.alarm_event_records.get(data.id)
- subs = uow.alarm_subscriptions.list()
- for sub in subs:
- sub_data = sub.serialize()
- logger.debug('Alarm Subscription: {}'.format(
- sub_data['alarmSubscriptionId']))
-
- if not sub_data.get('filter', None):
- callback_smo(notifications, sub, data, alarm)
- continue
+ if str(alarm_event_record.perceivedSeverity) != \
+ alarm_obj.PerceivedSeverityEnum.CLEARED.value:
try:
- args = gen_orm_filter(alarm_obj.AlarmEventRecord,
- sub_data['filter'])
- except KeyError:
+ fault_client.delete(alarm_event_record.alarmEventRecordId)
+ except HTTPNotFound:
+ logger.info(
+ f'Alarm {alarm_event_record.alarmEventRecordId} '
+ 'already deleted from fault management system'
+ )
+ except Exception as e:
logger.warning(
- 'Alarm Subscription {} filter {} has wrong attribute '
- 'name or value. Ignore the filter'.format(
- sub_data['alarmSubscriptionId'],
- sub_data['filter']))
- callback_smo(notifications, sub, data, alarm)
- continue
- args.append(alarm_obj.AlarmEventRecord.
- alarmEventRecordId == data.id)
- count, _ = uow.alarm_event_records.list_with_count(*args)
- if count != 0:
- logger.debug(
- 'Alarm Event {} skip for subscription {} because of '
- 'the filter.'
- .format(data.id, sub_data['alarmSubscriptionId']))
- continue
- callback_smo(notifications, sub, data, alarm)
-
+ f'Failed to delete alarm '
+ f'{alarm_event_record.alarmEventRecordId} '
+ f'from fault management system: {str(e)}. '
+ 'Continuing with database purge.'
+ )
-def callback_smo(notifications: AbstractNotifications,
- sub: alarm_obj.AlarmSubscription,
- msg: alarm_obj.AlarmEvent2SMO,
- alarm: alarm_obj.AlarmEventRecord):
- sub_data = sub.serialize()
- alarm_data = alarm.serialize()
- callback = {
- 'globalCloudID': conf.DEFAULT.ocloud_global_id,
- 'consumerSubscriptionId': sub_data['consumerSubscriptionId'],
- 'notificationEventType': msg.notificationEventType,
- 'objectRef': msg.objectRef,
- 'alarmEventRecordId': alarm_data['alarmEventRecordId'],
- 'resourceTypeID': alarm_data['resourceTypeId'],
- 'resourceID': alarm_data['resourceId'],
- 'alarmDefinitionID': alarm_data['alarmDefinitionId'],
- 'probableCauseID': alarm_data['probableCauseId'],
- 'alarmRaisedTime': alarm_data['alarmRaisedTime'],
- 'alarmChangedTime': alarm_data['alarmChangedTime'],
- 'alarmAcknowledgeTime': alarm_data['alarmAcknowledgeTime'],
- 'alarmAcknowledged': alarm_data['alarmAcknowledged'],
- 'perceivedSeverity': alarm_data['perceivedSeverity'],
- 'extensions': json.loads(alarm_data['extensions'])
- }
- logger.info('callback URL: {}'.format(sub_data['callback']))
- logger.debug('callback data: {}'.format(json.dumps(callback)))
-
- return notifications.send(sub_data['callback'], callback)
+ uow.alarm_event_records.delete(alarm_event_record)
+ uow.commit()
+ logger.debug(f'Successfully purge alarm event record: {data.id}')
event.id))
+def notify_alarm_event_clear(
+ event: events.AlarmEventCleared,
+ publish: Callable,
+):
+ publish("AlarmEventCleared", event)
+ logger.debug("published Alarm Event Cleared: {}".format(
+ event.id))
+
+
def notify_alarm_event_purge(
event: events.AlarmEventPurged,
publish: Callable,
from o2ims.domain import commands
from o2ims.domain.stx_object import StxGenericModel
+from o2ims.domain.alarm_obj import PerceivedSeverityEnum, \
+ AlarmNotificationEventEnum
+from o2ims.domain import events
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
self._set_respool_client()
resourcepoolid = parent.id
+
+ # Check and delete expired alarms before getting new alarms
+ self._check_and_delete_expired_alarms()
+
newmodels = self._client.list()
return [commands.UpdateAlarm(m, resourcepoolid) for m in newmodels] \
if len(newmodels) > 0 else []
def _set_respool_client(self):
self.poolid = self._tags.pool
self._client.set_pool_driver(self.poolid)
+
+ def _check_and_delete_expired_alarms(self):
+ """Check and delete expired alarms based on retention period.
+ Only delete alarms that are either cleared or acknowledged."""
+ try:
+ with self._bus.uow as uow:
+ # Get retention period from alarm service configuration
+ # This will create default config if not exists
+ alarm_config = uow.alarm_service_config.get()
+ # Convert retention period from days to seconds
+ retention_period = alarm_config.retentionPeriod * 24 * 3600
+
+ # Query expired alarms that are either cleared or acknowledged
+ rs = uow.session.execute(
+ '''
+ SELECT "alarmEventRecordId"
+ FROM "alarmEventRecord"
+ WHERE ("perceivedSeverity" = :perceived_severity_enum
+ OR "alarmAcknowledged" = 'true')
+ AND (EXTRACT(EPOCH FROM NOW()) -
+ EXTRACT(EPOCH FROM TO_TIMESTAMP(
+ "alarmRaisedTime", 'YYYY-MM-DD"T"HH24:MI:SS')))
+ > :retention_period
+ ''',
+ dict(
+ retention_period=retention_period,
+ perceived_severity_enum=PerceivedSeverityEnum.CLEARED
+ )
+ )
+
+ # Process expired alarms
+ for row in rs:
+ alarm_id = row[0]
+ try:
+ logger.debug(
+ f'Processing expired alarm for deletion: '
+ f'{alarm_id}')
+
+ # Add purge event before deletion
+ alarm_event = events.AlarmEventPurged(
+ id=alarm_id,
+ notificationEventType=(
+ AlarmNotificationEventEnum.CLEAR)
+ )
+
+ # Update alarm event record with purge event
+ alarm_record = uow.alarm_event_records.get(alarm_id)
+ if alarm_record:
+ alarm_record.events.append(alarm_event)
+ uow.alarm_event_records.update(alarm_record)
+ uow.commit()
+
+ except Exception as e:
+ logger.error(f'Failed to process expired alarm '
+ f'{alarm_id}: {str(e)}')
+
+ except Exception as e:
+ logger.error(f'Error checking expired alarms: {str(e)}')
if result is not None:
return result
elif clear_action:
- if clear_action != PerceivedSeverityEnum.CLEARED:
+ if clear_action != PerceivedSeverityEnum.CLEARED.value:
raise BadRequestException(
'Only the value "5" for "CLEARED" is permitted of '
'"perceivedSeverity".')
alarm_event_record = uow.alarm_event_records.get(alarmEventRecordId)
if alarm_event_record is None:
return None
- elif alarm_event_record.perceivedSeverity == \
- PerceivedSeverityEnum.CLEARED:
+ elif str(alarm_event_record.perceivedSeverity) == \
+ PerceivedSeverityEnum.CLEARED.value:
raise BadRequestException(
"Alarm Event Record {} has already been marked as CLEARED."
.format(alarmEventRecordId))
- alarm_event_record.events.append(events.AlarmEventPurged(
+ alarm_event_record.events.append(events.AlarmEventCleared(
id=alarm_event_record.alarmEventRecordId,
- notificationEventType=AlarmNotificationEventEnum.CLEAR,
- updatetime=alarm_event_record.alarmAcknowledgeTime))
+ notificationEventType=AlarmNotificationEventEnum.CLEAR))
uow.alarm_event_records.update(alarm_event_record)
uow.commit()