1 # Copyright (C) 2021-2024 Wind River Systems, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 from datetime import datetime
18 from o2common.service import unit_of_work, messagebus
19 from o2common.views.view import gen_filter, check_filter
20 from o2common.views.pagination_view import Pagination
21 from o2common.views.route_exception import BadRequestException, \
24 from o2ims.domain import events
25 from o2ims.views.alarm_dto import SubscriptionDTO
26 from o2ims.domain.alarm_obj import AlarmSubscription, AlarmEventRecord, \
27 AlarmNotificationEventEnum
29 from o2common.helper import o2logging
30 # from o2common.config import config
31 logger = o2logging.get_logger(__name__)
34 def alarm_event_records(uow: unit_of_work.AbstractUnitOfWork, **kwargs):
35 pagination = Pagination(**kwargs)
36 query_kwargs = pagination.get_pagination()
37 args = gen_filter(AlarmEventRecord,
38 kwargs['filter']) if 'filter' in kwargs else []
40 li = uow.alarm_event_records.list_with_count(*args, **query_kwargs)
41 return pagination.get_result(li)
44 def alarm_event_record_one(alarmEventRecordId: str,
45 uow: unit_of_work.AbstractUnitOfWork):
47 first = uow.alarm_event_records.get(alarmEventRecordId)
48 return first.serialize() if first is not None else None
51 def alarm_event_record_ack(alarmEventRecordId: str,
52 uow: unit_of_work.AbstractUnitOfWork):
54 alarm_event_record = uow.alarm_event_records.get(alarmEventRecordId)
55 if alarm_event_record is None:
57 elif alarm_event_record.alarmAcknowledged == 'true':
58 raise BadRequestException(
59 "Alarm Event Record {} has already been acknowledged."
60 .format(alarmEventRecordId))
61 alarm_event_record.alarmAcknowledged = True
62 alarm_event_record.alarmAcknowledgeTime = datetime.\
63 now().strftime("%Y-%m-%dT%H:%M:%S")
64 bus = messagebus.MessageBus.get_instance()
65 alarm_event_record.events.append(events.AlarmEventPurged(
66 id=alarm_event_record.alarmEventRecordId,
67 notificationEventType=AlarmNotificationEventEnum.ACKNOWLEDGE,
68 updatetime=alarm_event_record.alarmAcknowledgeTime))
70 uow.alarm_event_records.update(alarm_event_record)
73 result = alarm_event_record.serialize()
78 def _handle_events(bus: messagebus.MessageBus):
80 events = bus.uow.collect_new_events()
86 def subscriptions(uow: unit_of_work.AbstractUnitOfWork, **kwargs):
87 pagination = Pagination(**kwargs)
88 query_kwargs = pagination.get_pagination()
89 args = gen_filter(AlarmSubscription,
90 kwargs['filter']) if 'filter' in kwargs else []
92 li = uow.alarm_subscriptions.list_with_count(*args, **query_kwargs)
93 return pagination.get_result(li)
96 def subscription_one(subscriptionId: str,
97 uow: unit_of_work.AbstractUnitOfWork):
99 first = uow.alarm_subscriptions.get(subscriptionId)
100 return first.serialize() if first is not None else None
103 def subscription_create(subscriptionDto: SubscriptionDTO.subscription_create,
104 uow: unit_of_work.AbstractUnitOfWork):
105 filter = subscriptionDto.get('filter', '')
106 consumer_subs_id = subscriptionDto.get('consumerSubscriptionId', '')
108 check_filter(AlarmEventRecord, filter)
110 sub_uuid = str(uuid.uuid4())
111 subscription = AlarmSubscription(
112 sub_uuid, subscriptionDto['callback'],
113 consumer_subs_id, filter)
116 args.append(getattr(AlarmSubscription, 'callback')
117 == subscriptionDto['callback'])
118 args.append(getattr(AlarmSubscription, 'filter') == filter)
119 args.append(getattr(AlarmSubscription,
120 'consumerSubscriptionId') == consumer_subs_id)
121 count, _ = uow.alarm_subscriptions.list_with_count(*args)
123 raise BadRequestException("The value of parameters is duplicated")
124 uow.alarm_subscriptions.add(subscription)
126 first = uow.alarm_subscriptions.get(sub_uuid)
127 return first.serialize()
130 def subscription_delete(subscriptionId: str,
131 uow: unit_of_work.AbstractUnitOfWork):
133 first = uow.alarm_subscriptions.get(subscriptionId)
135 raise NotFoundException(
136 "Alarm Subscription {} not found.".format(subscriptionId))
137 uow.alarm_subscriptions.delete(subscriptionId)