-# Copyright (C) 2021-2024 Wind River Systems, Inc.
+# Copyright (C) 2021 Wind River Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# See the License for the specific language governing permissions and
# limitations under the License.
-from datetime import datetime
import uuid as uuid
-from o2common.service import unit_of_work, messagebus
+from o2common.service import unit_of_work
from o2common.views.view import gen_filter, check_filter
from o2common.views.pagination_view import Pagination
from o2common.views.route_exception import BadRequestException, \
NotFoundException
-from o2ims.domain import events
from o2ims.views.alarm_dto import SubscriptionDTO
-from o2ims.domain.alarm_obj import AlarmSubscription, AlarmEventRecord, \
- AlarmNotificationEventEnum
+from o2ims.domain.alarm_obj import AlarmSubscription, AlarmEventRecord
from o2common.helper import o2logging
# from o2common.config import config
return first.serialize() if first is not None else None
-def alarm_event_record_ack(alarmEventRecordId: str,
- uow: unit_of_work.AbstractUnitOfWork):
- with uow:
- alarm_event_record = uow.alarm_event_records.get(alarmEventRecordId)
- if alarm_event_record is None:
- return None
- elif alarm_event_record.alarmAcknowledged == 'true':
- raise BadRequestException(
- "Alarm Event Record {} has already been acknowledged."
- .format(alarmEventRecordId))
- alarm_event_record.alarmAcknowledged = True
- alarm_event_record.alarmAcknowledgeTime = datetime.\
- now().strftime("%Y-%m-%dT%H:%M:%S")
- bus = messagebus.MessageBus.get_instance()
- alarm_event_record.events.append(events.AlarmEventPurged(
- id=alarm_event_record.alarmEventRecordId,
- notificationEventType=AlarmNotificationEventEnum.ACKNOWLEDGE,
- updatetime=alarm_event_record.alarmAcknowledgeTime))
-
- uow.alarm_event_records.update(alarm_event_record)
- uow.commit()
-
- result = alarm_event_record.serialize()
- _handle_events(bus)
- return result
-
-
-def _handle_events(bus: messagebus.MessageBus):
- # handle events
- events = bus.uow.collect_new_events()
- for event in events:
- bus.handle(event)
- return True
-
-
def subscriptions(uow: unit_of_work.AbstractUnitOfWork, **kwargs):
pagination = Pagination(**kwargs)
query_kwargs = pagination.get_pagination()