1. Add two pagination files as common for pagination.
2. Give a layer of pagination in response.
3. Add 'first', 'prev', 'next', 'last' in the Link Header.
Issue-ID: INF-288
Signed-off-by: Zhang Rong(Jon) <rong.zhang@windriver.com>
Change-Id: I3047fb7a4a2b4d6480f706ee7773cbe0b69d405d
def serialize(self):
try:
d = {c: getattr(self, c) for c in inspect(self).attrs.keys()}
- if 'createtime' in d:
- d['createtime'] = d['createtime'].isoformat()
- if 'updatetime' in d:
- d['updatetime'] = d['updatetime'].isoformat()
+ # if 'createtime' in d:
+ # d['createtime'] = d['createtime'].isoformat()
+ # if 'updatetime' in d:
+ # d['updatetime'] = d['updatetime'].isoformat()
return d
# return {c: getattr(self, c) for c in inspect(self).attrs.keys()}
except NoInspectionAvailable:
--- /dev/null
+# Copyright (C) 2021-2022 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from urllib.parse import urlparse
+from flask import abort
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+PAGE_PARAM = 'nextpage_opaque_marker'
+
+
+def link_header(full_path: str, ret):
+ base_url = urlparse(full_path)
+ count = ret.pop('count')
+ page_total = ret.pop('page_total')
+ page_current = ret.pop('page_current')
+
+ if page_current > page_total:
+ abort(400, "Page size {} bad request.".format(page_current))
+
+ if 0 == count:
+ return [], {'X-Total-Count': count}
+
+ query = "&".join(["{}".format(q) for q in base_url.query.split(
+ '&') if q.split('=')[0] != PAGE_PARAM])
+ if query != '':
+ query = query + '&'
+ logger.warning(query)
+
+ link_list = []
+ if (page_current > 1):
+ parsed = base_url._replace(query=query + PAGE_PARAM + '=1')
+ link_list.append('<' + parsed.geturl() + '>; rel="first"')
+ if (page_current > 1):
+ parsed = base_url._replace(
+ query=query + PAGE_PARAM + '=' + str(page_current - 1))
+ link_list.append('<' + parsed.geturl() + '>; rel="prev"')
+ if (page_current < page_total):
+ parsed = base_url._replace(
+ query=query + PAGE_PARAM + '=' + str(page_current + 1))
+ link_list.append('<' + parsed.geturl() + '>; rel="next"')
+ if (page_current < page_total):
+ parsed = base_url._replace(
+ query=query + PAGE_PARAM + '=' + str(page_total))
+ link_list.append('<' + parsed.geturl() + '>; rel="last"')
+ if 0 == len(link_list):
+ return ret.pop('results'), {'X-Total-Count': count}
+ link = ','.join(link_list)
+ return ret.pop('results'), {'X-Total-Count': count, 'Link': link}
--- /dev/null
+# Copyright (C) 2021-2022 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import math
+from typing import List, Tuple
+
+from o2common.domain.base import Serializer
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+class Pagination:
+ def __init__(self, **kwargs) -> None:
+ # filter key should be the same with database name
+ self.filter_kwargs = {}
+ self.limit = int(kwargs['per_page']) if 'per_page' in kwargs else 30
+ self.page = int(kwargs['page']) if 'page' in kwargs else 1
+ if self.page < 1:
+ self.page = 1
+ self.start = (self.page - 1) * self.limit
+ self.filter_kwargs['limit'] = self.limit
+ self.filter_kwargs['start'] = self.start
+
+ def get_filter(self):
+ return self.filter_kwargs
+
+ def get_result(self, ret: Tuple[int, List[Serializer]]):
+ count = ret[0]
+ logger.info('List count: {}'.format(count))
+ ret_list = ret[1]
+ page_total = int(math.ceil(count/self.limit)
+ ) if count > self.limit else 1
+ result = {
+ "count": count,
+ "page_total": page_total,
+ "page_current": self.page,
+ "per_page": self.limit,
+ "results": [r.serialize() for r in ret_list]
+ }
+ return result
# See the License for the specific language governing permissions and
# limitations under the License.
-from typing import List
+from typing import List, Tuple
from o2ims.domain import alarm_obj
from o2ims.domain.alarm_repo import AlarmDefinitionRepository, \
return self.session.query(alarm_obj.AlarmEventRecord).filter_by(
alarmEventRecordId=alarm_event_record_id).first()
- def _list(self) -> List[alarm_obj.AlarmEventRecord]:
- return self.session.query(alarm_obj.AlarmEventRecord)
+ def _list(self, **kwargs) -> Tuple[int, List[alarm_obj.AlarmEventRecord]]:
+ size = kwargs.pop('limit') if 'limit' in kwargs else None
+ offset = kwargs.pop('start') if 'start' in kwargs else 0
+
+ result = self.session.query(alarm_obj.AlarmEventRecord).filter_by(
+ **kwargs).order_by('alarmEventRecordId')
+ count = result.count()
+ if size is not None and size != -1:
+ return (count, result.limit(size).offset(offset))
+ return (count, result)
def _update(self, alarm_event_record: alarm_obj.AlarmEventRecord):
self.session.add(alarm_event_record)
return self.session.query(alarm_obj.AlarmSubscription).filter_by(
alarmSubscriptionId=subscription_id).first()
- def _list(self) -> List[alarm_obj.AlarmSubscription]:
- return self.session.query(alarm_obj.AlarmSubscription)
+ def _list(self, **kwargs) -> Tuple[int, List[alarm_obj.AlarmSubscription]]:
+ size = kwargs.pop('limit') if 'limit' in kwargs else None
+ offset = kwargs.pop('start') if 'start' in kwargs else 0
+
+ result = self.session.query(alarm_obj.AlarmSubscription).filter_by(
+ **kwargs).order_by('alarmSubscriptionId')
+ count = result.count()
+ if size is not None and size != -1:
+ return (count, result.limit(size).offset(offset))
+ return (count, result)
def _update(self, subscription: alarm_obj.AlarmSubscription):
self.session.add(subscription)
from o2common.service.client.base_client import BaseClient
from o2common.config import config
from o2ims.domain import alarm_obj as alarmModel
-from o2ims.domain.resource_type import ResourceTypeEnum
from o2app.adapter import unit_of_work
from o2common.helper import o2logging
logger.debug('alarm 1:' + str(alarms[0].to_dict()))
# [print('alarm:' + str(alarm.to_dict())) for alarm in alarms if alarm]
return [alarmModel.FaultGenericModel(
- ResourceTypeEnum.PSERVER, self._alarmconverter(alarm))
+ alarmModel.EventTypeEnum.ALARM, self._alarmconverter(alarm))
for alarm in alarms if alarm]
def getAlarmInfo(self, id) -> alarmModel.FaultGenericModel:
except HTTPNotFound:
event = self.fmclient.event_log.get(id)
return alarmModel.FaultGenericModel(
- ResourceTypeEnum.PSERVER, self._eventconverter(event, True))
+ alarmModel.EventTypeEnum.ALARM, self._eventconverter(event,
+ True))
return alarmModel.FaultGenericModel(
- ResourceTypeEnum.PSERVER, self._alarmconverter(alarm))
+ alarmModel.EventTypeEnum.ALARM, self._alarmconverter(alarm))
def getEventList(self, **filters) -> List[alarmModel.FaultGenericModel]:
events = self.fmclient.event_log.list(alarms=True, expand=True)
logger.debug('event 1:' + str(events[0].to_dict()))
# [print('alarm:' + str(event.to_dict())) for event in events if event]
return [alarmModel.FaultGenericModel(
- ResourceTypeEnum.PSERVER, self._eventconverter(event))
+ alarmModel.EventTypeEnum.EVENT, self._eventconverter(event))
for event in events if event]
def getEventInfo(self, id) -> alarmModel.FaultGenericModel:
logger.debug('get event id ' + id + ':' + str(event.to_dict()))
# print(event.to_dict())
return alarmModel.FaultGenericModel(
- ResourceTypeEnum.PSERVER, self._eventconverter(event))
+ alarmModel.EventTypeEnum.EVENT, self._eventconverter(event))
@ staticmethod
def _alarmconverter(alarm):
# setattr(alarm, 'alarm_def_id', uuid.uuid3(
# uuid.NAMESPACE_URL, alarm.alarm_id))
setattr(alarm, 'state', alarm.alarm_state)
- setattr(alarm, 'event_log_type', alarm.alarm_type)
- setattr(alarm, 'event_log_id', alarm.alarm_id)
- setattr(alarm, 'alarm_def_id', uuid.uuid3(
- uuid.NAMESPACE_URL, alarm.alarm_id))
- setattr(alarm, 'probable_cause_id', uuid.uuid3(
- uuid.NAMESPACE_URL, alarm.probale_cause))
+ setattr(alarm, 'alarm_def_id', str(uuid.uuid3(
+ uuid.NAMESPACE_URL, alarm.alarm_id)))
+ setattr(alarm, 'probable_cause_id', str(uuid.uuid3(
+ uuid.NAMESPACE_URL, alarm.probable_cause)))
return alarm
@ staticmethod
if clear:
logger.debug('alarm is clear')
event.state = 'clear'
- setattr(event, 'alarm_def_id', uuid.uuid3(
- uuid.NAMESPACE_URL, event.alarm_id))
- setattr(event, 'probable_cause_id', uuid.uuid3(
- uuid.NAMESPACE_URL, event.probale_cause))
+ setattr(event, 'alarm_def_id', str(uuid.uuid3(
+ uuid.NAMESPACE_URL, event.alarm_id)))
+ setattr(event, 'probable_cause_id', str(uuid.uuid3(
+ uuid.NAMESPACE_URL, event.probable_cause)))
return event
@ staticmethod
# See the License for the specific language governing permissions and
# limitations under the License.
-from typing import List
+from typing import List, Tuple
from o2ims.domain import ocloud, subscription_obj
from o2ims.domain.ocloud_repo import OcloudRepository, ResourceTypeRepository,\
return self.session.query(ocloud.ResourceType).filter_by(
name=resource_type_name).first()
- def _list(self) -> List[ocloud.ResourceType]:
- return self.session.query(ocloud.ResourceType)
+ def _list(self, **kwargs) -> Tuple[int, List[ocloud.ResourceType]]:
+ size = kwargs.pop('limit') if 'limit' in kwargs else None
+ offset = kwargs.pop('start') if 'start' in kwargs else 0
+
+ result = self.session.query(ocloud.ResourceType).filter_by(
+ **kwargs).order_by('resourceTypeId')
+ count = result.count()
+ if size is not None and size != -1:
+ return (count, result.limit(size).offset(offset))
+ return (count, result)
def _update(self, resourceType: ocloud.ResourceType):
self.session.add(resourceType)
return self.session.query(ocloud.ResourcePool).filter_by(
resourcePoolId=resource_pool_id).first()
- def _list(self) -> List[ocloud.ResourcePool]:
- return self.session.query(ocloud.ResourcePool)
+ def _list(self, **kwargs) -> Tuple[int, List[ocloud.ResourcePool]]:
+ size = kwargs.pop('limit') if 'limit' in kwargs else None
+ offset = kwargs.pop('start') if 'start' in kwargs else 0
+
+ result = self.session.query(ocloud.ResourcePool).filter_by(
+ **kwargs).order_by('resourcePoolId')
+ count = result.count()
+ if size is not None and size != -1:
+ return (count, result.limit(size).offset(offset))
+ return (count, result)
def _update(self, resourcePool: ocloud.ResourcePool):
self.session.add(resourcePool)
return res
return recursive(resource_id)
- def _list(self, resourcepool_id, **kwargs) -> List[ocloud.Resource]:
- return self.session.query(ocloud.Resource).filter_by(
- resourcePoolId=resourcepool_id, **kwargs)
+ def _list(self, resourcepool_id, **kwargs) -> \
+ Tuple[int, List[ocloud.Resource]]:
+ if 'sort' in kwargs:
+ kwargs.pop('sort')
+ size = kwargs.pop('limit') if 'limit' in kwargs else None
+ offset = kwargs.pop('start') if 'start' in kwargs else 0
+
+ result = self.session.query(ocloud.Resource).filter_by(
+ resourcePoolId=resourcepool_id, **kwargs).order_by('resourceId')
+ count = result.count()
+ if size is not None and size != -1:
+ return (count, result.limit(size).offset(offset))
+ return (count, result)
def _update(self, resource: ocloud.Resource):
self.session.add(resource)
return self.session.query(ocloud.DeploymentManager).filter_by(
deploymentManagerId=deployment_manager_id).first()
- def _list(self) -> List[ocloud.DeploymentManager]:
- return self.session.query(ocloud.DeploymentManager)
+ def _list(self, **kwargs) -> Tuple[int, List[ocloud.DeploymentManager]]:
+ size = kwargs.pop('limit') if 'limit' in kwargs else None
+ offset = kwargs.pop('start') if 'start' in kwargs else 0
+
+ result = self.session.query(ocloud.DeploymentManager).filter_by(
+ **kwargs).order_by('deploymentManagerId')
+ count = result.count()
+ if size is not None and size != -1:
+ return (count, result.limit(size).offset(offset))
+ return (count, result)
def _update(self, deployment_manager: ocloud.DeploymentManager):
self.session.add(deployment_manager)
return self.session.query(subscription_obj.Subscription).filter_by(
subscriptionId=subscription_id).first()
- def _list(self) -> List[subscription_obj.Subscription]:
- return self.session.query(subscription_obj.Subscription)
+ def _list(self, **kwargs) -> \
+ Tuple[int, List[subscription_obj.Subscription]]:
+ size = kwargs.pop('limit') if 'limit' in kwargs else None
+ offset = kwargs.pop('start') if 'start' in kwargs else 0
+
+ result = self.session.query(subscription_obj.Subscription).filter_by(
+ **kwargs).order_by('subscriptionId')
+ count = result.count()
+ if size is not None and size != -1:
+ return (count, result.limit(size).offset(offset))
+ return (count, result)
def _update(self, subscription: subscription_obj.Subscription):
self.session.add(subscription)
from o2common.domain.base import AgRoot, Serializer
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
class FaultGenericModel(AgRoot):
def __init__(self, type: str,
if api_response:
self.id = str(api_response.uuid)
self.name = self.id
- self.type = type
+ self.alarm_type = api_response.alarm_type
+ self.alarm_def_name = api_response.alarm_id
+ self.alarm_def_id = api_response.alarm_def_id
+ self.probable_cause_id = api_response.probable_cause_id
self.status = api_response.state
# TODO: time less than second
self.timestamp = datetime.datetime.strptime(
self.hash = content_hash if content_hash \
else str(hash((self.id, self.timestamp, self.status)))
self.content = json.dumps(api_response.to_dict())
+ if EventTypeEnum.ALARM == type:
+ pass
def is_outdated(self, newmodel) -> bool:
# return self.updatetime < newmodel.updatetime
self.content = newmodel.content
+class EventTypeEnum(Enum):
+ ALARM = 'alarm'
+ EVENT = 'event'
+
+
class PerceivedSeverityEnum(str, Enum):
CRITICAL = 0
MAJOR = 1
# limitations under the License.
import abc
-from typing import List, Set
+from typing import List, Set, Tuple
from o2ims.domain import alarm_obj as obj
self.seen.add(alarm_event_record)
return alarm_event_record
- def list(self) -> List[obj.AlarmEventRecord]:
- return self._list()
+ def list(self, **kwargs) -> List[obj.AlarmEventRecord]:
+ return self._list(**kwargs)[1]
+
+ def list_with_count(self, **kwargs) -> \
+ Tuple[int, List[obj.AlarmEventRecord]]:
+ return self._list(**kwargs)
def update(self, alarm_event_record: obj.AlarmEventRecord):
self._update(alarm_event_record)
raise NotImplementedError
@abc.abstractmethod
- def _list(self) -> List[obj.AlarmEventRecord]:
+ def _list(self, **kwargs) -> Tuple[int, List[obj.AlarmEventRecord]]:
raise NotImplementedError
@abc.abstractmethod
self.seen.add(subscription)
return subscription
- def list(self) -> List[obj.AlarmSubscription]:
- return self._list()
+ def list(self, **kwargs) -> List[obj.AlarmSubscription]:
+ return self._list(**kwargs)[1]
+
+ def list_with_count(self, **kwargs) -> \
+ Tuple[int, List[obj.AlarmSubscription]]:
+ return self._list(**kwargs)
def update(self, subscription: obj.AlarmSubscription):
self._update(subscription)
def _get(self, subscription_id) -> obj.AlarmSubscription:
raise NotImplementedError
+ @abc.abstractmethod
+ def _list(self, **kwargs) -> Tuple[int, List[obj.AlarmSubscription]]:
+ raise NotImplementedError
+
@abc.abstractmethod
def _update(self, subscription: obj.AlarmSubscription):
raise NotImplementedError
# from typing import List
from o2ims.domain.stx_object import StxGenericModel
-from o2ims.domain.alarm_obj import AlarmEvent2SMO
+from o2ims.domain.alarm_obj import AlarmEvent2SMO, FaultGenericModel
from o2ims.domain.subscription_obj import Message2SMO, RegistrationMessage
# from o2ims.domain.resource_type import ResourceTypeEnum
from o2common.domain.commands import Command
data: StxGenericModel
+@dataclass
+class UpdateFaultObject(Command):
+ data: FaultGenericModel
+
+
@dataclass
class PubMessage2SMO(Command):
data: Message2SMO
@dataclass
-class UpdateAlarm(UpdateStxObject):
+class UpdateAlarm(UpdateFaultObject):
pass
self.extensions = []
def serialize(self):
- print(self.__dict__)
d = Serializer.serialize(self)
if 'profile' in d and d['profile'] != '':
# limitations under the License.
import abc
-from typing import List, Set
+from typing import List, Set, Tuple
from o2ims.domain import ocloud
self.seen.add(resource_type)
return resource_type
- def list(self) -> List[ocloud.ResourceType]:
- return self._list()
+ def list(self, **kwargs) -> List[ocloud.ResourceType]:
+ return self._list(**kwargs)[1]
+
+ def list_with_count(self, **kwargs) -> \
+ Tuple[int, List[ocloud.ResourceType]]:
+ return self._list(**kwargs)
def update(self, resource_type: ocloud.ResourceType):
self._update(resource_type)
def _get_by_name(self, resource_type_name) -> ocloud.ResourceType:
raise NotImplementedError
+ @abc.abstractmethod
+ def _list(self, **kwargs) -> Tuple[int, List[ocloud.ResourceType]]:
+ raise NotImplementedError
+
@abc.abstractmethod
def _update(self, resource_type: ocloud.ResourceType):
raise NotImplementedError
self.seen.add(resource_pool)
return resource_pool
- def list(self) -> List[ocloud.ResourcePool]:
- return self._list()
+ def list(self, **kwargs) -> List[ocloud.ResourcePool]:
+ return self._list(**kwargs)[1]
+
+ def list_with_count(self, **kwargs) -> \
+ Tuple[int, List[ocloud.ResourcePool]]:
+ return self._list(**kwargs)
def update(self, resource_pool: ocloud.ResourcePool):
self._update(resource_pool)
def _get(self, resource_pool_id) -> ocloud.ResourcePool:
raise NotImplementedError
+ @abc.abstractmethod
+ def _list(self, **kwargs) -> Tuple[int, List[ocloud.ResourcePool]]:
+ raise NotImplementedError
+
@abc.abstractmethod
def _update(self, resource_pool: ocloud.ResourcePool):
raise NotImplementedError
return resource
def list(self, resourcepool_id, **kwargs) -> List[ocloud.Resource]:
+ return self._list(resourcepool_id, **kwargs)[1]
+
+ def list_with_count(self, resourcepool_id, **kwargs) -> \
+ Tuple[int, List[ocloud.Resource]]:
return self._list(resourcepool_id, **kwargs)
def update(self, resource: ocloud.Resource):
raise NotImplementedError
@abc.abstractmethod
- def _list(self, resourcepool_id, **kwargs) -> ocloud.Resource:
+ def _list(self, resourcepool_id, **kwargs) -> \
+ Tuple[int, List[ocloud.Resource]]:
raise NotImplementedError
@abc.abstractmethod
self.seen.add(deployment_manager)
return deployment_manager
- def list(self) -> List[ocloud.DeploymentManager]:
- return self._list()
+ def list(self, **kwargs) -> List[ocloud.DeploymentManager]:
+ return self._list(**kwargs)[1]
+
+ def list_with_count(self, **kwargs) -> \
+ Tuple[int, List[ocloud.DeploymentManager]]:
+ return self._list(**kwargs)
def update(self, deployment_manager: ocloud.DeploymentManager):
self._update(deployment_manager)
def _get(self, deployment_manager_id) -> ocloud.DeploymentManager:
raise NotImplementedError
+ @abc.abstractmethod
+ def _list(self, **kwargs) -> Tuple[int, List[ocloud.DeploymentManager]]:
+ raise NotImplementedError
+
@abc.abstractmethod
def _update(self, deployment_manager: ocloud.DeploymentManager):
raise NotImplementedError
# limitations under the License.
import abc
-from typing import List, Set
+from typing import List, Set, Tuple
from o2ims.domain import subscription_obj as subobj
self.seen.add(subscription)
return subscription
- def list(self) -> List[subobj.Subscription]:
- return self._list()
+ def list(self, **kwargs) -> List[subobj.Subscription]:
+ return self._list(**kwargs)[1]
+
+ def list_with_count(self, **kwargs) -> \
+ Tuple[int, List[subobj.Subscription]]:
+ return self._list(**kwargs)
def update(self, subscription: subobj.Subscription):
self._update(subscription)
def _update(self, subscription: subobj.Subscription):
raise NotImplementedError
+ @abc.abstractmethod
+ def _list(self, **kwargs) -> Tuple[int, List[subobj.Subscription]]:
+ raise NotImplementedError
+
@abc.abstractmethod
def _delete(self, subscription_id):
raise NotImplementedError
else:
return alarm_obj.PerceivedSeverityEnum.WARNING
alarm_event_record.perceivedSeverity = severity_switch(content['severity'])
- alarm_event_record.probableCauseId = content['probable_cause_id']
+ alarm_event_record.probableCauseId = fmobj.probable_cause_id
alarm_event_record.hash = fmobj.hash
# logger.info('severity: ' + content['severity'])
# logger.info('perceived severity: '
# See the License for the specific language governing permissions and
# limitations under the License.
-from flask_restx import Resource
+from flask import request
+from flask_restx import Resource, reqparse
from o2common.service.messagebus import MessageBus
+from o2common.views.pagination_route import link_header, PAGE_PARAM
from o2ims.views import alarm_view
from o2ims.views.api_ns import api_monitoring_v1
from o2ims.views.alarm_dto import AlarmDTO, SubscriptionDTO
# ---------- Alarm Event Record ---------- #
@api_monitoring_v1.route("/alarms")
+@api_monitoring_v1.param(PAGE_PARAM,
+ 'Page number of the results to fetch.' +
+ ' Default: 1',
+ _in='query', default=1)
class AlarmListRouter(Resource):
model = AlarmDTO.alarm_event_record_get
@api_monitoring_v1.marshal_list_with(model)
def get(self):
- return alarm_view.alarm_event_records(bus.uow)
+ parser = reqparse.RequestParser()
+ parser.add_argument(PAGE_PARAM, location='args')
+ args = parser.parse_args()
+ kwargs = {}
+ if args.nextpage_opaque_marker is not None:
+ kwargs['page'] = args.nextpage_opaque_marker
+
+ ret = alarm_view.alarm_event_records(bus.uow, **kwargs)
+ return link_header(request.full_path, ret)
@api_monitoring_v1.route("/alarms/<alarmEventRecordId>")
@api_monitoring_v1.doc('List alarm subscriptions')
@api_monitoring_v1.marshal_list_with(model)
def get(self):
- return alarm_view.subscriptions(bus.uow)
+ parser = reqparse.RequestParser()
+ parser.add_argument(PAGE_PARAM, location='args')
+ args = parser.parse_args()
+ kwargs = {}
+ if args.nextpage_opaque_marker is not None:
+ kwargs['page'] = args.nextpage_opaque_marker
+
+ ret = alarm_view.subscriptions(bus.uow, **kwargs)
+ return link_header(request.full_path, ret)
@api_monitoring_v1.doc('Create a alarm subscription')
@api_monitoring_v1.expect(expect)
import uuid as uuid
from o2common.service import unit_of_work
+from o2common.views.pagination_view import Pagination
from o2ims.views.alarm_dto import SubscriptionDTO
from o2ims.domain.alarm_obj import AlarmSubscription
logger = o2logging.get_logger(__name__)
-def alarm_event_records(uow: unit_of_work.AbstractUnitOfWork):
+def alarm_event_records(uow: unit_of_work.AbstractUnitOfWork, **kwargs):
+ pagination = Pagination(**kwargs)
+ filter_kwargs = pagination.get_filter()
with uow:
- li = uow.alarm_event_records.list()
- return [r.serialize() for r in li]
+ li = uow.alarm_event_records.list_with_count(**filter_kwargs)
+ return pagination.get_result(li)
def alarm_event_record_one(alarmEventRecordId: str,
return first.serialize() if first is not None else None
-def subscriptions(uow: unit_of_work.AbstractUnitOfWork):
+def subscriptions(uow: unit_of_work.AbstractUnitOfWork, **kwargs):
+ pagination = Pagination(**kwargs)
+ filter_kwargs = pagination.get_filter()
with uow:
- li = uow.alarm_subscriptions.list()
- return [r.serialize() for r in li]
+ li = uow.alarm_subscriptions.list_with_count(**filter_kwargs)
+ return pagination.get_result(li)
def subscription_one(subscriptionId: str,
class ResourceDTO:
-
resource_list = api_ims_inventory_v1.model(
"ResourceListDto",
{
}
)
+ list_result = api_ims_inventory_v1.model(
+ "ResourceListPagenationDto",
+ {
+ 'count': fields.Integer(),
+ 'page_num': fields.Integer(),
+ 'results': fields.List(fields.Nested(resource_list))
+ }
+ )
+
+ # def get_paginated_list(results, url, start, limit):
+ # start = int(start)
+ # limit = int(limit)
+ # count = len(results)
+ # if count < start or limit < 0:
+ # api_ims_inventory_v1.abort(404)
+ # # make response
+ # obj = {}
+ # obj['start'] = start
+ # obj['limit'] = limit
+ # obj['count'] = count
+ # # make URLs
+ # # make previous url
+ # if start == 1:
+ # obj['previous'] = ''
+ # else:
+ # start_copy = max(1, start - limit)
+ # limit_copy = start - 1
+ # obj['previous'] = url + \
+ # '?start=%d&limit=%d' % (start_copy, limit_copy)
+ # # make next url
+ # if start + limit > count:
+ # obj['next'] = ''
+ # else:
+ # start_copy = start + limit
+ # obj['next'] = url + '?start=%d&limit=%d' % (start_copy, limit)
+ # # finally extract result according to bounds
+ # # obj['results'] = results[(start - 1):(start - 1 + limit)]
+ # obj['result'] = fields.List(fields.Nested(ResourceDTO.resource_list))
+ # return obj
+
def recursive_resource_mapping(iteration_number=2):
resource_json_mapping = {
'resourceId': fields.String(required=True,
# See the License for the specific language governing permissions and
# limitations under the License.
+from flask import request
from flask_restx import Resource, reqparse
from o2common.service.messagebus import MessageBus
+from o2common.views.pagination_route import link_header, PAGE_PARAM
from o2ims.views import ocloud_view
from o2ims.views.api_ns import api_ims_inventory_v1
from o2ims.views.ocloud_dto import OcloudDTO, ResourceTypeDTO,\
# ---------- ResourceTypes ---------- #
@api_ims_inventory_v1.route("/resourceTypes")
+@api_ims_inventory_v1.param(PAGE_PARAM,
+ 'Page number of the results to fetch.' +
+ ' Default: 1',
+ _in='query', default=1)
class ResourceTypesListRouter(Resource):
model = ResourceTypeDTO.resource_type_get
@api_ims_inventory_v1.marshal_list_with(model)
def get(self):
- return ocloud_view.resource_types(bus.uow)
+ parser = reqparse.RequestParser()
+ parser.add_argument(PAGE_PARAM, location='args')
+ args = parser.parse_args()
+ kwargs = {}
+ if args.nextpage_opaque_marker is not None:
+ kwargs['page'] = args.nextpage_opaque_marker
+
+ ret = ocloud_view.resource_types(bus.uow, **kwargs)
+ return link_header(request.full_path, ret)
@api_ims_inventory_v1.route("/resourceTypes/<resourceTypeID>")
# ---------- ResourcePools ---------- #
@api_ims_inventory_v1.route("/resourcePools")
+@api_ims_inventory_v1.param(PAGE_PARAM,
+ 'Page number of the results to fetch.' +
+ ' Default: 1',
+ _in='query', default=1)
class ResourcePoolsListRouter(Resource):
model = ResourcePoolDTO.resource_pool_get
@api_ims_inventory_v1.marshal_list_with(model)
def get(self):
- return ocloud_view.resource_pools(bus.uow)
+ parser = reqparse.RequestParser()
+ parser.add_argument(PAGE_PARAM, location='args')
+ args = parser.parse_args()
+ kwargs = {}
+ if args.nextpage_opaque_marker is not None:
+ kwargs['page'] = args.nextpage_opaque_marker
+
+ ret = ocloud_view.resource_pools(bus.uow, **kwargs)
+ return link_header(request.full_path, ret)
@api_ims_inventory_v1.route("/resourcePools/<resourcePoolID>")
@api_ims_inventory_v1.route("/resourcePools/<resourcePoolID>/resources")
@api_ims_inventory_v1.param('resourcePoolID', 'ID of the resource pool')
@api_ims_inventory_v1.param('resourceTypeName', 'filter resource type',
- location='args')
+ _in='query')
@api_ims_inventory_v1.param('parentId', 'filter parentId',
- location='args')
+ _in='query')
+# @api_ims_inventory_v1.param('sort', 'sort by column name',
+# _in='query')
+# @api_ims_inventory_v1.param('per_page', 'The number of results per page ' +
+# '(max 100). Default: 30',
+# _in='query', default=30)
+@api_ims_inventory_v1.param(PAGE_PARAM,
+ 'Page number of the results to fetch.' +
+ ' Default: 1',
+ _in='query', default=1)
class ResourcesListRouter(Resource):
model = ResourceDTO.resource_list
parser = reqparse.RequestParser()
parser.add_argument('resourceTypeName', location='args')
parser.add_argument('parentId', location='args')
+ # parser.add_argument('sort', location='args')
+ # parser.add_argument('per_page', location='args')
+ parser.add_argument(PAGE_PARAM, location='args')
args = parser.parse_args()
kwargs = {}
if args.resourceTypeName is not None:
kwargs['parentId'] = args.parentId
if args.parentId.lower() == 'null':
kwargs['parentId'] = None
+ # if args.per_page is not None:
+ # kwargs['per_page'] = args.per_page
+ # base_url = base_url + 'per_page=' + args.per_page + '&'
+ if args.nextpage_opaque_marker is not None:
+ kwargs['page'] = args.nextpage_opaque_marker
- return ocloud_view.resources(resourcePoolID, bus.uow, **kwargs)
+ ret = ocloud_view.resources(resourcePoolID, bus.uow, **kwargs)
+
+ return link_header(request.full_path, ret)
@api_ims_inventory_v1.route(
# ---------- DeploymentManagers ---------- #
@api_ims_inventory_v1.route("/deploymentManagers")
+@api_ims_inventory_v1.param(PAGE_PARAM,
+ 'Page number of the results to fetch.' +
+ ' Default: 1',
+ _in='query', default=1)
class DeploymentManagersListRouter(Resource):
model = DeploymentManagerDTO.deployment_manager_list
@api_ims_inventory_v1.marshal_list_with(model)
def get(self):
- return ocloud_view.deployment_managers(bus.uow)
+ parser = reqparse.RequestParser()
+ parser.add_argument(PAGE_PARAM, location='args')
+ args = parser.parse_args()
+ kwargs = {}
+ if args.nextpage_opaque_marker is not None:
+ kwargs['page'] = args.nextpage_opaque_marker
+
+ ret = ocloud_view.deployment_managers(bus.uow, **kwargs)
+ return link_header(request.full_path, ret)
@api_ims_inventory_v1.route("/deploymentManagers/<deploymentManagerID>")
@api_ims_inventory_v1.doc('List subscriptions')
@api_ims_inventory_v1.marshal_list_with(model)
def get(self):
- return ocloud_view.subscriptions(bus.uow)
+ parser = reqparse.RequestParser()
+ parser.add_argument(PAGE_PARAM, location='args')
+ args = parser.parse_args()
+ kwargs = {}
+ if args.nextpage_opaque_marker is not None:
+ kwargs['page'] = args.nextpage_opaque_marker
+
+ ret = ocloud_view.subscriptions(bus.uow, **kwargs)
+ return link_header(request.full_path, ret)
@api_ims_inventory_v1.doc('Create a subscription')
@api_ims_inventory_v1.expect(expect)
import shutil
from o2common.service import unit_of_work
+from o2common.config import config
+from o2common.views.pagination_view import Pagination
from o2ims.domain import ocloud
from o2ims.views.ocloud_dto import SubscriptionDTO
from o2ims.domain.subscription_obj import Subscription
from o2common.helper import o2logging
-from o2common.config import config
logger = o2logging.get_logger(__name__)
return first.serialize() if first is not None else None
-def resource_types(uow: unit_of_work.AbstractUnitOfWork):
+def resource_types(uow: unit_of_work.AbstractUnitOfWork, **kwargs):
+ pagination = Pagination(**kwargs)
+ filter_kwargs = pagination.get_filter()
with uow:
- li = uow.resource_types.list()
- return [r.serialize() for r in li]
+ li = uow.resource_types.list_with_count(**filter_kwargs)
+ return pagination.get_result(li)
def resource_type_one(resourceTypeId: str,
return first.serialize() if first is not None else None
-def resource_pools(uow: unit_of_work.AbstractUnitOfWork):
+def resource_pools(uow: unit_of_work.AbstractUnitOfWork, **kwargs):
+ pagination = Pagination(**kwargs)
+ filter_kwargs = pagination.get_filter()
with uow:
- li = uow.resource_pools.list()
- return [r.serialize() for r in li]
+ li = uow.resource_pools.list_with_count(**filter_kwargs)
+ return pagination.get_result(li)
def resource_pool_one(resourcePoolId: str,
def resources(resourcePoolId: str, uow: unit_of_work.AbstractUnitOfWork,
**kwargs):
-
- filter_kwargs = {} # filter key should be the same with database name
+ pagination = Pagination(**kwargs)
+ # filter key should be the same with database name
+ filter_kwargs = pagination.get_filter()
if 'resourceTypeName' in kwargs:
resource_type_name = kwargs['resourceTypeName']
with uow:
restype_id = '' if res_type is None else res_type.resourceTypeId
filter_kwargs['resourceTypeId'] = restype_id
- # li = uow.resources.list(resourcePoolId)
- # return [r.serialize() for r in li if r.resourceTypeId == restype_id]
if 'parentId' in kwargs:
filter_kwargs['parentId'] = kwargs['parentId']
+ if 'sort' in kwargs:
+ filter_kwargs['sort'] = kwargs['sort']
with uow:
- li = uow.resources.list(resourcePoolId, **filter_kwargs)
- return [r.serialize() for r in li]
+ ret = uow.resources.list_with_count(resourcePoolId, **filter_kwargs)
+
+ return pagination.get_result(ret)
def resource_one(resourceId: str, uow: unit_of_work.AbstractUnitOfWork):
return first.serialize() if first is not None else None
-def deployment_managers(uow: unit_of_work.AbstractUnitOfWork):
+def deployment_managers(uow: unit_of_work.AbstractUnitOfWork, **kwargs):
+ pagination = Pagination(**kwargs)
+ filter_kwargs = pagination.get_filter()
with uow:
- li = uow.deployment_managers.list()
- return [r.serialize() for r in li]
+ li = uow.deployment_managers.list_with_count(**filter_kwargs)
+ return pagination.get_result(li)
def deployment_manager_one(deploymentManagerId: str,
return '/configs/'+kube_config_name
-def subscriptions(uow: unit_of_work.AbstractUnitOfWork):
+def subscriptions(uow: unit_of_work.AbstractUnitOfWork, **kwargs):
+ pagination = Pagination(**kwargs)
+ filter_kwargs = pagination.get_filter()
with uow:
- li = uow.subscriptions.list()
- return [r.serialize() for r in li]
+ li = uow.subscriptions.list_with_count(**filter_kwargs)
+ return pagination.get_result(li)
def subscription_one(subscriptionId: str,
pytest-cov
pytest-icdiff
mock
+mock-alchemy>=0.1.0,<0.2.0
tenacity
pyOpenSSL
from sqlalchemy.orm import sessionmaker, clear_mappers
from tenacity import retry, stop_after_delay
from unittest.mock import MagicMock
+from mock_alchemy.mocking import UnifiedAlchemyMagicMock
from o2app.bootstrap import bootstrap
from o2ims.views import configure_namespace
return session, uow
+@pytest.fixture
+def mock_alchemy_uow():
+ session = UnifiedAlchemyMagicMock()
+ uow = unit_of_work.SqlAlchemyUnitOfWork(session_factory=session)
+ return session, uow
+
+
@pytest.fixture
def mock_flask_uow(mock_uow):
session, uow = mock_uow
alarm_event_record1 = MagicMock()
alarm_event_record1.serialize.return_value = {
"alarmEventRecordId": alarm_event_record_id1}
- session.return_value.query.return_value = [alarm_event_record1]
- alarm_event_record_list = alarm_view.alarm_event_records(uow)
- assert str(alarm_event_record_list[0].get(
- "alarmEventRecordId")) == alarm_event_record_id1
+ order_by = MagicMock()
+ order_by.count.return_value = 1
+ order_by.limit.return_value.offset.return_value = [alarm_event_record1]
+ session.return_value.query.return_value.filter_by.return_value.\
+ order_by.return_value = order_by
+
+ result = alarm_view.alarm_event_records(uow)
+ assert result['count'] == 1
+ ret_list = result['results']
+ assert str(ret_list[0].get("alarmEventRecordId")) == alarm_event_record_id1
def test_view_alarm_event_record_one(mock_uow):
"alarmEventRecordId")) == alarm_event_record_id1
+def test_view_alarm_subscriptions(mock_uow):
+ session, uow = mock_uow
+
+ subscription_id1 = str(uuid.uuid4())
+ sub1 = MagicMock()
+ sub1.serialize.return_value = {
+ "alarmSubscriptionId": subscription_id1,
+ }
+
+ order_by = MagicMock()
+ order_by.count.return_value = 1
+ order_by.limit.return_value.offset.return_value = [sub1]
+ session.return_value.query.return_value.filter_by.return_value.\
+ order_by.return_value = order_by
+
+ result = alarm_view.subscriptions(uow)
+ assert result['count'] == 1
+ ret_list = result['results']
+ assert str(ret_list[0].get("alarmSubscriptionId")) == subscription_id1
+
+
+def test_view_alarm_subscription_one(mock_uow):
+ session, uow = mock_uow
+
+ subscription_id1 = str(uuid.uuid4())
+ session.return_value.query.return_value.filter_by.return_value.first.\
+ return_value.serialize.return_value = None
+
+ # Query return None
+ subscription_res = alarm_view.subscription_one(
+ subscription_id1, uow)
+ assert subscription_res is None
+
+ session.return_value.query.return_value.filter_by.return_value.first.\
+ return_value.serialize.return_value = {
+ "alarmSubscriptionId": subscription_id1,
+ }
+
+ subscription_res = alarm_view.subscription_one(
+ subscription_id1, uow)
+ assert str(subscription_res.get(
+ "alarmSubscriptionId")) == subscription_id1
+
+
def test_alarm_dictionary(mock_uow):
session, uow = mock_uow
alarm_dict1 = alarm_obj.AlarmDictionary('test1')
def test_flask_get_list(mock_flask_uow):
session, app = mock_flask_uow
- session.query.return_value = []
+ order_by = MagicMock()
+ order_by.count.return_value = 0
+ order_by.limit.return_value.offset.return_value = []
+ session.return_value.query.return_value.filter_by.return_value.\
+ order_by.return_value = order_by
apibase = config.get_o2ims_monitoring_api_base()
with app.test_client() as client:
import uuid
from unittest.mock import MagicMock
-# from o2dms.domain import dms
from o2ims.domain import ocloud, subscription_obj
from o2ims.domain import resource_type as rt
restype1 = MagicMock()
restype1.serialize.return_value = {
"resourceTypeId": resource_type_id1}
- session.return_value.query.return_value = [restype1]
- resource_type_list = ocloud_view.resource_types(uow)
- assert str(resource_type_list[0].get(
- "resourceTypeId")) == resource_type_id1
+ order_by = MagicMock()
+ order_by.count.return_value = 1
+ order_by.limit.return_value.offset.return_value = [restype1]
+ session.return_value.query.return_value.filter_by.return_value.\
+ order_by.return_value = order_by
+
+ result = ocloud_view.resource_types(uow)
+ assert result['count'] == 1
+ ret_list = result['results']
+ assert str(ret_list[0].get("resourceTypeId")) == resource_type_id1
def test_view_resource_type_one(mock_uow):
respool1 = MagicMock()
respool1.serialize.return_value = {
"resourcePoolId": resource_pool_id1}
- session.return_value.query.return_value = [respool1]
- resource_pool_list = ocloud_view.resource_pools(uow)
- assert str(resource_pool_list[0].get(
- "resourcePoolId")) == resource_pool_id1
+ order_by = MagicMock()
+ order_by.count.return_value = 1
+ order_by.limit.return_value.offset.return_value = [respool1]
+ session.return_value.query.return_value.filter_by.return_value.\
+ order_by.return_value = order_by
+
+ result = ocloud_view.resource_pools(uow)
+ assert result['count'] == 1
+ ret_list = result['results']
+ assert str(ret_list[0].get("resourcePoolId")) == resource_pool_id1
def test_view_resource_pool_one(mock_uow):
"resourceId": resource_id1,
"resourcePoolId": resource_pool_id1
}
- session.return_value.query.return_value.filter_by.return_value = [res1]
- resource_list = ocloud_view.resources(resource_pool_id1, uow)
+ order_by = MagicMock()
+ order_by.count.return_value = 1
+ order_by.limit.return_value.offset.return_value = [res1]
+ session.return_value.query.return_value.filter_by.return_value.\
+ order_by.return_value = order_by
+
+ result = ocloud_view.resources(resource_pool_id1, uow)
+ assert result['count'] == 1
+ resource_list = result['results']
assert str(resource_list[0].get("resourceId")) == resource_id1
assert str(resource_list[0].get("resourcePoolId")) == resource_pool_id1
dm1.serialize.return_value = {
"deploymentManagerId": deployment_manager_id1,
}
- session.return_value.query.return_value = [dm1]
- deployment_manager_list = ocloud_view.deployment_managers(uow)
- assert str(deployment_manager_list[0].get(
- "deploymentManagerId")) == deployment_manager_id1
+ order_by = MagicMock()
+ order_by.count.return_value = 1
+ order_by.limit.return_value.offset.return_value = [dm1]
+ session.return_value.query.return_value.filter_by.return_value.\
+ order_by.return_value = order_by
+
+ result = ocloud_view.deployment_managers(uow)
+ assert result['count'] == 1
+ ret_list = result['results']
+ assert str(ret_list[0].get("deploymentManagerId")
+ ) == deployment_manager_id1
def test_view_deployment_manager_one(mock_uow):
sub1.serialize.return_value = {
"subscriptionId": subscription_id1,
}
- session.return_value.query.return_value = [sub1]
- subscription_list = ocloud_view.subscriptions(uow)
- assert str(subscription_list[0].get(
- "subscriptionId")) == subscription_id1
+ order_by = MagicMock()
+ order_by.count.return_value = 1
+ order_by.limit.return_value.offset.return_value = [sub1]
+ session.return_value.query.return_value.filter_by.return_value.\
+ order_by.return_value = order_by
+
+ result = ocloud_view.subscriptions(uow)
+ assert result['count'] == 1
+ ret_list = result['results']
+ assert str(ret_list[0].get("subscriptionId")) == subscription_id1
def test_view_subscription_one(mock_uow):
def test_flask_get_list(mock_flask_uow):
session, app = mock_flask_uow
- session.query.return_value = []
+ order_by = MagicMock()
+ order_by.count.return_value = 0
+ order_by.limit.return_value.offset.return_value = []
+ session.return_value.query.return_value.filter_by.return_value.\
+ order_by.return_value = order_by
apibase = config.get_o2ims_api_base()
with app.test_client() as client: