X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=o2dms%2Fadapter%2Fdms_repository.py;h=d0116d038f54c4857fe225189363db9dfeb9f140;hb=refs%2Fheads%2Fmaster;hp=5dd2bb1d339214046ad763d22d3170dcff73e43f;hpb=c3ab2dcb442a0e66b7a621c59c6c7a679c26cee9;p=pti%2Fo2.git diff --git a/o2dms/adapter/dms_repository.py b/o2dms/adapter/dms_repository.py index 5dd2bb1..d0116d0 100644 --- a/o2dms/adapter/dms_repository.py +++ b/o2dms/adapter/dms_repository.py @@ -41,3 +41,65 @@ class NfDeploymentDescSqlAlchemyRepository(dms_repo self.session.query(dms.NfDeploymentDesc).filter_by( id=nfdeployment_desc_id ).delete() + + def _count(self, **kwargs): + return self.session.query( + dms.NfDeploymentDesc).filter_by(**kwargs).count() + + +class NfDeploymentSqlAlchemyRepository( + dms_repo.NfDeploymentRepository): + + def __init__(self, session): + super().__init__() + self.session = session + + def _add(self, nfdeployment: dms.NfDeployment): + self.session.add(nfdeployment) + + def _get(self, nfdeployment_id) -> dms.NfDeployment: + return self.session.query(dms.NfDeployment).filter_by( + id=nfdeployment_id).first() + + def _list(self) -> List[dms.NfDeployment]: + return self.session.query() + + def _update(self, nfdeployment_id, **kwargs): + self.session.query(dms.NfDeployment).filter_by( + id=nfdeployment_id).update(**kwargs) + + def _delete(self, nfdeployment_id): + self.session.query(dms.NfDeployment).filter_by( + id=nfdeployment_id + ).delete() + + def _count(self, **kwargs): + return self.session.query( + dms.NfDeployment).filter_by(**kwargs).count() + + +class NfOCloudVResourceSqlAlchemyRepository( + dms_repo.NfOCloudVResourceRepository): + + def __init__(self, session): + super().__init__() + self.session = session + + def _add(self, nfocloudvres: dms.NfOCloudVResource): + self.session.add(nfocloudvres) + + def _get(self, nfocloudvres_id) -> dms.NfOCloudVResource: + return self.session.query(dms.NfOCloudVResource).filter_by( + id=nfocloudvres_id).first() + + def _list(self) -> List[dms.NfOCloudVResource]: + return self.session.query() + + def _update(self, nfocloudvres, **kwargs): + self.session.query(dms.NfOCloudVResource).filter_by( + id=nfocloudvres).update(**kwargs) + + def _delete(self, nfocloudvres): + self.session.query(dms.NfOCloudVResource).filter_by( + id=nfocloudvres + ).delete()