import uuid as uuid
from o2common.service import unit_of_work
+from o2common.views.view import gen_filter, check_filter
+from o2common.views.pagination_view import Pagination
+from o2common.views.route_exception import BadRequestException, \
+ NotFoundException
+
from o2ims.views.alarm_dto import SubscriptionDTO
-from o2ims.domain.alarm_obj import AlarmSubscription
+from o2ims.domain.alarm_obj import AlarmSubscription, AlarmEventRecord
from o2common.helper import o2logging
# from o2common.config import config
logger = o2logging.get_logger(__name__)
-def alarm_event_records(uow: unit_of_work.AbstractUnitOfWork):
+def alarm_event_records(uow: unit_of_work.AbstractUnitOfWork, **kwargs):
+ pagination = Pagination(**kwargs)
+ query_kwargs = pagination.get_pagination()
+ args = gen_filter(AlarmEventRecord,
+ kwargs['filter']) if 'filter' in kwargs else []
with uow:
- li = uow.alarm_event_records.list()
- return [r.serialize() for r in li]
+ li = uow.alarm_event_records.list_with_count(*args, **query_kwargs)
+ return pagination.get_result(li)
def alarm_event_record_one(alarmEventRecordId: str,
return first.serialize() if first is not None else None
-def subscriptions(uow: unit_of_work.AbstractUnitOfWork):
+def subscriptions(uow: unit_of_work.AbstractUnitOfWork, **kwargs):
+ pagination = Pagination(**kwargs)
+ query_kwargs = pagination.get_pagination()
+ args = gen_filter(AlarmSubscription,
+ kwargs['filter']) if 'filter' in kwargs else []
with uow:
- li = uow.alarm_subscriptions.list()
- return [r.serialize() for r in li]
+ li = uow.alarm_subscriptions.list_with_count(*args, **query_kwargs)
+ return pagination.get_result(li)
def subscription_one(subscriptionId: str,
return first.serialize() if first is not None else None
-def subscription_create(subscriptionDto: SubscriptionDTO.subscription,
+def subscription_create(subscriptionDto: SubscriptionDTO.subscription_create,
uow: unit_of_work.AbstractUnitOfWork):
+ filter = subscriptionDto.get('filter', '')
+ consumer_subs_id = subscriptionDto.get('consumerSubscriptionId', '')
+
+ check_filter(AlarmEventRecord, filter)
sub_uuid = str(uuid.uuid4())
subscription = AlarmSubscription(
sub_uuid, subscriptionDto['callback'],
- subscriptionDto['consumerSubscriptionId'],
- subscriptionDto['filter'])
+ consumer_subs_id, filter)
with uow:
+ args = list()
+ args.append(getattr(AlarmSubscription, 'callback')
+ == subscriptionDto['callback'])
+ args.append(getattr(AlarmSubscription, 'filter') == filter)
+ args.append(getattr(AlarmSubscription,
+ 'consumerSubscriptionId') == consumer_subs_id)
+ count, _ = uow.alarm_subscriptions.list_with_count(*args)
+ if count > 0:
+ raise BadRequestException("The value of parameters is duplicated")
uow.alarm_subscriptions.add(subscription)
uow.commit()
- return {"alarmSubscriptionId": sub_uuid}
+ first = uow.alarm_subscriptions.get(sub_uuid)
+ return first.serialize()
def subscription_delete(subscriptionId: str,
uow: unit_of_work.AbstractUnitOfWork):
with uow:
+ first = uow.alarm_subscriptions.get(subscriptionId)
+ if not first:
+ raise NotFoundException(
+ "Alarm Subscription {} not found.".format(subscriptionId))
uow.alarm_subscriptions.delete(subscriptionId)
uow.commit()
return True