# limitations under the License.
import abc
-from typing import List, Set
+from typing import List, Set, Tuple
from o2ims.domain import ocloud
self.seen.add(ocloud)
return ocloud
- def list(self) -> List[ocloud.Ocloud]:
- return self._list()
+ def list(self, *args) -> List[ocloud.Ocloud]:
+ return self._list(*args)
def update(self, ocloud: ocloud.Ocloud):
self._update(ocloud)
def _get(self, ocloud_id) -> ocloud.Ocloud:
raise NotImplementedError
+ @abc.abstractmethod
+ def _list(self, *args) -> List[ocloud.Ocloud]:
+ raise NotImplementedError
+
@abc.abstractmethod
def _update(self, ocloud: ocloud.Ocloud):
raise NotImplementedError
self.seen.add(resource_type)
return resource_type
- def list(self) -> List[ocloud.ResourceType]:
- return self._list()
+ def list(self, *args) -> List[ocloud.ResourceType]:
+ return self._list(*args)[1]
+
+ def list_with_count(self, *args, **kwargs) -> \
+ Tuple[int, List[ocloud.ResourceType]]:
+ return self._list(*args, **kwargs)
def update(self, resource_type: ocloud.ResourceType):
self._update(resource_type)
def _get_by_name(self, resource_type_name) -> ocloud.ResourceType:
raise NotImplementedError
+ @abc.abstractmethod
+ def _list(self, **kwargs) -> Tuple[int, List[ocloud.ResourceType]]:
+ raise NotImplementedError
+
@abc.abstractmethod
def _update(self, resource_type: ocloud.ResourceType):
raise NotImplementedError
self.seen.add(resource_pool)
return resource_pool
- def list(self) -> List[ocloud.ResourcePool]:
- return self._list()
+ def list(self, *args) -> List[ocloud.ResourcePool]:
+ return self._list(*args)[1]
+
+ def list_with_count(self, *args, **kwargs) -> \
+ Tuple[int, List[ocloud.ResourcePool]]:
+ return self._list(*args, **kwargs)
def update(self, resource_pool: ocloud.ResourcePool):
self._update(resource_pool)
def _get(self, resource_pool_id) -> ocloud.ResourcePool:
raise NotImplementedError
+ @abc.abstractmethod
+ def _list(self, **kwargs) -> Tuple[int, List[ocloud.ResourcePool]]:
+ raise NotImplementedError
+
@abc.abstractmethod
def _update(self, resource_pool: ocloud.ResourcePool):
raise NotImplementedError
self.seen.add(resource)
return resource
- def list(self, resourcepool_id, **kwargs) -> List[ocloud.Resource]:
- return self._list(resourcepool_id, **kwargs)
+ def list(self, resourcepool_id, *args) -> List[ocloud.Resource]:
+ return self._list(resourcepool_id, *args)[1]
+
+ def list_with_count(self, resourcepool_id, *args, **kwargs) -> \
+ Tuple[int, List[ocloud.Resource]]:
+ return self._list(resourcepool_id, *args, **kwargs)
def update(self, resource: ocloud.Resource):
self._update(resource)
raise NotImplementedError
@abc.abstractmethod
- def _list(self, resourcepool_id, **kwargs) -> ocloud.Resource:
+ def _list(self, resourcepool_id, **kwargs) -> \
+ Tuple[int, List[ocloud.Resource]]:
raise NotImplementedError
@abc.abstractmethod
self.seen.add(deployment_manager)
return deployment_manager
- def list(self) -> List[ocloud.DeploymentManager]:
- return self._list()
+ def list(self, *args) -> List[ocloud.DeploymentManager]:
+ return self._list(*args)[1]
+
+ def list_with_count(self, *args, **kwargs) -> \
+ Tuple[int, List[ocloud.DeploymentManager]]:
+ return self._list(*args, **kwargs)
def update(self, deployment_manager: ocloud.DeploymentManager):
self._update(deployment_manager)
def _get(self, deployment_manager_id) -> ocloud.DeploymentManager:
raise NotImplementedError
+ @abc.abstractmethod
+ def _list(self, **kwargs) -> Tuple[int, List[ocloud.DeploymentManager]]:
+ raise NotImplementedError
+
@abc.abstractmethod
def _update(self, deployment_manager: ocloud.DeploymentManager):
raise NotImplementedError