From: Zhang Rong(Jon) Date: Thu, 12 Dec 2024 09:44:25 +0000 (+0800) Subject: Autonomous Alarm Purge based on retention period X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=1f1a2838997ef4dc8778ca957b7e1cff59debea0;p=pti%2Fo2.git Autonomous Alarm Purge based on retention period This commit introduces an autonomous purge mechanism for alarm event records based on the retention period specified in the alarm service configuration. Previously, the "purge event" was used for clearing alarms. To improve clarity, the "clear" and "purge" events have been split to better distinguish their respective purposes. Test Plan: PASS - Successfully purges alarm event records that exceed the retention period. PASS - Ensures that alarm event records not in "clear" or "acknowledged" status remain unaffected. PASS - Verifies that clearing an alarm operates as expected. Change-Id: I2414caa4929daf3efb1ccb9848f84ef69b80318f Signed-off-by: Zhang Rong(Jon) (cherry picked from commit 935ccb8b8220b33dc30449bc6a1e901d12da98cb) --- diff --git a/o2app/entrypoints/redis_eventconsumer.py b/o2app/entrypoints/redis_eventconsumer.py index 8098d04..1677bfc 100644 --- a/o2app/entrypoints/redis_eventconsumer.py +++ b/o2app/entrypoints/redis_eventconsumer.py @@ -51,6 +51,7 @@ def main(): pubsub.subscribe('DmsChanged') pubsub.subscribe('ResourceChanged') pubsub.subscribe('AlarmEventChanged') + pubsub.subscribe('AlarmEventCleared') pubsub.subscribe('AlarmEventPurged') for m in pubsub.listen(): @@ -149,6 +150,14 @@ def handle_changed(m, bus): eventtype=data['notificationEventType'], updatetime=data['updatetime'])) bus.handle(cmd) + elif channel == 'AlarmEventCleared': + datastr = m['data'] + data = json.loads(datastr) + logger.info('AlarmEventCleared with cmd:{}'.format(data)) + cmd = imscmd.ClearAlarmEvent(data=AlarmEvent2SMO( + id=data['id'], ref="", eventtype=data['notificationEventType'], + updatetime=data['updatetime'])) + bus.handle(cmd) elif channel == 'AlarmEventPurged': datastr = m['data'] data = json.loads(datastr) diff --git a/o2app/service/handlers.py b/o2app/service/handlers.py index 4ef0ef8..6dc771a 100644 --- a/o2app/service/handlers.py +++ b/o2app/service/handlers.py @@ -30,7 +30,7 @@ from o2ims.service.auditor import ocloud_handler, dms_handler, \ pserver_dev_handler, agg_compute_handler, agg_network_handler,\ agg_storage_handler, agg_undefined_handler from o2ims.service.command import notify_handler, registration_handler,\ - notify_alarm_handler, purge_alarm_handler + notify_alarm_handler, clear_alarm_handler, purge_alarm_handler from o2ims.service.event import ocloud_event, resource_event, \ resource_pool_event, alarm_event, dms_event, resource_type_event @@ -63,8 +63,10 @@ EVENT_HANDLERS = { notify_resourcepool_change], events.AlarmEventChanged: [alarm_event.\ notify_alarm_event_change], + events.AlarmEventCleared: [alarm_event.\ + notify_alarm_event_clear], events.AlarmEventPurged: [alarm_event.\ - notify_alarm_event_purge], + notify_alarm_event_purge], } # type: Dict[Type[events.Event], Callable] @@ -97,5 +99,6 @@ COMMAND_HANDLERS = { commands.PubMessage2SMO: notify_handler.notify_change_to_smo, commands.PubAlarm2SMO: notify_alarm_handler.notify_alarm_to_smo, commands.Register2SMO: registration_handler.registry_to_smo, + commands.ClearAlarmEvent: clear_alarm_handler.clear_alarm_event, commands.PurgeAlarmEvent: purge_alarm_handler.purge_alarm_event, } # type: Dict[Type[commands.Command], Callable] diff --git a/o2ims/adapter/alarm_repository.py b/o2ims/adapter/alarm_repository.py index 8c4514d..76f4e4e 100644 --- a/o2ims/adapter/alarm_repository.py +++ b/o2ims/adapter/alarm_repository.py @@ -50,9 +50,8 @@ class AlarmEventRecordSqlAlchemyRepository(AlarmEventRecordRepository): def _update(self, alarm_event_record: alarm_obj.AlarmEventRecord): self.session.merge(alarm_event_record) - def _delete(self, alarm_event_record_id): - self.session.query(alarm_obj.AlarmEventRecord).filter_by( - alarmEventRecordId=alarm_event_record_id).delete() + def _delete(self, alarm_event_record): + self.session.delete(alarm_event_record) class AlarmDefinitionSqlAlchemyRepository(AlarmDefinitionRepository): diff --git a/o2ims/adapter/clients/fault_client.py b/o2ims/adapter/clients/fault_client.py index 616a4ca..220c5eb 100644 --- a/o2ims/adapter/clients/fault_client.py +++ b/o2ims/adapter/clients/fault_client.py @@ -224,7 +224,7 @@ class StxFaultClientImp(object): def deleteAlarm(self, id) -> alarmModel.FaultGenericModel: alarm = self.fmclient.alarm.delete(id) - logger.debug('delete alarm id ' + id + ':' + str(alarm.to_dict())) + logger.debug('delete alarm id: ' + id) return alarmModel.FaultGenericModel( alarmModel.EventTypeEnum.ALARM, self._alarmconverter(alarm)) diff --git a/o2ims/adapter/orm.py b/o2ims/adapter/orm.py index d7ec289..0d6df31 100644 --- a/o2ims/adapter/orm.py +++ b/o2ims/adapter/orm.py @@ -214,6 +214,7 @@ alarm_event_record = Table( Column("perceivedSeverity", Integer), Column("alarmRaisedTime", String(255)), Column("alarmChangedTime", String(255)), + Column("alarmClearedTime", String(255)), Column("alarmAcknowledgeTime", String(255)), Column("alarmAcknowledged", String(255)), Column("extensions", String()) diff --git a/o2ims/domain/alarm_obj.py b/o2ims/domain/alarm_obj.py index 3c2bf06..d812ed1 100644 --- a/o2ims/domain/alarm_obj.py +++ b/o2ims/domain/alarm_obj.py @@ -101,6 +101,7 @@ class AlarmEventRecord(AgRoot, Serializer): self.perceivedSeverity = perc_severity self.alarmRaisedTime = raised_time self.alarmChangedTime = '' + self.alarmClearedTime = '' self.alarmAcknowledgeTime = '' self.alarmAcknowledged = False self.extensions = '' @@ -222,6 +223,7 @@ class AlarmEventNotification(AgRoot, Serializer): self.perceivedSeverity = alarm.perceivedSeverity self.alarmRaisedTime = alarm.alarmRaisedTime self.alarmChangedTime = alarm.alarmChangedTime + self.alarmClearedTime = alarm.alarmClearedTime self.alarmAcknowledgeTime = alarm.alarmAcknowledgeTime self.alarmAcknowledged = alarm.alarmAcknowledged self.extensions = alarm.extensions diff --git a/o2ims/domain/commands.py b/o2ims/domain/commands.py index 9cc51d6..f335f5f 100644 --- a/o2ims/domain/commands.py +++ b/o2ims/domain/commands.py @@ -140,6 +140,11 @@ class UpdateAlarm(UpdateFaultObject): parentid: str +@dataclass +class ClearAlarmEvent(UpdateFaultObject): + data: AlarmEvent2SMO + + @dataclass class PurgeAlarmEvent(UpdateFaultObject): data: AlarmEvent2SMO diff --git a/o2ims/domain/events.py b/o2ims/domain/events.py index 04e8a08..214f651 100644 --- a/o2ims/domain/events.py +++ b/o2ims/domain/events.py @@ -13,7 +13,7 @@ # limitations under the License. # pylint: disable=too-few-public-methods -from dataclasses import dataclass +from dataclasses import dataclass, field from datetime import datetime from o2common.domain.events import Event @@ -64,8 +64,15 @@ class AlarmEventChanged(Event): updatetime: datetime.now() +@dataclass +class AlarmEventCleared(Event): + id: str + notificationEventType: AlarmNotificationEventEnum + updatetime: datetime = field(default_factory=datetime.now) + + @dataclass class AlarmEventPurged(Event): id: str notificationEventType: AlarmNotificationEventEnum - updatetime: datetime.now() + updatetime: datetime = field(default_factory=datetime.now) diff --git a/o2ims/service/command/clear_alarm_handler.py b/o2ims/service/command/clear_alarm_handler.py new file mode 100644 index 0000000..e3b9744 --- /dev/null +++ b/o2ims/service/command/clear_alarm_handler.py @@ -0,0 +1,145 @@ +# Copyright (C) 2024 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import datetime +import json + +from fmclient.common.exceptions import HTTPNotFound +from o2common.adapter.notifications import AbstractNotifications +from o2common.config import conf +from o2common.domain.filter import gen_orm_filter +from o2common.helper import o2logging +from o2common.service.unit_of_work import AbstractUnitOfWork +from o2ims.adapter.clients.fault_client import StxAlarmClient +from o2ims.domain import commands, alarm_obj +logger = o2logging.get_logger(__name__) + + +def clear_alarm_event( + cmd: commands.PubAlarm2SMO, + uow: AbstractUnitOfWork, + notifications: AbstractNotifications, +): + """ + Clears an alarm event and notifies relevant subscribers. + + This method performs the following steps: + 1. Retrieves data from the command object and initializes the fault client. + 2. Uses the Unit of Work pattern to find and delete the corresponding + alarm event record. + 3. Updates the alarm event record's hash, extensions, changed time, + and perceived severity. + 4. Commits the changes to the database. + 5. Finds and processes all alarm subscriptions, deciding whether to + send notifications based on subscription filters. + + Parameters: + - cmd (commands.PubAlarm2SMO): Command object containing the alarm + event data. + - uow (AbstractUnitOfWork): Unit of Work object for managing + database transactions. + - notifications (AbstractNotifications): Abstract notifications + object for sending notifications. + + Exceptions: + - Any exceptions that might occur during database operations or + notification sending. + """ + fault_client = StxAlarmClient(uow) + data = cmd.data + with uow: + alarm_event_record = uow.alarm_event_records.get(data.id) + try: + alarm = fault_client.delete(alarm_event_record.alarmEventRecordId) + except HTTPNotFound: + logger.info( + f'Alarm {alarm_event_record.alarmEventRecordId} ' + 'already deleted from fault management system' + ) + except Exception as e: + logger.warning( + f'Failed to delete alarm ' + f'{alarm_event_record.alarmEventRecordId} ' + f'from fault management system: {str(e)}. ' + 'Continuing with database purge.' + ) + alarm_event_record.alarmClearedTime = datetime.now().\ + strftime("%Y-%m-%dT%H:%M:%S") + alarm_event_record.perceivedSeverity = \ + alarm_obj.PerceivedSeverityEnum.CLEARED + + uow.alarm_event_records.update(alarm_event_record) + + uow.commit() + + alarm = uow.alarm_event_records.get(data.id) + subs = uow.alarm_subscriptions.list() + for sub in subs: + sub_data = sub.serialize() + logger.debug('Alarm Subscription: {}'.format( + sub_data['alarmSubscriptionId'])) + + if not sub_data.get('filter', None): + callback_smo(notifications, sub, data, alarm) + continue + try: + args = gen_orm_filter(alarm_obj.AlarmEventRecord, + sub_data['filter']) + except KeyError: + logger.warning( + 'Alarm Subscription {} filter {} has wrong attribute ' + 'name or value. Ignore the filter'.format( + sub_data['alarmSubscriptionId'], + sub_data['filter'])) + callback_smo(notifications, sub, data, alarm) + continue + args.append(alarm_obj.AlarmEventRecord. + alarmEventRecordId == data.id) + count, _ = uow.alarm_event_records.list_with_count(*args) + if count != 0: + logger.debug( + 'Alarm Event {} skip for subscription {} because of ' + 'the filter.' + .format(data.id, sub_data['alarmSubscriptionId'])) + continue + callback_smo(notifications, sub, data, alarm) + + +def callback_smo(notifications: AbstractNotifications, + sub: alarm_obj.AlarmSubscription, + msg: alarm_obj.AlarmEvent2SMO, + alarm: alarm_obj.AlarmEventRecord): + sub_data = sub.serialize() + alarm_data = alarm.serialize() + callback = { + 'globalCloudID': conf.DEFAULT.ocloud_global_id, + 'consumerSubscriptionId': sub_data['consumerSubscriptionId'], + 'notificationEventType': msg.notificationEventType, + 'objectRef': msg.objectRef, + 'alarmEventRecordId': alarm_data['alarmEventRecordId'], + 'resourceTypeID': alarm_data['resourceTypeId'], + 'resourceID': alarm_data['resourceId'], + 'alarmDefinitionID': alarm_data['alarmDefinitionId'], + 'probableCauseID': alarm_data['probableCauseId'], + 'alarmRaisedTime': alarm_data['alarmRaisedTime'], + 'alarmChangedTime': alarm_data['alarmChangedTime'], + 'alarmAcknowledgeTime': alarm_data['alarmAcknowledgeTime'], + 'alarmAcknowledged': alarm_data['alarmAcknowledged'], + 'perceivedSeverity': alarm_data['perceivedSeverity'], + 'extensions': json.loads(alarm_data['extensions']) + } + logger.info('callback URL: {}'.format(sub_data['callback'])) + logger.debug('callback data: {}'.format(json.dumps(callback))) + + return notifications.send(sub_data['callback'], callback) diff --git a/o2ims/service/command/purge_alarm_handler.py b/o2ims/service/command/purge_alarm_handler.py index 335488a..0eb4e1e 100644 --- a/o2ims/service/command/purge_alarm_handler.py +++ b/o2ims/service/command/purge_alarm_handler.py @@ -14,9 +14,8 @@ import json +from fmclient.common.exceptions import HTTPNotFound from o2common.adapter.notifications import AbstractNotifications -from o2common.config import conf -from o2common.domain.filter import gen_orm_filter from o2common.helper import o2logging from o2common.service.unit_of_work import AbstractUnitOfWork from o2ims.adapter.clients.fault_client import StxAlarmClient @@ -30,17 +29,13 @@ def purge_alarm_event( notifications: AbstractNotifications, ): """ - Purges an alarm event and notifies relevant subscribers. + Purge an alarm event. This method performs the following steps: 1. Retrieves data from the command object and initializes the fault client. 2. Uses the Unit of Work pattern to find and delete the corresponding alarm event record. - 3. Updates the alarm event record's hash, extensions, changed time, - and perceived severity. - 4. Commits the changes to the database. - 5. Finds and processes all alarm subscriptions, deciding whether to - send notifications based on subscription filters. + 3. Commits the changes to the database. Parameters: - cmd (commands.PubAlarm2SMO): Command object containing the alarm @@ -58,75 +53,23 @@ def purge_alarm_event( data = cmd.data with uow: alarm_event_record = uow.alarm_event_records.get(data.id) - alarm = fault_client.delete(alarm_event_record.alarmEventRecordId) - alarm_event_record.hash = alarm.hash - alarm_event_record.extensions = json.dumps(alarm.filtered) - alarm_event_record.alarmChangedTime = alarm.updatetime.\ - strftime("%Y-%m-%dT%H:%M:%S") - alarm_event_record.perceivedSeverity = \ - alarm_obj.PerceivedSeverityEnum.CLEARED - - uow.alarm_event_records.update(alarm_event_record) - - uow.commit() - - alarm = uow.alarm_event_records.get(data.id) - subs = uow.alarm_subscriptions.list() - for sub in subs: - sub_data = sub.serialize() - logger.debug('Alarm Subscription: {}'.format( - sub_data['alarmSubscriptionId'])) - - if not sub_data.get('filter', None): - callback_smo(notifications, sub, data, alarm) - continue + if str(alarm_event_record.perceivedSeverity) != \ + alarm_obj.PerceivedSeverityEnum.CLEARED.value: try: - args = gen_orm_filter(alarm_obj.AlarmEventRecord, - sub_data['filter']) - except KeyError: + fault_client.delete(alarm_event_record.alarmEventRecordId) + except HTTPNotFound: + logger.info( + f'Alarm {alarm_event_record.alarmEventRecordId} ' + 'already deleted from fault management system' + ) + except Exception as e: logger.warning( - 'Alarm Subscription {} filter {} has wrong attribute ' - 'name or value. Ignore the filter'.format( - sub_data['alarmSubscriptionId'], - sub_data['filter'])) - callback_smo(notifications, sub, data, alarm) - continue - args.append(alarm_obj.AlarmEventRecord. - alarmEventRecordId == data.id) - count, _ = uow.alarm_event_records.list_with_count(*args) - if count != 0: - logger.debug( - 'Alarm Event {} skip for subscription {} because of ' - 'the filter.' - .format(data.id, sub_data['alarmSubscriptionId'])) - continue - callback_smo(notifications, sub, data, alarm) - + f'Failed to delete alarm ' + f'{alarm_event_record.alarmEventRecordId} ' + f'from fault management system: {str(e)}. ' + 'Continuing with database purge.' + ) -def callback_smo(notifications: AbstractNotifications, - sub: alarm_obj.AlarmSubscription, - msg: alarm_obj.AlarmEvent2SMO, - alarm: alarm_obj.AlarmEventRecord): - sub_data = sub.serialize() - alarm_data = alarm.serialize() - callback = { - 'globalCloudID': conf.DEFAULT.ocloud_global_id, - 'consumerSubscriptionId': sub_data['consumerSubscriptionId'], - 'notificationEventType': msg.notificationEventType, - 'objectRef': msg.objectRef, - 'alarmEventRecordId': alarm_data['alarmEventRecordId'], - 'resourceTypeID': alarm_data['resourceTypeId'], - 'resourceID': alarm_data['resourceId'], - 'alarmDefinitionID': alarm_data['alarmDefinitionId'], - 'probableCauseID': alarm_data['probableCauseId'], - 'alarmRaisedTime': alarm_data['alarmRaisedTime'], - 'alarmChangedTime': alarm_data['alarmChangedTime'], - 'alarmAcknowledgeTime': alarm_data['alarmAcknowledgeTime'], - 'alarmAcknowledged': alarm_data['alarmAcknowledged'], - 'perceivedSeverity': alarm_data['perceivedSeverity'], - 'extensions': json.loads(alarm_data['extensions']) - } - logger.info('callback URL: {}'.format(sub_data['callback'])) - logger.debug('callback data: {}'.format(json.dumps(callback))) - - return notifications.send(sub_data['callback'], callback) + uow.alarm_event_records.delete(alarm_event_record) + uow.commit() + logger.debug(f'Successfully purge alarm event record: {data.id}') diff --git a/o2ims/service/event/alarm_event.py b/o2ims/service/event/alarm_event.py index 78807c7..18e9301 100644 --- a/o2ims/service/event/alarm_event.py +++ b/o2ims/service/event/alarm_event.py @@ -29,6 +29,15 @@ def notify_alarm_event_change( event.id)) +def notify_alarm_event_clear( + event: events.AlarmEventCleared, + publish: Callable, +): + publish("AlarmEventCleared", event) + logger.debug("published Alarm Event Cleared: {}".format( + event.id)) + + def notify_alarm_event_purge( event: events.AlarmEventPurged, publish: Callable, diff --git a/o2ims/service/watcher/alarm_watcher.py b/o2ims/service/watcher/alarm_watcher.py index b4af164..c445031 100644 --- a/o2ims/service/watcher/alarm_watcher.py +++ b/o2ims/service/watcher/alarm_watcher.py @@ -19,6 +19,9 @@ from o2common.service.client.base_client import BaseClient from o2ims.domain import commands from o2ims.domain.stx_object import StxGenericModel +from o2ims.domain.alarm_obj import PerceivedSeverityEnum, \ + AlarmNotificationEventEnum +from o2ims.domain import events from o2common.helper import o2logging logger = o2logging.get_logger(__name__) @@ -40,6 +43,10 @@ class AlarmWatcher(BaseWatcher): self._set_respool_client() resourcepoolid = parent.id + + # Check and delete expired alarms before getting new alarms + self._check_and_delete_expired_alarms() + newmodels = self._client.list() return [commands.UpdateAlarm(m, resourcepoolid) for m in newmodels] \ if len(newmodels) > 0 else [] @@ -47,3 +54,61 @@ class AlarmWatcher(BaseWatcher): def _set_respool_client(self): self.poolid = self._tags.pool self._client.set_pool_driver(self.poolid) + + def _check_and_delete_expired_alarms(self): + """Check and delete expired alarms based on retention period. + Only delete alarms that are either cleared or acknowledged.""" + try: + with self._bus.uow as uow: + # Get retention period from alarm service configuration + # This will create default config if not exists + alarm_config = uow.alarm_service_config.get() + # Convert retention period from days to seconds + retention_period = alarm_config.retentionPeriod * 24 * 3600 + + # Query expired alarms that are either cleared or acknowledged + rs = uow.session.execute( + ''' + SELECT "alarmEventRecordId" + FROM "alarmEventRecord" + WHERE ("perceivedSeverity" = :perceived_severity_enum + OR "alarmAcknowledged" = 'true') + AND (EXTRACT(EPOCH FROM NOW()) - + EXTRACT(EPOCH FROM TO_TIMESTAMP( + "alarmRaisedTime", 'YYYY-MM-DD"T"HH24:MI:SS'))) + > :retention_period + ''', + dict( + retention_period=retention_period, + perceived_severity_enum=PerceivedSeverityEnum.CLEARED + ) + ) + + # Process expired alarms + for row in rs: + alarm_id = row[0] + try: + logger.debug( + f'Processing expired alarm for deletion: ' + f'{alarm_id}') + + # Add purge event before deletion + alarm_event = events.AlarmEventPurged( + id=alarm_id, + notificationEventType=( + AlarmNotificationEventEnum.CLEAR) + ) + + # Update alarm event record with purge event + alarm_record = uow.alarm_event_records.get(alarm_id) + if alarm_record: + alarm_record.events.append(alarm_event) + uow.alarm_event_records.update(alarm_record) + uow.commit() + + except Exception as e: + logger.error(f'Failed to process expired alarm ' + f'{alarm_id}: {str(e)}') + + except Exception as e: + logger.error(f'Error checking expired alarms: {str(e)}') diff --git a/o2ims/views/alarm_route.py b/o2ims/views/alarm_route.py index c1c4129..d7c243d 100644 --- a/o2ims/views/alarm_route.py +++ b/o2ims/views/alarm_route.py @@ -164,7 +164,7 @@ class AlarmGetRouter(Resource): if result is not None: return result elif clear_action: - if clear_action != PerceivedSeverityEnum.CLEARED: + if clear_action != PerceivedSeverityEnum.CLEARED.value: raise BadRequestException( 'Only the value "5" for "CLEARED" is permitted of ' '"perceivedSeverity".') diff --git a/o2ims/views/alarm_view.py b/o2ims/views/alarm_view.py index 486a35a..d5efa53 100644 --- a/o2ims/views/alarm_view.py +++ b/o2ims/views/alarm_view.py @@ -74,15 +74,14 @@ def alarm_event_record_clear(alarmEventRecordId: str, alarm_event_record = uow.alarm_event_records.get(alarmEventRecordId) if alarm_event_record is None: return None - elif alarm_event_record.perceivedSeverity == \ - PerceivedSeverityEnum.CLEARED: + elif str(alarm_event_record.perceivedSeverity) == \ + PerceivedSeverityEnum.CLEARED.value: raise BadRequestException( "Alarm Event Record {} has already been marked as CLEARED." .format(alarmEventRecordId)) - alarm_event_record.events.append(events.AlarmEventPurged( + alarm_event_record.events.append(events.AlarmEventCleared( id=alarm_event_record.alarmEventRecordId, - notificationEventType=AlarmNotificationEventEnum.CLEAR, - updatetime=alarm_event_record.alarmAcknowledgeTime)) + notificationEventType=AlarmNotificationEventEnum.CLEAR)) uow.alarm_event_records.update(alarm_event_record) uow.commit()