1. Add swagger support accept query parameters.
2. Add a method that is get_by_name in the resource type repository.
3. Make the list method in the resource repository to support more filters.
Issue-ID: INF-257
Signed-off-by: Zhang Rong(Jon) <rong.zhang@windriver.com>
Change-Id: I43c0d2e29550880ebdb089759100b68d78d808bf
return self.session.query(ocloud.ResourceType).filter_by(
resourceTypeId=resource_type_id).first()
+ def _get_by_name(self, resource_type_name) -> ocloud.ResourceType:
+ return self.session.query(ocloud.ResourceType).filter_by(
+ name=resource_type_name).first()
+
def _list(self) -> List[ocloud.ResourceType]:
return self.session.query(ocloud.ResourceType)
return res
return recursive(resource_id)
- def _list(self, resourcepool_id) -> List[ocloud.Resource]:
+ def _list(self, resourcepool_id, **kwargs) -> List[ocloud.Resource]:
return self.session.query(ocloud.Resource).filter_by(
- resourcePoolId=resourcepool_id)
+ resourcePoolId=resourcepool_id, **kwargs)
def _update(self, resource: ocloud.Resource):
self.session.add(resource)
def set_children(self, children: list):\r
self.children = children\r
\r
+ def set_resource_type_name(self, resource_type_name: str):\r
+ self.resourceTypeName = resource_type_name\r
+\r
def serialize(self):\r
d = Serializer.serialize(self)\r
\r
self.seen.add(resource_type)\r
return resource_type\r
\r
+ def get_by_name(self, resource_type_name) -> ocloud.ResourceType:\r
+ resource_type = self._get_by_name(resource_type_name)\r
+ if resource_type:\r
+ self.seen.add(resource_type)\r
+ return resource_type\r
+\r
def list(self) -> List[ocloud.ResourceType]:\r
return self._list()\r
\r
def _get(self, resource_type_id) -> ocloud.ResourceType:\r
raise NotImplementedError\r
\r
+ @abc.abstractmethod\r
+ def _get_by_name(self, resource_type_name) -> ocloud.ResourceType:\r
+ raise NotImplementedError\r
+\r
@abc.abstractmethod\r
def _update(self, resource_type: ocloud.ResourceType):\r
raise NotImplementedError\r
self.seen.add(resource)\r
return resource\r
\r
- def list(self, resourcepool_id) -> List[ocloud.Resource]:\r
- return self._list(resourcepool_id)\r
+ def list(self, resourcepool_id, **kwargs) -> List[ocloud.Resource]:\r
+ return self._list(resourcepool_id, **kwargs)\r
\r
def update(self, resource: ocloud.Resource):\r
self._update(resource)\r
def _get(self, resource_id) -> ocloud.Resource:\r
raise NotImplementedError\r
\r
+ @abc.abstractmethod\r
+ def _list(self, resourcepool_id, **kwargs) -> ocloud.Resource:\r
+ raise NotImplementedError\r
+\r
@abc.abstractmethod\r
def _update(self, resource: ocloud.Resource):\r
raise NotImplementedError\r
# limitations under the License.
from flask_restx import Resource
+from flask_restx import reqparse
from o2common.service.messagebus import MessageBus
from o2ims.views import ocloud_view
from o2ims.views.ocloud_dto import OcloudDTO, ResourceTypeDTO,\
ResourcePoolDTO, ResourceDTO, DeploymentManagerDTO, SubscriptionDTO
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
def configure_api_route():
# Set global bus for resource
# ---------- Resources ---------- #
@api_ims_inventory_v1.route("/resourcePools/<resourcePoolID>/resources")
@api_ims_inventory_v1.param('resourcePoolID', 'ID of the resource pool')
+@api_ims_inventory_v1.param('resourceTypeName', 'filter resource type',
+ location='args')
+@api_ims_inventory_v1.param('parentId', 'filter parentId',
+ location='args')
class ResourcesListRouter(Resource):
model = ResourceDTO.resource_list
@api_ims_inventory_v1.marshal_list_with(model)
def get(self, resourcePoolID):
- return ocloud_view.resources(resourcePoolID, bus.uow)
+ parser = reqparse.RequestParser()
+ parser.add_argument('resourceTypeName', location='args')
+ parser.add_argument('parentId', location='args')
+ args = parser.parse_args()
+ kwargs = {}
+ if args.resourceTypeName is not None:
+ kwargs['resourceTypeName'] = args.resourceTypeName
+ if args.parentId is not None:
+ kwargs['parentId'] = args.parentId
+ return ocloud_view.resources(resourcePoolID, bus.uow, **kwargs)
@api_ims_inventory_v1.route(
from o2ims.views.ocloud_dto import SubscriptionDTO\r
from o2ims.domain.subscription_obj import Subscription\r
\r
+from o2common.helper import o2logging\r
+logger = o2logging.get_logger(__name__)\r
+\r
\r
def oclouds(uow: unit_of_work.AbstractUnitOfWork):\r
with uow:\r
return first.serialize() if first is not None else None\r
\r
\r
-def resources(resourcePoolId: str, uow: unit_of_work.AbstractUnitOfWork):\r
+def resources(resourcePoolId: str, uow: unit_of_work.AbstractUnitOfWork,\r
+ **kwargs):\r
+\r
+ filter_kwargs = {} # filter key should be the same with database name\r
+ if 'resourceTypeName' in kwargs:\r
+ resource_type_name = kwargs['resourceTypeName']\r
+ with uow:\r
+ # res_types = uow.resource_types.list()\r
+ # restype_ids = [\r
+ # restype.resourceTypeId for restype in res_types\r
+ # if resourceTypeName == restype.name]\r
+ # restype_id = '' if len(restype_ids) == 0 else restype_ids[0]\r
+ res_type = uow.resource_types.get_by_name(resource_type_name)\r
+ restype_id = '' if res_type is None else res_type.resourceTypeId\r
+ filter_kwargs['resourceTypeId'] = restype_id\r
+\r
+ # li = uow.resources.list(resourcePoolId)\r
+ # return [r.serialize() for r in li if r.resourceTypeId == restype_id]\r
+ if 'parentId' in kwargs:\r
+ filter_kwargs['parentId'] = kwargs['parentId']\r
+\r
with uow:\r
- li = uow.resources.list(resourcePoolId)\r
+ li = uow.resources.list(resourcePoolId, **filter_kwargs)\r
return [r.serialize() for r in li]\r
\r
\r