-# Copyright (C) 2021 Wind River Systems, Inc.
+# Copyright (C) 2021-2024 Wind River Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# See the License for the specific language governing permissions and
# limitations under the License.
+from datetime import datetime
import uuid as uuid
-from o2common.service import unit_of_work
+from o2common.service import unit_of_work, messagebus
from o2common.views.view import gen_filter, check_filter
from o2common.views.pagination_view import Pagination
from o2common.views.route_exception import BadRequestException, \
NotFoundException
+from o2ims.domain import events
from o2ims.views.alarm_dto import SubscriptionDTO
-from o2ims.domain.alarm_obj import AlarmSubscription, AlarmEventRecord
+from o2ims.domain.alarm_obj import AlarmSubscription, AlarmEventRecord, \
+ AlarmNotificationEventEnum
from o2common.helper import o2logging
# from o2common.config import config
return first.serialize() if first is not None else None
+def alarm_event_record_ack(alarmEventRecordId: str,
+ uow: unit_of_work.AbstractUnitOfWork):
+ with uow:
+ alarm_event_record = uow.alarm_event_records.get(alarmEventRecordId)
+ if alarm_event_record is None:
+ return None
+ elif alarm_event_record.alarmAcknowledged == 'true':
+ raise BadRequestException(
+ "Alarm Event Record {} has already been acknowledged."
+ .format(alarmEventRecordId))
+ alarm_event_record.alarmAcknowledged = True
+ alarm_event_record.alarmAcknowledgeTime = datetime.\
+ now().strftime("%Y-%m-%dT%H:%M:%S")
+ bus = messagebus.MessageBus.get_instance()
+ alarm_event_record.events.append(events.AlarmEventPurged(
+ id=alarm_event_record.alarmEventRecordId,
+ notificationEventType=AlarmNotificationEventEnum.ACKNOWLEDGE,
+ updatetime=alarm_event_record.alarmAcknowledgeTime))
+
+ uow.alarm_event_records.update(alarm_event_record)
+ uow.commit()
+
+ result = alarm_event_record.serialize()
+ _handle_events(bus)
+ return result
+
+
+def _handle_events(bus: messagebus.MessageBus):
+ # handle events
+ events = bus.uow.collect_new_events()
+ for event in events:
+ bus.handle(event)
+ return True
+
+
def subscriptions(uow: unit_of_work.AbstractUnitOfWork, **kwargs):
pagination = Pagination(**kwargs)
query_kwargs = pagination.get_pagination()