X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=o2dms%2Fdomain%2Fdms_repo.py;fp=o2dms%2Fdomain%2Fdms_repo.py;h=c53d90f7ae46df01ccf99242e66b95f1faadc99c;hb=7fb3d02a3744ecb368b0531f16175034b987d42f;hp=2564a819781ea3167c8e62a1e68aed31c9e41e8d;hpb=c3ab2dcb442a0e66b7a621c59c6c7a679c26cee9;p=pti%2Fo2.git diff --git a/o2dms/domain/dms_repo.py b/o2dms/domain/dms_repo.py index 2564a81..c53d90f 100644 --- a/o2dms/domain/dms_repo.py +++ b/o2dms/domain/dms_repo.py @@ -17,6 +17,46 @@ from typing import List, Set from o2dms.domain import dms +class NfDeploymentRepository(abc.ABC): + def __init__(self): + self.seen = set() # type: Set[dms.NfDeployment] + + def add(self, nfdeployment: dms.NfDeployment): + self._add(nfdeployment) + self.seen.add(nfdeployment) + + def get(self, nfdeployment_id) -> dms.NfDeployment: + nfdeployment = self._get(nfdeployment_id) + if nfdeployment: + self.seen.add(nfdeployment) + return nfdeployment + + def list(self) -> List[dms.NfDeployment]: + return self._list() + + def update(self, id, **kwargs): + self._update(id, **kwargs) + + def delete(self, nfdeployment_id): + self._delete(nfdeployment_id) + + @abc.abstractmethod + def _add(self, nfdeployment: dms.NfDeployment): + raise NotImplementedError + + @abc.abstractmethod + def _get(self, nfdeployment_id) -> dms.NfDeployment: + raise NotImplementedError + + @abc.abstractmethod + def _update(self, id, **kwargs): + raise NotImplementedError + + @abc.abstractmethod + def _delete(self, nfdeployment_id): + raise NotImplementedError + + class NfDeploymentDescRepository(abc.ABC): def __init__(self): self.seen = set() # type: Set[dms.NfDeploymentDesc] @@ -49,9 +89,49 @@ class NfDeploymentDescRepository(abc.ABC): raise NotImplementedError @abc.abstractmethod - def _update(self, nfdeployment_descriptor: dms.NfDeploymentDesc): + def _update(self, id, **kwargs): raise NotImplementedError @abc.abstractmethod def _delete(self, nfdeployment_descriptor_id): raise NotImplementedError + + +class NfOCloudVResourceRepository(abc.ABC): + def __init__(self): + self.seen = set() # type: Set[dms.NfOCloudVResource] + + def add(self, nfocloudvres: dms.NfOCloudVResource): + self._add(nfocloudvres) + self.seen.add(nfocloudvres) + + def get(self, nfocloudvres_id) -> dms.NfOCloudVResource: + nfocloudvres = self._get(nfocloudvres_id) + if nfocloudvres: + self.seen.add(nfocloudvres) + return nfocloudvres + + def list(self) -> List[dms.NfOCloudVResource]: + return self._list() + + def update(self, nfocloudvres_id, **kwargs): + self._update(nfocloudvres_id, **kwargs) + + def delete(self, nfocloudvres_id): + self._delete(nfocloudvres_id) + + @abc.abstractmethod + def _add(self, nfocloudvres: dms.NfOCloudVResource): + raise NotImplementedError + + @abc.abstractmethod + def _get(self, nfocloudvres_id) -> dms.NfOCloudVResource: + raise NotImplementedError + + @abc.abstractmethod + def _update(self, nfocloudvres_id, **kwargs): + raise NotImplementedError + + @abc.abstractmethod + def _delete(self, nfocloudvres_id): + raise NotImplementedError