# See the License for the specific language governing permissions and
# limitations under the License.
-from typing import List
+from typing import List, Tuple
from o2ims.domain import ocloud, subscription_obj
from o2ims.domain.ocloud_repo import OcloudRepository, ResourceTypeRepository,\
return self.session.query(ocloud.ResourceType).filter_by(
name=resource_type_name).first()
- def _list(self) -> List[ocloud.ResourceType]:
- return self.session.query(ocloud.ResourceType)
+ def _list(self, *args, **kwargs) -> Tuple[int, List[ocloud.ResourceType]]:
+ size = kwargs.pop('limit') if 'limit' in kwargs else None
+ offset = kwargs.pop('start') if 'start' in kwargs else 0
+
+ result = self.session.query(ocloud.ResourceType).filter(
+ *args).order_by('resourceTypeId')
+ count = result.count()
+ if size is not None and size != -1:
+ return (count, result.limit(size).offset(offset))
+ return (count, result)
def _update(self, resourceType: ocloud.ResourceType):
self.session.add(resourceType)
return self.session.query(ocloud.ResourcePool).filter_by(
resourcePoolId=resource_pool_id).first()
- def _list(self) -> List[ocloud.ResourcePool]:
- return self.session.query(ocloud.ResourcePool)
+ def _list(self, *args, **kwargs) -> Tuple[int, List[ocloud.ResourcePool]]:
+ size = kwargs.pop('limit') if 'limit' in kwargs else None
+ offset = kwargs.pop('start') if 'start' in kwargs else 0
+
+ result = self.session.query(ocloud.ResourcePool).filter(
+ *args).order_by('resourcePoolId')
+ count = result.count()
+ if size is not None and size != -1:
+ return (count, result.limit(size).offset(offset))
+ return (count, result)
def _update(self, resourcePool: ocloud.ResourcePool):
self.session.add(resourcePool)
return res
return recursive(resource_id)
- def _list(self, resourcepool_id, **kwargs) -> List[ocloud.Resource]:
- return self.session.query(ocloud.Resource).filter_by(
- resourcePoolId=resourcepool_id, **kwargs)
+ def _list(self, resourcepool_id, *args, **kwargs) -> \
+ Tuple[int, List[ocloud.Resource]]:
+ if 'sort' in kwargs:
+ kwargs.pop('sort')
+ size = kwargs.pop('limit') if 'limit' in kwargs else None
+ offset = kwargs.pop('start') if 'start' in kwargs else 0
+
+ result = self.session.query(ocloud.Resource).filter(
+ *args).order_by('resourceId')
+ count = result.count()
+ if size is not None and size != -1:
+ return (count, result.limit(size).offset(offset))
+ return (count, result)
def _update(self, resource: ocloud.Resource):
self.session.add(resource)
return self.session.query(ocloud.DeploymentManager).filter_by(
deploymentManagerId=deployment_manager_id).first()
- def _list(self) -> List[ocloud.DeploymentManager]:
- return self.session.query(ocloud.DeploymentManager)
+ def _list(self, *args, **kwargs) -> Tuple[int,
+ List[ocloud.DeploymentManager]]:
+ size = kwargs.pop('limit') if 'limit' in kwargs else None
+ offset = kwargs.pop('start') if 'start' in kwargs else 0
+
+ result = self.session.query(ocloud.DeploymentManager).filter(
+ *args).order_by('deploymentManagerId')
+ count = result.count()
+ if size is not None and size != -1:
+ return (count, result.limit(size).offset(offset))
+ return (count, result)
def _update(self, deployment_manager: ocloud.DeploymentManager):
self.session.add(deployment_manager)
return self.session.query(subscription_obj.Subscription).filter_by(
subscriptionId=subscription_id).first()
- def _list(self) -> List[subscription_obj.Subscription]:
- return self.session.query(subscription_obj.Subscription)
+ def _list(self, *args, **kwargs) -> \
+ Tuple[int, List[subscription_obj.Subscription]]:
+ size = kwargs.pop('limit') if 'limit' in kwargs else None
+ offset = kwargs.pop('start') if 'start' in kwargs else 0
+
+ result = self.session.query(subscription_obj.Subscription).filter(
+ *args).order_by('subscriptionId')
+ count = result.count()
+ if size is not None and size != -1:
+ return (count, result.limit(size).offset(offset))
+ return (count, result)
def _update(self, subscription: subscription_obj.Subscription):
self.session.add(subscription)