1 # Copyright (C) 2021 Wind River Systems, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 from o2common.service import unit_of_work
18 from o2common.views.pagination_view import Pagination
19 from o2ims.views.alarm_dto import SubscriptionDTO
20 from o2ims.domain.alarm_obj import AlarmSubscription
22 from o2common.helper import o2logging
23 # from o2common.config import config
24 logger = o2logging.get_logger(__name__)
27 def alarm_event_records(uow: unit_of_work.AbstractUnitOfWork, **kwargs):
28 pagination = Pagination(**kwargs)
29 filter_kwargs = pagination.get_filter()
31 li = uow.alarm_event_records.list_with_count(**filter_kwargs)
32 return pagination.get_result(li)
35 def alarm_event_record_one(alarmEventRecordId: str,
36 uow: unit_of_work.AbstractUnitOfWork):
38 first = uow.alarm_event_records.get(alarmEventRecordId)
39 return first.serialize() if first is not None else None
42 def subscriptions(uow: unit_of_work.AbstractUnitOfWork, **kwargs):
43 pagination = Pagination(**kwargs)
44 filter_kwargs = pagination.get_filter()
46 li = uow.alarm_subscriptions.list_with_count(**filter_kwargs)
47 return pagination.get_result(li)
50 def subscription_one(subscriptionId: str,
51 uow: unit_of_work.AbstractUnitOfWork):
53 first = uow.alarm_subscriptions.get(subscriptionId)
54 return first.serialize() if first is not None else None
57 def subscription_create(subscriptionDto: SubscriptionDTO.subscription,
58 uow: unit_of_work.AbstractUnitOfWork):
60 sub_uuid = str(uuid.uuid4())
61 subscription = AlarmSubscription(
62 sub_uuid, subscriptionDto['callback'],
63 subscriptionDto['consumerSubscriptionId'],
64 subscriptionDto['filter'])
66 uow.alarm_subscriptions.add(subscription)
68 return {"alarmSubscriptionId": sub_uuid}
71 def subscription_delete(subscriptionId: str,
72 uow: unit_of_work.AbstractUnitOfWork):
74 uow.alarm_subscriptions.delete(subscriptionId)