+
+ def _count(self, **kwargs):
+ return self.session.query(
+ dms.NfDeploymentDesc).filter_by(**kwargs).count()
+
+
+class NfDeploymentSqlAlchemyRepository(
+ dms_repo.NfDeploymentRepository):
+
+ def __init__(self, session):
+ super().__init__()
+ self.session = session
+
+ def _add(self, nfdeployment: dms.NfDeployment):
+ self.session.add(nfdeployment)
+
+ def _get(self, nfdeployment_id) -> dms.NfDeployment:
+ return self.session.query(dms.NfDeployment).filter_by(
+ id=nfdeployment_id).first()
+
+ def _list(self) -> List[dms.NfDeployment]:
+ return self.session.query()
+
+ def _update(self, nfdeployment_id, **kwargs):
+ self.session.query(dms.NfDeployment).filter_by(
+ id=nfdeployment_id).update(**kwargs)
+
+ def _delete(self, nfdeployment_id):
+ self.session.query(dms.NfDeployment).filter_by(
+ id=nfdeployment_id
+ ).delete()
+
+ def _count(self, **kwargs):
+ return self.session.query(
+ dms.NfDeployment).filter_by(**kwargs).count()
+
+
+class NfOCloudVResourceSqlAlchemyRepository(
+ dms_repo.NfOCloudVResourceRepository):
+
+ def __init__(self, session):
+ super().__init__()
+ self.session = session
+
+ def _add(self, nfocloudvres: dms.NfOCloudVResource):
+ self.session.add(nfocloudvres)
+
+ def _get(self, nfocloudvres_id) -> dms.NfOCloudVResource:
+ return self.session.query(dms.NfOCloudVResource).filter_by(
+ id=nfocloudvres_id).first()
+
+ def _list(self) -> List[dms.NfOCloudVResource]:
+ return self.session.query()
+
+ def _update(self, nfocloudvres, **kwargs):
+ self.session.query(dms.NfOCloudVResource).filter_by(
+ id=nfocloudvres).update(**kwargs)
+
+ def _delete(self, nfocloudvres):
+ self.session.query(dms.NfOCloudVResource).filter_by(
+ id=nfocloudvres
+ ).delete()