# See the License for the specific language governing permissions and
# limitations under the License.
-import abc
-from typing import List, Set
+from typing import List
# from o2ims.adapter import orm
from o2ims.domain import ocloud
-
-
-class OcloudRepository(abc.ABC):
- def __init__(self):
- self.seen = set() # type: Set[ocloud.Ocloud]
-
- def add(self, ocloud: ocloud.Ocloud):
- self._add(ocloud)
- self.seen.add(ocloud)
-
- def get(self, ocloudid) -> ocloud.Ocloud:
- ocloud = self._get(ocloudid)
- if ocloud:
- self.seen.add(ocloud)
- return ocloud
-
- def list(self) -> List[ocloud.Ocloud]:
- return self._list()
-
- def update(self, ocloud: ocloud.Ocloud):
- self._update(ocloud)
-
- # def update_fields(self, ocloudid: str, updatefields: dict):
- # self._update(ocloudid, updatefields)
-
- @abc.abstractmethod
- def _add(self, ocloud: ocloud.Ocloud):
- raise NotImplementedError
-
- @abc.abstractmethod
- def _get(self, ocloudid) -> ocloud.Ocloud:
- raise NotImplementedError
-
- @abc.abstractmethod
- def _update(self, ocloud: ocloud.Ocloud):
- raise NotImplementedError
+from o2ims.domain.ocloud_repo import OcloudRepository
class OcloudSqlAlchemyRepository(OcloudRepository):
oCloudId=ocloudid).first()
def _list(self) -> List[ocloud.Ocloud]:
- return self.session.query()
+ return self.session.query(ocloud.Ocloud).order_by(
+ ocloud.Ocloud.name).all()
def _update(self, ocloud: ocloud.Ocloud):
self.session.add(ocloud)